
Commit 557098a

JIT: add meta caching
1 parent 75c2a12 commit 557098a

10 files changed: +948 −328 lines

integrations/acquisition/covidcast/test_covidcast_meta_caching.py

Lines changed: 3 additions & 3 deletions
@@ -10,9 +10,9 @@
 
 # first party
 from delphi_utils import Nans
-from delphi.epidata.client.delphi_epidata import Epidata
 import delphi.operations.secrets as secrets
-import delphi.epidata.acquisition.covidcast.database as live
+from delphi.epidata.client.delphi_epidata import Epidata
+from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main
 
 # py3tester coverage target (equivalent to `import *`)
@@ -97,7 +97,7 @@ def test_caching(self):
     self.cnx.commit()
 
     # make sure the live utility is serving something sensible
-    cvc_database = live.Database()
+    cvc_database = DatabaseMeta()
     cvc_database.connect()
     epidata1 = cvc_database.compute_covidcast_meta()
     cvc_database.disconnect(False)
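For orientation: the hunk above exercises the "live" metadata path through the new DatabaseMeta class, and the rest of test_caching (not shown in this hunk) compares that result against the cached copy written by the cache updater and served by the API. A rough, hedged sketch of the two paths, recombining only the pieces this file already imports (not the verbatim test body):

# Sketch only: uses DatabaseMeta, the cache updater's main, and the Epidata
# client exactly as imported in this file; the actual assertions differ.
from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main
from delphi.epidata.client.delphi_epidata import Epidata

# live path: compute metadata directly from the database
db = DatabaseMeta()
db.connect()
live_meta = db.compute_covidcast_meta()
db.disconnect(False)

# cached path: refresh the covidcast_meta_cache table, then read it via the API
main(args=None)
cached = Epidata.covidcast_meta()

# the test presumably checks that both paths agree
assert cached['result'] == 1
assert cached['epidata'] == live_meta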

integrations/server/test_covidcast_meta.py

Lines changed: 99 additions & 36 deletions
@@ -1,22 +1,36 @@
 """Integration tests for the `covidcast_meta` endpoint."""
 
 # standard library
-import unittest
+from datetime import date
+from itertools import chain
+from typing import Iterable, Optional
 
 # third party
-import mysql.connector
+import numpy as np
+import pandas as pd
+import pytest
 import requests
 
-#first party
+# first party
+import delphi.operations.secrets as secrets
 from delphi_utils import Nans
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_cache
-import delphi.operations.secrets as secrets
+from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow
+from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta
+from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase
+
 
 # use the local instance of the Epidata API
 BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
 
 
-class CovidcastMetaTests(unittest.TestCase):
+def _dicts_equal(d1: dict, d2: dict, ignore_keys: Optional[list] = None, rel: Optional[float] = None, abs: Optional[float] = None) -> bool:
+  """Compare dictionary values using floating point comparison for numeric values."""
+  assert set(d1.keys()) == set(d2.keys()), "Dictionary keys should be the same."
+  return all(d1.get(key) == pytest.approx(d2.get(key), rel=rel, abs=abs, nan_ok=True) for key in d1.keys() if (ignore_keys and key not in ignore_keys))
+
+
+class TestCovidcastMeta(CovidcastBase):
   """Tests the `covidcast_meta` endpoint."""
 
   src_sig_lookups = {
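The new module-level _dicts_equal helper makes the A/B comparison later in this file tolerant of floating-point noise: numeric fields are compared with pytest.approx (nan_ok=True, so NaN on both sides counts as equal), and any keys listed in ignore_keys are skipped. A small illustrative call; the two dictionaries here are made up for the example:

import numpy as np

# two hypothetical metadata rows that differ only by float rounding and an ignored key
a = {"mean_value": 12.3400000001, "stdev_value": np.nan, "max_lag": 3}
b = {"mean_value": 12.34,         "stdev_value": np.nan, "max_lag": 5}

# mean_value matches up to tolerance, NaN == NaN is accepted via nan_ok=True,
# and max_lag is not compared because it is listed in ignore_keys
assert _dicts_equal(a, b, ignore_keys=["max_lag"], abs=1e-6)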
@@ -47,55 +61,45 @@ class CovidcastMetaTests(unittest.TestCase):
       %d, %d)
   '''
 
-  def setUp(self):
+  def localSetUp(self):
     """Perform per-test setup."""
 
-    # connect to the `epidata` database and clear the `covidcast` table
-    cnx = mysql.connector.connect(
-        user='user',
-        password='pass',
-        host='delphi_database_epidata',
-        database='covid')
-    cur = cnx.cursor()
-
-    # clear all tables
-    cur.execute("truncate table epimetric_load")
-    cur.execute("truncate table epimetric_full")
-    cur.execute("truncate table epimetric_latest")
-    cur.execute("truncate table geo_dim")
-    cur.execute("truncate table signal_dim")
-    # reset the `covidcast_meta_cache` table (it should always have one row)
-    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
+    # connect to the `epidata` database
+    self.db = DatabaseMeta(base_url="http://delphi_web_epidata/epidata")
+    self.db.connect(user="user", password="pass", host="delphi_database_epidata", database="covid")
+
+    # TODO: Switch when delphi_epidata client is released.
+    self.db.delphi_epidata = False
 
     # populate dimension tables
     for (src,sig) in self.src_sig_lookups:
-      cur.execute('''
+      self.db._cursor.execute('''
        INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`)
        VALUES (%d, '%s', '%s'); ''' % ( self.src_sig_lookups[(src,sig)], src, sig ))
     for (gt,gv) in self.geo_lookups:
-      cur.execute('''
+      self.db._cursor.execute('''
        INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`)
        VALUES (%d, '%s', '%s'); ''' % ( self.geo_lookups[(gt,gv)], gt, gv ))
 
-    cnx.commit()
-    cur.close()
+    self.db._connection.commit()
 
     # initialize counter for tables without non-autoincrement id
    self.id_counter = 666
 
-    # make connection and cursor available to test cases
-    self.cnx = cnx
-    self.cur = cnx.cursor()
-
     # use the local instance of the epidata database
     secrets.db.host = 'delphi_database_epidata'
     secrets.db.epi = ('user', 'pass')
 
-
-  def tearDown(self):
+  def localTearDown(self):
     """Perform per-test teardown."""
-    self.cur.close()
-    self.cnx.close()
+    self.db._cursor.close()
+    self.db._connection.close()
+
+  def _insert_rows(self, rows: Iterable[CovidcastRow]):
+    self.db.insert_or_update_bulk(list(rows))
+    self.db.run_dbjobs()
+    self.db._connection.commit()
+    return rows
 
   def insert_placeholder_data(self):
     expected = []
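In this hunk, setup and teardown go through DatabaseMeta instead of a raw mysql.connector connection (table clearing is presumably handled by the shared CovidcastBase fixture), and the new _insert_rows helper pushes CovidcastRow objects through the regular acquisition path: bulk insert, then run_dbjobs, then commit. A hedged usage sketch from inside a test method; the CovidcastRow keyword fields mirror the columns used by test_meta_values below and are otherwise an assumption:

# Sketch only; CovidcastRow's exact constructor is assumed from its use in test_meta_values.
from datetime import date
from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow

rows = [
    CovidcastRow(source="jhu-csse", signal="confirmed_cumulative_num",
                 time_value=date(2022, 4, d), issue=date(2022, 4, d), value=float(d))
    for d in range(1, 8)
]
self._insert_rows(rows)  # bulk insert + run_dbjobs + commit, as defined above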
@@ -122,13 +126,13 @@ def insert_placeholder_data(self):
         })
         for tv in (1, 2):
           for gv, v in zip(('geo1', 'geo2'), (10, 20)):
-            self.cur.execute(self.template % (
+            self.db._cursor.execute(self.template % (
              self._get_id(),
              self.src_sig_lookups[(src,sig)], self.geo_lookups[(gt,gv)],
              tt, tv, v, tv, # re-use time value for issue
              Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING
            ))
-    self.cnx.commit()
+    self.db._connection.commit()
     update_cache(args=None)
     return expected
 
@@ -237,3 +241,62 @@ def fetch(**kwargs):
     self.assertEqual(len(res['epidata']), len(expected))
     self.assertEqual(res['epidata'][0], {})
 
+  def test_meta_values(self):
+    """This is an A/B test between the old meta compute approach and the new one which relies on an API call for JIT signals.
+
+    It relies on synthetic data.
+    """
+
+    def get_rows_gen(df: pd.DataFrame, filter_nans: bool = False) -> Iterable[CovidcastRow]:
+      for row in df.itertuples(index=False):
+        row_dict = row._asdict()
+        if not filter_nans or (filter_nans and not any(map(pd.isna, row_dict.values()))):
+          yield CovidcastRow(**row_dict)
+
+    start_date = date(2022, 4, 1)
+    end_date = date(2022, 6, 1)
+    n = (end_date - start_date).days + 1
+
+    # TODO: Build a more complex synthetic dataset here.
+    # fmt: off
+    cumulative_df = pd.DataFrame(
+      {
+        "source": ["jhu-csse"] * n + ["usa-facts"] * n,
+        "signal": ["confirmed_cumulative_num"] * n + ["confirmed_cumulative_num"] * (n // 2 - 1) + [np.nan] + ["confirmed_cumulative_num"] * (n // 2),
+        "time_value": chain(pd.date_range(start_date, end_date), pd.date_range(start_date, end_date)),
+        "issue": chain(pd.date_range(start_date, end_date), pd.date_range(start_date, end_date)),
+        "value": chain(range(n), range(n))
+      }
+    )
+    incidence_df = (
+      cumulative_df.set_index(["source", "time_value"])
+      .groupby("source")
+      .apply(lambda df: df.assign(
+        signal="confirmed_incidence_num",
+        value=df.value.diff(),
+        issue=[max(window) if window.size >= 2 else np.nan for window in df.issue.rolling(2)]
+        )
+      )
+    ).reset_index()
+    smoothed_incidence_df = (
+      cumulative_df.set_index(["source", "time_value"])
+      .groupby("source")
+      .apply(lambda df: df.assign(
+        signal="confirmed_7dav_incidence_num",
+        value=df.value.rolling(7).mean().diff(),
+        issue=[max(window) if window.size >= 7 else np.nan for window in df.issue.rolling(7)]
+        )
+      )
+    ).reset_index()
+    # fmt: on
+
+    self._insert_rows(get_rows_gen(cumulative_df, filter_nans=True))
+    self._insert_rows(get_rows_gen(incidence_df, filter_nans=True))
+    self._insert_rows(get_rows_gen(smoothed_incidence_df, filter_nans=True))
+
+    meta_values = self.db.compute_covidcast_meta(jit=False)
+    meta_values2 = self.db.compute_covidcast_meta(jit=True)
+
+    out = [_dicts_equal(x, y, ignore_keys=["max_lag"]) for x, y in zip(meta_values, meta_values2)]
+
+    assert all(out)
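The synthetic dataset in test_meta_values is constructed so the three signals are mutually consistent: confirmed_incidence_num is the day-over-day difference of the cumulative series, and confirmed_7dav_incidence_num is the difference of its 7-day rolling mean (with issue dates lagged to match each window). A standalone sketch of that derivation on a toy series, independent of the database plumbing:

import pandas as pd

# toy cumulative series standing in for confirmed_cumulative_num
cumulative = pd.Series(range(10), index=pd.date_range("2022-04-01", periods=10))

# confirmed_incidence_num: new cases per day
incidence = cumulative.diff()

# confirmed_7dav_incidence_num: day-over-day change of the 7-day average
smoothed_incidence = cumulative.rolling(7).mean().diff()

print(incidence.dropna())           # 1.0 every day for this toy series
print(smoothed_incidence.dropna())  # also 1.0 once the 7-day window is full

Both meta computations, jit=False and jit=True, are then expected to summarize these derived signals identically up to floating-point tolerance, apart from the max_lag field that the comparison ignores.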
