V4 schema revisions candidate #903

Merged
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,4 +1,7 @@
__pycache__/
*.pyc
*~
\#*#
.DS_Store
/.vscode
/delphi-epidata
3 changes: 3 additions & 0 deletions dev/docker/database/epidata/Dockerfile
@@ -8,6 +8,9 @@ ENV MYSQL_DATABASE epidata
ENV MYSQL_USER user
ENV MYSQL_PASSWORD pass

# provide DDL which will configure dev environment at container startup
COPY repos/delphi/delphi-epidata/dev/docker/database/epidata/_init.sql /docker-entrypoint-initdb.d/

# provide DDL which will create empty tables at container startup
COPY repos/delphi/delphi-epidata/src/ddl/*.sql /docker-entrypoint-initdb.d/

2 changes: 2 additions & 0 deletions dev/docker/database/epidata/_init.sql
@@ -0,0 +1,2 @@
CREATE DATABASE covid;
GRANT ALL ON covid.* TO 'user';
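For context: the stock MySQL container image executes the files in /docker-entrypoint-initdb.d/ in alphabetical order on first startup, so the leading underscore in `_init.sql` makes the new `covid` schema exist before the table DDL copied from `src/ddl/` runs against it. A minimal sketch for checking the result against the dev database (connection parameters taken from the integration tests in this PR):

import mysql.connector

# connect to the new `covid` schema created by _init.sql
cnx = mysql.connector.connect(
    user='user', password='pass',
    host='delphi_database_epidata', database='covid')
cur = cnx.cursor()
cur.execute('SHOW TABLES')
# expect signal_load, signal_history, signal_latest, geo_dim, signal_dim, ...
print(sorted(name for (name,) in cur))
cur.close()
cnx.close()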
3 changes: 2 additions & 1 deletion integrations/acquisition/covidcast/delete_batch.csv
@@ -1,3 +1,4 @@
geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source
d_nonlatest,0,0,0,1,0,geo,sig,src
d_latest, 0,0,0,3,0,geo,sig,src
d_justone, 0,0,0,1,0,geo,sig,src
44 changes: 21 additions & 23 deletions integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -36,13 +36,17 @@ def setUp(self):
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
database='covid')
cur = cnx.cursor()

# clear the `covidcast` table
cur.execute('truncate table covidcast')
# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = ""')
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
cnx.commit()
cur.close()

@@ -67,30 +71,24 @@ def test_caching(self):

# insert dummy data
self.cur.execute(f'''
INSERT INTO
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
`time_value`, `geo_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
`direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
`missing_stderr`,`missing_sample_size`)
VALUES
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
123, 1, 2, 3, 456, 1, 20200422, 0, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
789, 1, 2, 3, 456, 1, 20200423, 1, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig');
''')
self.cur.execute(f'''
INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa');
''')
self.cur.execute(f'''
INSERT INTO
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
`time_value`, `geo_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
`direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
`missing_stderr`,`missing_sample_size`)
`signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
`time_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`,
`issue`, `lag`, `missing_value`,
`missing_stderr`,`missing_sample_size`)
VALUES
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
456, 4, 5, 6, 789, -1, 20200422, 0, 1, True, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
(15, 42, 96, 'day', 20200422,
123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
(16, 42, 97, 'day', 20200422,
789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
''')

self.cnx.commit()

# make sure the live utility is serving something sensible
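The rewritten fixture above reflects the v4 star schema: `signal_dim` and `geo_dim` each store a distinct (source, signal) or (geo_type, geo_value) pair once, and rows in `signal_latest` reference them through the surrogate keys `signal_key_id` and `geo_key_id`. A sketch of a lookup-or-insert for a dimension key (illustrative only: this helper is not part of the codebase, and it assumes an AUTO_INCREMENT key, whereas the test above assigns key values explicitly):

def resolve_signal_key(cur, source, signal):
    # return the surrogate key for (source, signal), inserting the pair
    # into signal_dim if it is new
    cur.execute(
        "SELECT `signal_key_id` FROM `signal_dim` "
        "WHERE `source` = %s AND `signal` = %s",
        (source, signal))
    row = cur.fetchone()
    if row:
        return row[0]
    cur.execute(
        "INSERT INTO `signal_dim` (`source`, `signal`) VALUES (%s, %s)",
        (source, signal))
    return cur.lastrowid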
32 changes: 26 additions & 6 deletions integrations/acquisition/covidcast/test_csv_uploading.py
@@ -15,6 +15,7 @@
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
@@ -32,9 +33,18 @@ def setUp(self):
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
database='covid')
cur = cnx.cursor()
cur.execute('truncate table covidcast')

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')

cnx.commit()
cur.close()

@@ -68,11 +78,12 @@ def apply_lag(expected_epidata):
return expected_epidata

def verify_timestamps_and_defaults(self):
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
self.cur.execute('''
select value_updated_timestamp from signal_history
UNION ALL
select value_updated_timestamp from signal_latest''')
for (value_updated_timestamp,) in self.cur:
self.assertGreater(value_updated_timestamp, 0)
self.assertEqual(direction_updated_timestamp, 0)
self.assertIsNone(direction)

def test_uploading(self):
"""Scan, parse, upload, archive, serve, and fetch a covidcast signal."""
@@ -112,6 +123,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -140,6 +152,7 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -174,6 +187,7 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -199,6 +213,7 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values_df = pd.concat([values, pd.DataFrame({
@@ -232,6 +247,7 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -267,6 +283,7 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -298,6 +315,7 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -314,6 +332,7 @@
f.write('this,header,is,wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
self.assertIsNotNone(os.stat(path))
@@ -327,6 +346,7 @@
f.write('file name is wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/unknown/hello.csv'
self.assertIsNotNone(os.stat(path))
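Each upload in the test above now follows the same two-phase pattern: `main(args)` (from csv_to_database) stages the parsed CSV rows, and `dbjobs_main()` (from dbjobs_runner) promotes them into the history and latest tables. A hypothetical helper capturing that pattern, using only the imports already present in this test:

def upload_and_promote(args):
    # stage CSVs into the load table, then run dbjobs so queries see the rows
    main(args)
    dbjobs_main()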
90 changes: 90 additions & 0 deletions integrations/acquisition/covidcast/test_db.py
@@ -0,0 +1,90 @@
import unittest

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets

# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

class TestTest(unittest.TestCase):

    def setUp(self):
        # use the local test instance of the database
        secrets.db.host = 'delphi_database_epidata'
        secrets.db.epi = ('user', 'pass')

        self._db = Database()
        self._db.connect()

        # empty all of the data tables
        for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
            self._db._cursor.execute(f"TRUNCATE TABLE {table}")

    def tearDown(self):
        # close and destroy connection to the database
        self._db.disconnect(False)
        del self._db

    def _make_dummy_row(self):
        # args: source, signal, time_type, geo_type, time_value, geo_value,
        #       value, stderr, sample_size, missing_{value,stderr,sample_size}, issue, lag
        return CovidcastRow('src', 'sig', 'day', 'state', 2022_02_22, 'pa',
                            2, 22, 222, nmv, nmv, nmv, 2022_02_22, 0)

    def _insert_rows(self, rows):
        # inserts rows into the database using the full acquisition process,
        # including the 'dbjobs' load into the history & latest tables
        self._db.insert_or_update_bulk(rows)
        self._db.run_dbjobs()
        ### self._db._connection.commit() # NOTE: this isn't needed here, but would be if using external access (like through the client lib)

    def _find_matches_for_row(self, row):
        # finds the rows (if they exist) in both the history and latest views
        # that match the long key of the provided CovidcastRow
        cols = "source signal time_type time_value geo_type geo_value issue".split()
        results = {}
        cur = self._db._cursor
        for table in ['signal_latest_v', 'signal_history_v']:
            q = f"SELECT * FROM {table} WHERE "
            # NOTE: repr() wraps str values in single quotes but simply stringifies numerics;
            #       getattr() looks up attributes by the string of their name
            q += " AND ".join([f" `{c}` = {repr(getattr(row,c))} " for c in cols])
            q += " LIMIT 1;"
            cur.execute(q)
            res = cur.fetchone()
            if res:
                results[table] = dict(zip(cur.column_names, res))
            else:
                results[table] = None
        return results

    def test_id_sync(self):
        # The history and latest tables have a non-AUTO_INCREMENT primary key id that is fed
        # by the AUTO_INCREMENT pk id from the load table. This test is intended to make sure
        # that they appropriately stay in sync with each other.

        pk_column = 'signal_data_id'
        history_view = 'signal_history_v'
        latest_view = 'signal_latest_v'

        # add a data point
        base_row = self._make_dummy_row()
        self._insert_rows([base_row])
        # ensure the primary keys match in the latest and history tables
        matches = self._find_matches_for_row(base_row)
        self.assertEqual(matches[latest_view][pk_column],
                         matches[history_view][pk_column])
        # save the old pk value
        old_pk_id = matches[latest_view][pk_column]

        # add a reissue for said data point
        next_row = self._make_dummy_row()
        next_row.issue += 1
        self._insert_rows([next_row])
        # ensure the new keys also match
        matches = self._find_matches_for_row(next_row)
        self.assertEqual(matches[latest_view][pk_column],
                         matches[history_view][pk_column])
        # check that new ids were used
        new_pk_id = matches[latest_view][pk_column]
        self.assertNotEqual(old_pk_id, new_pk_id)

        # verify the old issue is no longer in the latest table
        self.assertIsNone(self._find_matches_for_row(base_row)[latest_view])
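test_id_sync above encodes the v4 reissue semantics: a reissue allocates a fresh `signal_data_id` in both tables, `signal_history_v` retains every issue, and `signal_latest_v` exposes only the newest one. A hypothetical companion test (not in this PR) that asserts the latest-view behavior directly, reusing the fixtures above:

    def test_latest_shows_only_newest_issue(self):
        # insert a row, then a reissue of the same series dated one day later
        base_row = self._make_dummy_row()
        self._insert_rows([base_row])
        reissue = self._make_dummy_row()
        reissue.issue += 1
        self._insert_rows([reissue])

        self._db._cursor.execute('''
            SELECT `issue` FROM `signal_latest_v`
            WHERE `source` = 'src' AND `signal` = 'sig'
              AND `time_value` = 20220222 AND `geo_value` = 'pa' ''')
        issues = [issue for (issue,) in self._db._cursor]
        # only the reissued row should be visible in the latest view
        self.assertEqual(issues, [20220223])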