Skip to content

[JIT] meta caching system #995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: jit_computations
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets
import delphi.epidata.acquisition.covidcast.database as live
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main

# py3tester coverage target (equivalent to `import *`)
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_caching(self):
self.cnx.commit()

# make sure the live utility is serving something sensible
cvc_database = live.Database()
cvc_database = DatabaseMeta()
cvc_database.connect()
epidata1 = cvc_database.compute_covidcast_meta()
cvc_database.disconnect(False)
Expand Down
15 changes: 8 additions & 7 deletions integrations/acquisition/covidcast/test_db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import unittest

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow, DBLoadStateException

from delphi.epidata.acquisition.covidcast.database import DBLoadStateException
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
import delphi.operations.secrets as secrets


# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

Expand All @@ -31,8 +32,8 @@ def _find_matches_for_row(self, row):

def test_insert_or_update_with_nonempty_load_table(self):
# make rows
a_row = self._make_placeholder_row()[0]
another_row = self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE+1, issue=self.DEFAULT_ISSUE+1)[0]
a_row = CovidcastRow(time_value=20200202)
another_row = CovidcastRow(time_value=20200203, issue=20200203)
# insert one
self._db.insert_or_update_bulk([a_row])
# put something into the load table
Expand Down Expand Up @@ -61,7 +62,7 @@ def test_id_sync(self):
latest_view = 'epimetric_latest_v'

# add a data point
base_row, _ = self._make_placeholder_row()
base_row = CovidcastRow()
self._insert_rows([base_row])
# ensure the primary keys match in the latest and history tables
matches = self._find_matches_for_row(base_row)
Expand All @@ -71,7 +72,7 @@ def test_id_sync(self):
old_pk_id = matches[latest_view][pk_column]

# add a reissue for said data point
next_row, _ = self._make_placeholder_row()
next_row = CovidcastRow()
next_row.issue += 1
self._insert_rows([next_row])
# ensure the new keys also match
Expand Down
7 changes: 2 additions & 5 deletions integrations/acquisition/covidcast/test_delete_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
import unittest
from os import path

# third party
import mysql.connector

# first party
from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow

# py3tester coverage target (equivalent to `import *`)
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
Expand Down
110 changes: 55 additions & 55 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
"""Integration tests for delphi_epidata.py."""

# standard library
import unittest
import time
from unittest.mock import patch, MagicMock
from json import JSONDecodeError
from unittest.mock import MagicMock, patch

# third party
from aiohttp.client_exceptions import ClientResponseError
import mysql.connector
# first party
import pytest
from aiohttp.client_exceptions import ClientResponseError

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
# third party
import delphi.operations.secrets as secrets
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
import delphi.operations.secrets as secrets
from delphi.epidata.client.delphi_epidata import Epidata
from delphi_utils import Nans


# py3tester coverage target
__test_target__ = 'delphi.epidata.client.delphi_epidata'
# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value
IGNORE_FIELDS = ["id", "direction_updated_timestamp", "value_updated_timestamp", "source", "time_type", "geo_type"]

def fake_epidata_endpoint(func):
"""This can be used as a decorator to enable a bogus Epidata endpoint to return 404 responses."""
Expand All @@ -30,9 +32,6 @@ def wrapper(*args):
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
return wrapper

# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

class DelphiEpidataPythonClientTests(CovidcastBase):
"""Tests the Python client."""

Expand All @@ -54,12 +53,12 @@ def test_covidcast(self):

# insert placeholder data: three issues of one signal, one issue of another
rows = [
self._make_placeholder_row(issue=self.DEFAULT_ISSUE + i, value=i, lag=i)[0]
CovidcastRow(issue=20200202 + i, value=i, lag=i)
for i in range(3)
]
row_latest_issue = rows[-1]
rows.append(
self._make_placeholder_row(signal="sig2")[0]
CovidcastRow(signal="sig2")
)
self._insert_rows(rows)

Expand All @@ -70,10 +69,11 @@ def test_covidcast(self):
)

expected = [
self.expected_from_row(row_latest_issue),
self.expected_from_row(rows[-1])
row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS),
rows[-1].as_dict(ignore_fields=IGNORE_FIELDS)
]

self.assertEqual(response['epidata'], expected)
# check result
self.assertEqual(response, {
'result': 1,
Expand All @@ -89,10 +89,10 @@ def test_covidcast(self):

expected = [{
rows[0].signal: [
self.expected_from_row(row_latest_issue, self.DEFAULT_MINUS + ['signal']),
row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS + ['signal']),
],
rows[-1].signal: [
self.expected_from_row(rows[-1], self.DEFAULT_MINUS + ['signal']),
rows[-1].as_dict(ignore_fields=IGNORE_FIELDS + ['signal']),
],
}]

Expand All @@ -109,12 +109,12 @@ def test_covidcast(self):
**self.params_from_row(rows[0])
)

expected = self.expected_from_row(row_latest_issue)
expected = [row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.assertEqual(response_1, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})

Expand All @@ -124,13 +124,13 @@ def test_covidcast(self):
**self.params_from_row(rows[0], as_of=rows[1].issue)
)

expected = self.expected_from_row(rows[1])
expected = [rows[1].as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.maxDiff=None
self.assertEqual(response_1a, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})

Expand All @@ -141,8 +141,8 @@ def test_covidcast(self):
)

expected = [
self.expected_from_row(rows[0]),
self.expected_from_row(rows[1])
rows[0].as_dict(ignore_fields=IGNORE_FIELDS),
rows[1].as_dict(ignore_fields=IGNORE_FIELDS)
]

# check result
Expand All @@ -158,12 +158,12 @@ def test_covidcast(self):
**self.params_from_row(rows[0], lag=2)
)

expected = self.expected_from_row(row_latest_issue)
expected = [row_latest_issue.as_dict(ignore_fields=IGNORE_FIELDS)]

# check result
self.assertDictEqual(response_3, {
'result': 1,
'epidata': [expected],
'epidata': expected,
'message': 'success',
})
with self.subTest(name='long request'):
Expand Down Expand Up @@ -223,16 +223,16 @@ def test_geo_value(self):
# insert placeholder data: three counties, three MSAs
N = 3
rows = [
self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
CovidcastRow(geo_type="county", geo_value=str(i)*5, value=i)
for i in range(N)
] + [
self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
CovidcastRow(geo_type="msa", geo_value=str(i)*5, value=i*10)
for i in range(N)
]
self._insert_rows(rows)

counties = [
self.expected_from_row(rows[i]) for i in range(N)
rows[i].as_dict(ignore_fields=IGNORE_FIELDS) for i in range(N)
]

def fetch(geo):
Expand All @@ -241,31 +241,31 @@ def fetch(geo):
)

# test fetch all
r = fetch('*')
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], counties)
request = fetch('*')
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], counties)
# test fetch a specific region
r = fetch('11111')
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1]])
request = fetch('11111')
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1]])
# test fetch a specific yet not existing region
r = fetch('55555')
self.assertEqual(r['message'], 'no results')
request = fetch('55555')
self.assertEqual(request['message'], 'no results')
# test fetch a multiple regions
r = fetch(['11111', '22222'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1], counties[2]])
request = fetch(['11111', '22222'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1], counties[2]])
# test fetch a multiple regions in another variant
r = fetch(['00000', '22222'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[0], counties[2]])
request = fetch(['00000', '22222'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[0], counties[2]])
# test fetch a multiple regions but one is not existing
r = fetch(['11111', '55555'])
self.assertEqual(r['message'], 'success')
self.assertEqual(r['epidata'], [counties[1]])
request = fetch(['11111', '55555'])
self.assertEqual(request['message'], 'success')
self.assertEqual(request['epidata'], [counties[1]])
# test fetch a multiple regions but specify no region
r = fetch([])
self.assertEqual(r['message'], 'no results')
request = fetch([])
self.assertEqual(request['message'], 'no results')

def test_covidcast_meta(self):
"""Test that the covidcast_meta endpoint returns expected data."""
Expand All @@ -275,7 +275,7 @@ def test_covidcast_meta(self):
# 2nd issue: 1 11 21
# 3rd issue: 2 12 22
rows = [
self._make_placeholder_row(time_value=self.DEFAULT_TIME_VALUE + t, issue=self.DEFAULT_ISSUE + i, value=t*10 + i)[0]
CovidcastRow(time_value=2020_02_02 + t, issue=2020_02_02 + i, value=t*10 + i)
for i in range(3) for t in range(3)
]
self._insert_rows(rows)
Expand All @@ -299,14 +299,14 @@ def test_covidcast_meta(self):
signal=rows[0].signal,
time_type=rows[0].time_type,
geo_type=rows[0].geo_type,
min_time=self.DEFAULT_TIME_VALUE,
max_time=self.DEFAULT_TIME_VALUE + 2,
min_time=2020_02_02,
max_time=2020_02_02 + 2,
num_locations=1,
min_value=2.,
mean_value=12.,
max_value=22.,
stdev_value=8.1649658, # population stdev, not sample, which is 10.
max_issue=self.DEFAULT_ISSUE + 2,
max_issue=2020_02_02 + 2,
min_lag=0,
max_lag=0, # we didn't set lag when inputting data
)
Expand All @@ -322,10 +322,10 @@ def test_async_epidata(self):
# insert placeholder data: three counties, three MSAs
N = 3
rows = [
self._make_placeholder_row(geo_type="county", geo_value=str(i)*5, value=i)[0]
CovidcastRow(geo_type="county", geo_value=str(i)*5, value=i)
for i in range(N)
] + [
self._make_placeholder_row(geo_type="msa", geo_value=str(i)*5, value=i*10)[0]
CovidcastRow(geo_type="msa", geo_value=str(i)*5, value=i*10)
for i in range(N)
]
self._insert_rows(rows)
Expand Down
Loading