
renamed v4 db objects: load, latest, and history tables, and their id columns #963

Merged · 3 commits · Aug 29, 2022
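Summary of the rename, reconstructed from the diff below. The mapping is shown as an illustrative Python dict; the dict itself is not part of the change:

# Old v4 object names -> new names, as applied throughout this PR.
RENAMES = {
    "signal_load": "epimetric_load",          # staging / load table
    "signal_history": "epimetric_full",       # all-issues ("history") table
    "signal_latest": "epimetric_latest",      # latest-issue-only table
    "signal_history_v": "epimetric_full_v",   # view over the full table
    "signal_latest_v": "epimetric_latest_v",  # view over the latest table
    "signal_data_id": "epimetric_id",         # shared AUTO_INCREMENT id column
}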
@@ -40,9 +40,9 @@ def setUp(self):
 cur = cnx.cursor()

 # clear all tables
-cur.execute("truncate table signal_load")
-cur.execute("truncate table signal_history")
-cur.execute("truncate table signal_latest")
+cur.execute("truncate table epimetric_load")
+cur.execute("truncate table epimetric_full")
+cur.execute("truncate table epimetric_latest")
 cur.execute("truncate table geo_dim")
 cur.execute("truncate table signal_dim")
 # reset the `covidcast_meta_cache` table (it should always have one row)
@@ -83,7 +83,7 @@ def test_caching(self):
 ''')
 self.cur.execute(f'''
 INSERT INTO
-`signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
+`epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`,
 `time_value`, `value_updated_timestamp`,
 `value`, `stderr`, `sample_size`,
 `issue`, `lag`, `missing_value`,
10 changes: 5 additions & 5 deletions integrations/acquisition/covidcast/test_csv_uploading.py
@@ -36,9 +36,9 @@ def setUp(self):
 cur = cnx.cursor()

 # clear all tables
-cur.execute("truncate table signal_load")
-cur.execute("truncate table signal_history")
-cur.execute("truncate table signal_latest")
+cur.execute("truncate table epimetric_load")
+cur.execute("truncate table epimetric_full")
+cur.execute("truncate table epimetric_latest")
 cur.execute("truncate table geo_dim")
 cur.execute("truncate table signal_dim")
 # reset the `covidcast_meta_cache` table (it should always have one row)
@@ -78,9 +78,9 @@ def apply_lag(expected_epidata):

 def verify_timestamps_and_defaults(self):
 self.cur.execute('''
-select value_updated_timestamp from signal_history
+select value_updated_timestamp from epimetric_full
 UNION ALL
-select value_updated_timestamp from signal_latest''')
+select value_updated_timestamp from epimetric_latest''')
 for (value_updated_timestamp,) in self.cur:
 self.assertGreater(value_updated_timestamp, 0)
8 changes: 4 additions & 4 deletions integrations/acquisition/covidcast/test_db.py
@@ -15,7 +15,7 @@ def _find_matches_for_row(self, row):
 cols = "source signal time_type time_value geo_type geo_value issue".split()
 results = {}
 cur = self._db._cursor
-for table in ['signal_latest_v', 'signal_history_v']:
+for table in ['epimetric_latest_v', 'epimetric_full_v']:
 q = f"SELECT * FROM {table} WHERE "
 # NOTE: repr() puts str values in single quotes but simply 'string-ifies' numerics;
 # getattr() accesses members by string of their name
@@ -34,9 +34,9 @@ def test_id_sync(self):
 # AUTOINCREMENT pk id from the load table. this test is intended to make sure that they
 # appropriately stay in sync with each other

-pk_column = 'signal_data_id'
-histor_view = 'signal_history_v'
-latest_view = 'signal_latest_v'
+pk_column = 'epimetric_id'
+histor_view = 'epimetric_full_v'
+latest_view = 'epimetric_latest_v'

 # add a data point
 base_row, _ = self._make_placeholder_row()
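As the comment in test_id_sync notes, all three data tables share ids minted by the load table's AUTO_INCREMENT counter. A rough sketch of the invariant being guarded, as a hypothetical standalone check (this is not code from the PR, which exercises placeholder rows instead):

# Assumes an open cursor `cur` on the v4 schema.
cur.execute("SELECT MAX(epimetric_id) FROM epimetric_full_v")
(full_max,) = cur.fetchone()
cur.execute("SELECT MAX(epimetric_id) FROM epimetric_latest_v")
(latest_max,) = cur.fetchone()
# 'latest' holds a subset of the rows in 'full', with ids copied verbatim,
# so its max id can never exceed the max id in 'full'.
assert latest_max is None or latest_max <= full_max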
2 changes: 1 addition & 1 deletion integrations/acquisition/covidcast/test_delete_batch.py
@@ -33,7 +33,7 @@ def setUp(self):
 self._db = Database()
 self._db.connect()

-for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
+for table in "epimetric_load epimetric_latest epimetric_full geo_dim signal_dim".split():
 self._db._cursor.execute(f"TRUNCATE TABLE {table}")
10 changes: 5 additions & 5 deletions integrations/server/test_covidcast_meta.py
@@ -33,8 +33,8 @@ class CovidcastMetaTests(unittest.TestCase):
 }

 template = '''
-INSERT INTO `signal_latest` (
-`signal_data_id`, `signal_key_id`, `geo_key_id`,
+INSERT INTO `epimetric_latest` (
+`epimetric_id`, `signal_key_id`, `geo_key_id`,
 `time_type`, `time_value`, `value_updated_timestamp`,
 `value`, `stderr`, `sample_size`,
 `issue`, `lag`, `missing_value`,
@@ -59,9 +59,9 @@ def setUp(self):
 cur = cnx.cursor()

 # clear all tables
-cur.execute("truncate table signal_load")
-cur.execute("truncate table signal_history")
-cur.execute("truncate table signal_latest")
+cur.execute("truncate table epimetric_load")
+cur.execute("truncate table epimetric_full")
+cur.execute("truncate table epimetric_latest")
 cur.execute("truncate table geo_dim")
 cur.execute("truncate table signal_dim")
 # reset the `covidcast_meta_cache` table (it should always have one row)
62 changes: 31 additions & 31 deletions src/acquisition/covidcast/database.py
@@ -72,12 +72,12 @@ class Database:

 DATABASE_NAME = 'covid'

-load_table = "signal_load"
+load_table = "epimetric_load"
 # if you want to deal with foreign key ids: use table
 # if you want to deal with source/signal names, geo type/values, etc: use view
-latest_table = "signal_latest"
+latest_table = "epimetric_latest"
 latest_view = latest_table + "_v"
-history_table = "signal_history"
+history_table = "epimetric_full"
 history_view = history_table + "_v"
 # TODO: consider using class variables like this for dimension table names too
 # TODO: also consider that for composite key tuples, like short_comp_key and long_comp_key as used in delete_batch()
@@ -123,20 +123,20 @@ def _reset_load_table_ai_counter(self):
 """Corrects the AUTO_INCREMENT counter in the load table.

 To be used in emergencies only, if the load table was accidentally TRUNCATEd.
-This ensures any `signal_data_id`s generated by the load table will not collide with the history or latest tables.
+This ensures any `epimetric_id`s generated by the load table will not collide with the history or latest tables.
 This is also destructive to any data in the load table.
 """

-self._cursor.execute(f'DELETE FROM signal_load')
+self._cursor.execute(f'DELETE FROM epimetric_load')
 # NOTE: 'ones' are used as filler here for the (required) NOT NULL columns.
 self._cursor.execute(f"""
-INSERT INTO signal_load
-(signal_data_id,
+INSERT INTO epimetric_load
+(epimetric_id,
 source, `signal`, geo_type, geo_value, time_type, time_value, issue, `lag`, value_updated_timestamp)
 VALUES
-((SELECT 1+MAX(signal_data_id) FROM signal_history),
+((SELECT 1+MAX(epimetric_id) FROM epimetric_full),
 '1', '1', '1', '1', '1', 1, 1, 1, 1);""")
-self._cursor.execute(f'DELETE FROM signal_load')
+self._cursor.execute(f'DELETE FROM epimetric_load')

 def insert_or_update_bulk(self, cc_rows):
 return self.insert_or_update_batch(cc_rows)
@@ -227,7 +227,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False):
 def run_dbjobs(self):

 # we do this LEFT JOIN trick because mysql cant do set difference (aka EXCEPT or MINUS)
-# (as in " select distinct source, signal from signal_dim minus select distinct source, signal from signal_load ")
+# (as in " select distinct source, signal from signal_dim minus select distinct source, signal from epimetric_load ")
 signal_dim_add_new_load = f'''
 INSERT INTO signal_dim (`source`, `signal`)
 SELECT DISTINCT sl.source, sl.signal
@@ -245,20 +245,20 @@ def run_dbjobs(self):
 WHERE gd.geo_type IS NULL
 '''

-signal_history_load = f'''
+epimetric_full_load = f'''
 INSERT INTO {self.history_table}
-(signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt,
+(epimetric_id, signal_key_id, geo_key_id, issue, data_as_of_dt,
 time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
 computation_as_of_dt, missing_value, missing_stderr, missing_sample_size)
 SELECT
-signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt,
+epimetric_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt,
 time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
 computation_as_of_dt, missing_value, missing_stderr, missing_sample_size
 FROM `{self.load_table}` sl
 INNER JOIN signal_dim sd USING (source, `signal`)
 INNER JOIN geo_dim gd USING (geo_type, geo_value)
 ON DUPLICATE KEY UPDATE
-`signal_data_id` = sl.`signal_data_id`,
+`epimetric_id` = sl.`epimetric_id`,
 `value_updated_timestamp` = sl.`value_updated_timestamp`,
 `value` = sl.`value`,
 `stderr` = sl.`stderr`,
@@ -269,21 +269,21 @@ def run_dbjobs(self):
 `missing_sample_size` = sl.`missing_sample_size`
 '''

-signal_latest_load = f'''
+epimetric_latest_load = f'''
 INSERT INTO {self.latest_table}
-(signal_data_id, signal_key_id, geo_key_id, issue, data_as_of_dt,
+(epimetric_id, signal_key_id, geo_key_id, issue, data_as_of_dt,
 time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
 computation_as_of_dt, missing_value, missing_stderr, missing_sample_size)
 SELECT
-signal_data_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt,
+epimetric_id, sd.signal_key_id, gd.geo_key_id, issue, data_as_of_dt,
 time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp,
 computation_as_of_dt, missing_value, missing_stderr, missing_sample_size
 FROM `{self.load_table}` sl
 INNER JOIN signal_dim sd USING (source, `signal`)
 INNER JOIN geo_dim gd USING (geo_type, geo_value)
 WHERE is_latest_issue = 1
 ON DUPLICATE KEY UPDATE
-`signal_data_id` = sl.`signal_data_id`,
+`epimetric_id` = sl.`epimetric_id`,
 `value_updated_timestamp` = sl.`value_updated_timestamp`,
 `value` = sl.`value`,
 `stderr` = sl.`stderr`,
@@ -296,7 +296,7 @@ def run_dbjobs(self):
 '''

 # NOTE: DO NOT `TRUNCATE` THIS TABLE! doing so will ruin the AUTO_INCREMENT counter that the history and latest tables depend on...
-signal_load_delete_processed = f'''
+epimetric_load_delete_processed = f'''
 DELETE FROM `{self.load_table}`
 '''
@@ -313,17 +313,17 @@ def run_dbjobs(self):
 time_q.append(time.time())
 logger.debug('geo_dim_add_new_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])

-self._cursor.execute(signal_history_load)
+self._cursor.execute(epimetric_full_load)
 time_q.append(time.time())
-logger.debug('signal_history_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])
+logger.debug('epimetric_full_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])

-self._cursor.execute(signal_latest_load)
+self._cursor.execute(epimetric_latest_load)
 time_q.append(time.time())
-logger.debug('signal_latest_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])
+logger.debug('epimetric_latest_load', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])

-self._cursor.execute(signal_load_delete_processed)
+self._cursor.execute(epimetric_load_delete_processed)
 time_q.append(time.time())
-logger.debug('signal_load_delete_processed', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])
+logger.debug('epimetric_load_delete_processed', rows=self._cursor.rowcount, elapsed=time_q[-1]-time_q[-2])
 except Exception as e:
 raise e
@@ -395,35 +395,35 @@ def delete_batch(self, cc_deletions):

 add_history_id_sql = f'''
 UPDATE {tmp_table_name} d INNER JOIN {self.history_view} h USING ({long_comp_key})
-SET d.delete_history_id=h.signal_data_id;
+SET d.delete_history_id=h.epimetric_id;
 '''

 # if a row we are deleting also appears in the 'latest' table (with a matching 'issue')...
 mark_for_update_latest_sql = f'''
 UPDATE {tmp_table_name} d INNER JOIN {self.latest_view} ell USING ({long_comp_key})
-SET d.update_latest=1, d.delete_latest_id=ell.signal_data_id;
+SET d.update_latest=1, d.delete_latest_id=ell.epimetric_id;
 '''

 delete_history_sql = f'''
-DELETE h FROM {tmp_table_name} d INNER JOIN {self.history_table} h ON d.delete_history_id=h.signal_data_id;
+DELETE h FROM {tmp_table_name} d INNER JOIN {self.history_table} h ON d.delete_history_id=h.epimetric_id;
 '''

 # ...remove it from 'latest'...
 delete_latest_sql = f'''
-DELETE ell FROM {tmp_table_name} d INNER JOIN {self.latest_table} ell ON d.delete_latest_id=ell.signal_data_id;
+DELETE ell FROM {tmp_table_name} d INNER JOIN {self.latest_table} ell ON d.delete_latest_id=ell.epimetric_id;
 '''

 # ...and re-write that record with its next-latest issue (from 'history') instead.
 # NOTE: this must be executed *AFTER* `delete_history_sql` to ensure we get the correct `issue`
 # AND also after `delete_latest_sql` so that we dont get a key collision on insert.
 update_latest_sql = f'''
 INSERT INTO {self.latest_table}
-(signal_data_id,
+(epimetric_id,
 signal_key_id, geo_key_id, time_type, time_value, issue,
 value, stderr, sample_size, `lag`, value_updated_timestamp,
 missing_value, missing_stderr, missing_sample_size)
 SELECT
-h.signal_data_id,
+h.epimetric_id,
 h.signal_key_id, h.geo_key_id, h.time_type, h.time_value, h.issue,
 h.value, h.stderr, h.sample_size, h.`lag`, h.value_updated_timestamp,
 h.missing_value, h.missing_stderr, h.missing_sample_size
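A note on the run_dbjobs queries above: the "LEFT JOIN trick" comment refers to the usual MySQL anti-join idiom, used because MySQL at the time had no EXCEPT/MINUS. A generic sketch of the pattern with the renamed tables; this is illustrative, not the PR's exact statement:

# Find (source, signal) pairs present in the load table but absent from
# signal_dim: LEFT JOIN and keep only rows with no dimension-table match.
new_pairs_sql = '''
    SELECT DISTINCT sl.source, sl.`signal`
    FROM epimetric_load sl
    LEFT JOIN signal_dim sd USING (source, `signal`)
    WHERE sd.signal_key_id IS NULL  -- NULL key <=> no matching dim row
'''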
18 changes: 9 additions & 9 deletions src/acquisition/covidcast/migrate_epidata_to_v4.py
@@ -65,9 +65,9 @@ def start_tx(cursor):
 cursor.execute('SET autocommit=0;') # starts a transaction as suggested in https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html
 # NOTE: locks must be specified for any aliases of table names that are used
 cursor.execute('''LOCK TABLES epidata.covidcast AS cc READ,
-signal_load WRITE, signal_load AS sl WRITE,
-signal_history WRITE,
-signal_latest WRITE,
+epimetric_load WRITE, epimetric_load AS sl WRITE,
+epimetric_full WRITE,
+epimetric_latest WRITE,
 signal_dim WRITE, signal_dim AS sd READ,
 geo_dim WRITE, geo_dim AS gd READ;''')
 cursor.execute('SET unique_checks=0;')
@@ -87,7 +87,7 @@ def do_batches(db, start, upper_lim, batch_size):

 # NOTE: first rows of column names are identical, second rows are for specifying a rename and a literal
 batch_sql = f"""
-INSERT INTO signal_load (
+INSERT INTO epimetric_load (
 `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size
 ) SELECT
 `issue`, `source`, `signal`, geo_type, geo_value, time_type, time_value, `value`, stderr, sample_size, `lag`, value_updated_timestamp, is_latest_issue, missing_value, missing_stderr, missing_sample_size
@@ -150,7 +150,7 @@ def main(destination_schema, batch_size, start_id, upper_lim_override):
 if start_id==0:
 # clear tables in the v4 schema
 print("truncating tables...")
-for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
+for table in "epimetric_load epimetric_latest epimetric_full geo_dim signal_dim".split():
 db._cursor.execute(f"TRUNCATE TABLE {table}")
 db.commit()
 start_id = 1
@@ -160,12 +160,12 @@ def main(destination_schema, batch_size, start_id, upper_lim_override):

 # get table counts [the quick and dirty way]
 print("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-")
-db._cursor.execute(f"SELECT MAX(signal_data_id) FROM signal_history;")
+db._cursor.execute(f"SELECT MAX(epimetric_id) FROM epimetric_full;")
 for (max_id,) in db._cursor:
-print(f"signal_history: {max_id}")
-db._cursor.execute(f"SELECT MAX(signal_data_id) FROM signal_latest;")
+print(f"epimetric_full: {max_id}")
+db._cursor.execute(f"SELECT MAX(epimetric_id) FROM epimetric_latest;")
 for (max_id,) in db._cursor:
-print(f"signal_latest: {max_id} (this should be <= the number above)")
+print(f"epimetric_latest: {max_id} (this should be <= the number above)")
 db._cursor.execute(f"SELECT COUNT(signal_key_id), MAX(signal_key_id) FROM signal_dim;")
 for (count_id, max_id) in db._cursor:
 print(f"signal_dim: count {count_id} / max {max_id}")
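For context, migrate_epidata_to_v4.py copies epidata.covidcast into the new schema in id-ordered batches through epimetric_load. A sketch of the loop shape follows; copy_id_range is a hypothetical stand-in for executing batch_sql with bounds interpolated in, and draining the load table via run_dbjobs per batch is an assumption based on the load-table design, not something visible in this diff:

def do_batches_sketch(db, start, upper_lim, batch_size):
    batch_lower = start
    while batch_lower < upper_lim:
        batch_upper = min(batch_lower + batch_size, upper_lim)
        copy_id_range(db, batch_lower, batch_upper)  # hypothetical helper
        db.run_dbjobs()  # drain epimetric_load into the full/latest tables
        db.commit()      # keep each batch in its own bounded transaction
        batch_lower = batch_upper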
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/test_utils.py
@@ -17,7 +17,7 @@ def setUp(self):
 self._db.connect()

 # empty all of the data tables
-for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
+for table in "epimetric_load epimetric_latest epimetric_full geo_dim signal_dim".split():
 self._db._cursor.execute(f"TRUNCATE TABLE {table};")
 self.localSetUp()
 self._db._connection.commit()