[Draft] just-in-time (JIT) meta computations #947


Merged 49 commits on Oct 4, 2022.

Commits:
3d3b8a7
Remove remaining wip pieces in tests
dshemetov May 26, 2022
c71b3f4
Put long name test back
dshemetov May 26, 2022
090c0df
schema refinement, dbjobs speed increase, and added migration routine
melange396 May 31, 2022
b540699
fix to re-do changes the last commit undid from the commit before it.…
melange396 Jun 2, 2022
dca9e18
removed run_dbjobs_old() and cleaned up table name references
melange396 Jun 2, 2022
d3130de
removed process_status column of load table, moved dbjobs() call to e…
melange396 Jun 2, 2022
d65797f
adding method to fix autoincrement counter
melange396 Jun 14, 2022
3f27050
Update integrations/server/test_covidcast_endpoints.py
melange396 Jun 14, 2022
43fc1da
Update src/acquisition/covidcast/database.py
melange396 Jun 14, 2022
fd3d03f
Update src/server/endpoints/covidcast.py
melange396 Jun 14, 2022
0fceb43
changes from comments on PR
melange396 Jun 15, 2022
33dfbde
Apply suggestions from code review & discussion
krivard Jun 15, 2022
1f2c175
Switch to CREATE TABLE SELECT for signal_latest
krivard Jun 15, 2022
c9a2708
Remove dev setup from prod v4 DDL
krivard Jun 15, 2022
7e28e0f
Specify schema for each DDL file
krivard Jun 15, 2022
83b6988
Merge pull request #903 from cmu-delphi/v4-schema-revisions-release-p…
krivard Jun 15, 2022
aa81eb8
proper cli arguments incl a flag to resume processing (by id)
melange396 Jun 21, 2022
b3ebbac
Bump tzinfo from 1.2.9 to 1.2.10 in /docs
dependabot[bot] Jul 22, 2022
3f0ff0f
Merge pull request #946 from cmu-delphi/dependabot/bundler/docs/tzinf…
krivard Jul 22, 2022
440fa05
Add JIT meta caching
dshemetov Aug 2, 2022
392a0af
Remove some more relative imports for now
dshemetov Aug 3, 2022
45561dc
removed bad index hint, added permuted indexes
melange396 Aug 17, 2022
70c7dce
Merge branch 'dev' into v4-schema-revisions-release-prep
krivard Aug 18, 2022
66976b4
changed dbjobs print() calls to proper logging & removed legacy_id co…
melange396 Aug 18, 2022
47b6bf1
changes to simplify conflict resolution in https://github.com/cmu-del…
melange396 Aug 18, 2022
eece6a9
whoops, missed adding the changes from a test file to previous commit
melange396 Aug 18, 2022
e6f59f3
proper index names for signal_latest
melange396 Aug 18, 2022
65437c6
Merge branch 'v4-schema-revisions-release-prep' into v4-srrpp-migrations
krivard Aug 18, 2022
dc02a64
Merge pull request #922 from cmu-delphi/v4-srrpp-migrations
krivard Aug 19, 2022
5ef37db
JIT tests cleanup:
dshemetov Aug 23, 2022
278ea4c
JIT tests cleanup:
dshemetov Aug 23, 2022
2dc967f
JIT tests cleanup:
dshemetov Aug 23, 2022
cf14d76
incorporation of test improvements from parallel branch (#959)
melange396 Aug 26, 2022
1fe5b9c
renamed v4 db objects: load, latest, and history tables, and their id…
melange396 Aug 29, 2022
0d4188b
cli option for specifying # of metadata worker threads
melange396 Aug 29, 2022
5e3eb51
added flag to Makefile for new percona db image (and some whitespace …
melange396 Aug 30, 2022
b9ab3f3
converted JOINs in VIEWs to 'USING' instead of 'ON'
melange396 Aug 30, 2022
a61a5db
added ANALYZE TABLE to the end of acquisition runs (and some whitespa…
melange396 Aug 30, 2022
ab773f7
help string clarification
melange396 Aug 30, 2022
037e211
en-structure logging statement
krivard Aug 30, 2022
c890712
Merge pull request #964 from cmu-delphi/metadata_threads_and_etc
melange396 Aug 30, 2022
789a404
Fix mocked tests in test_csv_to_database.py
dshemetov Aug 30, 2022
3026299
halt acquisition when unexpected data found in load table
melange396 Aug 31, 2022
a6d53f4
Merge pull request #967 from cmu-delphi/db_load_state_exception
melange396 Sep 1, 2022
09013fa
Merge branch 'v4-schema-revisions-release-prep' into ds/remove-wip-tests
melange396 Sep 1, 2022
2e49339
Merge pull request #917 from cmu-delphi/ds/remove-wip-tests
melange396 Sep 1, 2022
c558fc9
Merge branch 'v4-schema-revisions-release-prep' into ds/jit-update-meta
dshemetov Sep 8, 2022
e8b18d9
Fix tests
dshemetov Sep 8, 2022
ecd49bc
Add recent AB tests and recent changes
dshemetov Oct 4, 2022
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -62,7 +62,7 @@ jobs:
- name: Start services
run: |
docker network create --driver bridge delphi-net
docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata delphi_database_epidata
docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata
docker run --rm -d -p 10080:80 --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --network delphi-net --name delphi_web_epidata delphi_web_epidata
docker ps

22 changes: 20 additions & 2 deletions dev/docker/database/epidata/Dockerfile
@@ -1,5 +1,14 @@
# start with the `delphi_database` image
FROM delphi_database
# start with a standard percona mysql image
FROM percona:ps-8

# the percona image defaults to the mysql user, but we need root for additional setup
USER root

# use delphi's timezone
RUN ln -s -f /usr/share/zoneinfo/America/New_York /etc/localtime

# specify a development-only password for the database user "root"
ENV MYSQL_ROOT_PASSWORD pass

# create the `epidata` database
ENV MYSQL_DATABASE epidata
Expand All @@ -8,8 +17,17 @@ ENV MYSQL_DATABASE epidata
ENV MYSQL_USER user
ENV MYSQL_PASSWORD pass

# provide DDL which will configure dev environment at container startup
COPY repos/delphi/delphi-epidata/dev/docker/database/epidata/_init.sql /docker-entrypoint-initdb.d/

# provide DDL which will create empty tables at container startup
COPY repos/delphi/delphi-epidata/src/ddl/*.sql /docker-entrypoint-initdb.d/

# provide additional configuration needed for percona
COPY repos/delphi/delphi-epidata/dev/docker/database/mysql.d/*.cnf /etc/my.cnf.d/

# grant access to SQL scripts
RUN chmod o+r /docker-entrypoint-initdb.d/*.sql

# restore mysql user for percona
USER mysql
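The Dockerfile above copies both the dev-only `_init.sql` and the schema DDL into `/docker-entrypoint-initdb.d/`. The stock mysql/percona entrypoint executes the `.sql` files in that directory in lexical order, so the leading underscore is presumably what guarantees the database/grant bootstrap runs before the table DDL. A minimal sketch of that ordering (the DDL filenames here are hypothetical, not taken from the repo):

```python
# Sketch: why the dev bootstrap file is named "_init.sql".
# The mysql/percona entrypoint runs /docker-entrypoint-initdb.d/*.sql in
# sorted order, and "_" (0x5f) sorts before all lowercase letters, so the
# CREATE DATABASE / GRANT statements run before any copied DDL file.
ddl_dir = ["covidcast.sql", "epimetric.sql", "_init.sql"]  # hypothetical names
execution_order = sorted(ddl_dir)
print(execution_order)  # ['_init.sql', 'covidcast.sql', 'epimetric.sql']
```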
2 changes: 2 additions & 0 deletions dev/docker/database/epidata/_init.sql
@@ -0,0 +1,2 @@
CREATE DATABASE covid;
GRANT ALL ON covid.* TO 'user';
2 changes: 2 additions & 0 deletions dev/docker/database/mysql.d/my.cnf
@@ -0,0 +1,2 @@
[mysqld]
default_authentication_plugin=mysql_native_password
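This config pins the server to `mysql_native_password`, most likely because MySQL 8 / Percona Server 8 default to `caching_sha2_password`, which some older client libraries cannot negotiate. For context, a sketch of the documented `mysql_native_password` storage scheme (the server stores `'*'` plus the uppercase hex of a double SHA-1); this is an illustration of the plugin being selected, not code from the PR:

```python
import hashlib

def mysql_native_password_hash(password: str) -> str:
    """Sketch of the documented mysql_native_password scheme:
    the stored value is '*' + uppercase hex of SHA1(SHA1(password))."""
    stage1 = hashlib.sha1(password.encode("utf-8")).digest()
    stage2 = hashlib.sha1(stage1).hexdigest().upper()
    return "*" + stage2

# Produces a 41-character hash of the form stored in mysql.user.
print(mysql_native_password_hash("pass"))
```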
17 changes: 9 additions & 8 deletions dev/local/Makefile
@@ -10,19 +10,19 @@
# Creates all prereq images (delphi_database, delphi_python) only if they don't
# exist. If you need to rebuild a prereq, you're probably doing something
# complicated, and can figure out the rebuild command on your own.
#
#
#
#
# Commands:
#
#
# web: Stops currently-running delphi_web_epidata instances, if any.
# Rebuilds delphi_web_epidata image.
# Runs image in the background and pipes stdout to a log file.
#
#
# db: Stops currently-running delphi_database_epidata instances, if any.
# Rebuilds delphi_database_epidata image.
# Runs image in the background and pipes stdout to a log file.
# Blocks until database is ready to receive connections.
#
#
# python: Rebuilds delphi_web_python image. You shouldn't need to do this
# often; only if you are installing a new environment, or have
# made changes to delphi-epidata/dev/docker/python/Dockerfile.
@@ -35,7 +35,7 @@
#
# clean: Cleans up dangling Docker images.
#
#
#
# Optional arguments:
# pdb=1 Drops you into debug mode upon test failure, if running tests.
# test= Only runs tests in the directories provided here, e.g.
@@ -105,11 +105,12 @@ db:
@# Run the database
@docker run --rm -p 127.0.0.1:13306:3306 \
--network delphi-net --name delphi_database_epidata \
--cap-add=sys_nice \
delphi_database_epidata >$(LOG_DB) 2>&1 &

@# Block until DB is ready
@while true; do \
sed -n '/Temporary server stopped/,/mysqld: ready for connections/p' $(LOG_DB) | grep "ready for connections" && break; \
sed -n '/mysqld: ready for connections/p' $(LOG_DB) | grep "ready for connections" && break; \
tail -1 $(LOG_DB); \
sleep 1; \
done
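The Makefile change above simplifies readiness detection: instead of scanning the window between "Temporary server stopped" and the ready line, it now only waits for a log line matching "mysqld: ready for connections". A minimal Python sketch of that polling check, operating on captured log lines (the log excerpt is hypothetical):

```python
def db_ready(log_lines):
    """Mimics the Makefile's check: the container is considered usable once
    a 'mysqld: ready for connections' line appears in its log output."""
    return any("mysqld: ready for connections" in line for line in log_lines)

log = [
    "[Entrypoint] Initializing database",   # hypothetical log excerpt
    "mysqld: ready for connections",
]
print(db_ready(log))      # True
print(db_ready(log[:1]))  # False
```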
@@ -127,7 +128,7 @@ py:
all: web db py

.PHONY=test
test:
test:
@docker run -i --rm --network delphi-net \
--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \
--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \
2 changes: 1 addition & 1 deletion docs/Gemfile.lock
@@ -246,7 +246,7 @@ GEM
thread_safe (0.3.6)
typhoeus (1.4.0)
ethon (>= 0.9.0)
tzinfo (1.2.9)
tzinfo (1.2.10)
thread_safe (~> 0.1)
tzinfo-data (1.2021.1)
tzinfo (>= 1.0.0)
23 changes: 14 additions & 9 deletions integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -10,9 +10,9 @@

# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
import delphi.operations.secrets as secrets
import delphi.epidata.acquisition.covidcast.database as live
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main

# py3tester coverage target (equivalent to `import *`)
@@ -40,9 +40,9 @@ def setUp(self):
cur = cnx.cursor()

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table epimetric_load")
cur.execute("truncate table epimetric_full")
cur.execute("truncate table epimetric_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
@@ -71,14 +71,19 @@ def test_caching(self):

# insert dummy data
self.cur.execute(f'''
INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig');
INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`)
VALUES
(42, 'src', 'sig');
''')
self.cur.execute(f'''
INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa');
INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`)
VALUES
(96, 'state', 'pa'),
(97, 'state', 'wa');
''')
self.cur.execute(f'''
INSERT INTO
`signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
`epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`,
`time_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`,
`issue`, `lag`, `missing_value`,
@@ -92,7 +97,7 @@ def test_caching(self):
self.cnx.commit()

# make sure the live utility is serving something sensible
cvc_database = live.Database()
cvc_database = DatabaseMeta()
cvc_database.connect()
epidata1 = cvc_database.compute_covidcast_meta()
cvc_database.disconnect(False)
61 changes: 7 additions & 54 deletions integrations/acquisition/covidcast/test_csv_uploading.py
@@ -15,7 +15,6 @@
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
@@ -37,9 +36,9 @@ def setUp(self):
cur = cnx.cursor()

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table epimetric_load")
cur.execute("truncate table epimetric_full")
cur.execute("truncate table epimetric_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
@@ -79,9 +78,9 @@ def apply_lag(expected_epidata):

def verify_timestamps_and_defaults(self):
self.cur.execute('''
select value_updated_timestamp from signal_history
select value_updated_timestamp from epimetric_full
UNION ALL
select value_updated_timestamp from signal_latest''')
select value_updated_timestamp from epimetric_latest''')
for (value_updated_timestamp,) in self.cur:
self.assertGreater(value_updated_timestamp, 0)

@@ -102,8 +101,6 @@ def test_uploading(self):
log_file=log_file_directory +
"output.log",
data_dir=data_dir,
is_wip_override=False,
not_wip_override=False,
specific_issue_date=False)
uploader_column_rename = {"geo_id": "geo_value", "val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}

@@ -123,7 +120,6 @@

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -152,7 +148,6 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -187,7 +182,6 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -213,7 +207,6 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values_df = pd.concat([values, pd.DataFrame({
@@ -232,42 +225,6 @@ def test_uploading(self):
self.setUp()


with self.subTest("Valid wip"):
values = pd.DataFrame({
"geo_id": ["me", "nd", "wa"],
"val": [10.0, 20.0, 30.0],
"se": [0.01, 0.02, 0.03],
"sample_size": [100.0, 200.0, 300.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3
})
signal_name = "wip_prototype"
values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
"time_value": [20200419] * 3,
"signal": [signal_name] * 3,
"direction": [None] * 3
})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}

self.assertEqual(response, expected_response)
self.verify_timestamps_and_defaults()

# Verify that files were archived
path = data_dir + f'/archive/successful/src-name/20200419_state_wip_prototype.csv.gz'
self.assertIsNotNone(os.stat(path))

self.tearDown()
self.setUp()


with self.subTest("Valid signal with name length 32<x<64"):
values = pd.DataFrame({
"geo_id": ["pa"],
@@ -278,12 +235,11 @@ def test_uploading(self):
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING]
})
signal_name = "wip_really_long_name_that_will_be_accepted"
signal_name = "really_long_name_that_will_be_accepted"
values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -310,12 +266,11 @@ def test_uploading(self):
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING]
})
signal_name = "wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet"
signal_name = "really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet"
values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -332,7 +287,6 @@ def test_uploading(self):
f.write('this,header,is,wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
self.assertIsNotNone(os.stat(path))
@@ -346,7 +300,6 @@ def test_uploading(self):
f.write('file name is wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/unknown/hello.csv'
self.assertIsNotNone(os.stat(path))
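The two name-length subtests above suggest signal names up to 64 characters are accepted while longer ones are rejected (the 67-character name yields "no results"). A hedged sketch of that validation, assuming a 64-character column limit inferred from these tests rather than taken from the schema:

```python
MAX_SIGNAL_NAME_LEN = 64  # assumed column width, inferred from the tests above

def signal_name_fits(name: str) -> bool:
    """Returns True when a signal name fits the assumed 64-char column."""
    return len(name) <= MAX_SIGNAL_NAME_LEN

print(signal_name_fits("really_long_name_that_will_be_accepted"))           # True
print(signal_name_fits(
    "really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet"))  # False
```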