diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py
index 4f89c2b1c..3c559be84 100644
--- a/src/acquisition/covidcast/csv_importer.py
+++ b/src/acquisition/covidcast/csv_importer.py
@@ -1,20 +1,26 @@
 """Collects and reads covidcast data from a set of local CSV files."""

 # standard library
-from dataclasses import dataclass
-from datetime import date
-import glob
 import os
 import re
+from dataclasses import dataclass
+from datetime import date
+from glob import glob
+from typing import Iterator, NamedTuple, Optional, Tuple

 # third party
-import pandas as pd
 import epiweeks as epi
+import pandas as pd

 # first party
 from delphi_utils import Nans
 from delphi.utils.epiweek import delta_epiweeks
-from .logger import get_structured_logger
+from delphi.epidata.acquisition.covidcast.database import CovidcastRow
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
+
+DFRow = NamedTuple('DFRow', [('geo_id', str), ('value', float), ('stderr', float), ('sample_size', float), ('missing_value', int), ('missing_stderr', int), ('missing_sample_size', int)])
+PathDetails = NamedTuple('PathDetails', [('issue', int), ('lag', int), ('source', str), ('signal', str), ('time_type', str), ('time_value', int), ('geo_type', str)])
+

 @dataclass
 class CsvRowValue:
@@ -27,6 +33,7 @@ class CsvRowValue:
   missing_stderr: int
   missing_sample_size: int

+
 class CsvImporter:
   """Finds and parses covidcast CSV files."""

@@ -60,6 +67,7 @@ class CsvImporter:
     "missing_sample_size": "Int64"
   }

+
   @staticmethod
   def is_sane_day(value):
     """Return whether `value` is a sane (maybe not valid) YYYYMMDD date.
@@ -76,6 +84,7 @@ def is_sane_day(value):
       return False
     return date(year=year,month=month,day=day)

+
   @staticmethod
   def is_sane_week(value):
     """Return whether `value` is a sane (maybe not valid) YYYYWW epiweek.
@@ -91,22 +100,24 @@ def is_sane_week(value):
       return False
     return value

+
   @staticmethod
-  def find_issue_specific_csv_files(scan_dir, glob=glob):
+  def find_issue_specific_csv_files(scan_dir):
     logger = get_structured_logger('find_issue_specific_csv_files')
-    for path in sorted(glob.glob(os.path.join(scan_dir, '*'))):
+    for path in sorted(glob(os.path.join(scan_dir, '*'))):
       issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower())
       if issuedir_match and os.path.isdir(path):
         issue_date_value = int(issuedir_match.group(2))
         issue_date = CsvImporter.is_sane_day(issue_date_value)
         if issue_date:
           logger.info(event='processing csv files from issue', detail=issue_date, file=path)
-          yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob)
+          yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)))
         else:
           logger.warning(event='invalid issue directory day', detail=issue_date_value, file=path)

+
   @staticmethod
-  def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob):
+  def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()))):
     """Recursively search for and yield covidcast-format CSV files.

     scan_dir: the directory to scan (recursively)
@@ -122,11 +133,11 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
     issue_value=-1
     lag_value=-1

-    for path in sorted(glob.glob(os.path.join(scan_dir, '*', '*'))):
-
+    for path in sorted(glob(os.path.join(scan_dir, '*', '*'))):
+      # safe to ignore this file
       if not path.lower().endswith('.csv'):
-        # safe to ignore this file
         continue
+
       # match a daily or weekly naming pattern
       daily_match = CsvImporter.PATTERN_DAILY.match(path.lower())
       weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower())
@@ -174,7 +185,8 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
         yield (path, None)
         continue

-      yield (path, (source, signal, time_type, geo_type, time_value, issue_value, lag_value))
+      yield (path, PathDetails(issue_value, lag_value, source, signal, time_type, time_value, geo_type))
+

   @staticmethod
   def is_header_valid(columns):
@@ -182,6 +194,7 @@

     return set(columns) >= CsvImporter.REQUIRED_COLUMNS

+
   @staticmethod
   def floaty_int(value: str) -> int:
     """Cast a string to an int, even if it looks like a float.
@@ -195,6 +208,7 @@
       raise ValueError('not an int: "%s"' % str(value))
     return int(float_value)

+
   @staticmethod
   def maybe_apply(func, quantity):
     """Apply the given function to the given quantity if not null-ish."""
@@ -205,6 +219,7 @@
     else:
       return func(quantity)

+
   @staticmethod
   def validate_quantity(row, attr_quantity):
     """Take a row and validate a given associated quantity (e.g., val, se, stderr).
@@ -218,6 +233,7 @@
       # val was a string or another data
       return "Error"

+
   @staticmethod
   def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=None):
     """Take a row and validate the missing code associated with
@@ -250,8 +266,9 @@

     return missing_entry

+
   @staticmethod
-  def extract_and_check_row(row, geo_type, filepath=None):
+  def extract_and_check_row(row: DFRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]:
     """Extract and return `CsvRowValue` from a CSV row, with sanity checks.

     Also returns the name of the field which failed sanity check, or None.
@@ -331,8 +348,9 @@
     # return extracted and validated row values
     return (CsvRowValue(geo_id, value, stderr, sample_size, missing_value, missing_stderr, missing_sample_size), None)

+
   @staticmethod
-  def load_csv(filepath, geo_type):
+  def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[CovidcastRow]]:
     """Load, validate, and yield data as `RowValues` from a CSV file.

     filepath: the CSV file to be loaded
@@ -357,9 +375,32 @@ def load_csv(filepath, geo_type):
     table.rename(columns={"val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}, inplace=True)

     for row in table.itertuples(index=False):
-      row_values, error = CsvImporter.extract_and_check_row(row, geo_type, filepath)
+      csv_row_values, error = CsvImporter.extract_and_check_row(row, details.geo_type, filepath)
+
       if error:
         logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath)
         yield None
         continue
-      yield row_values
+
+      yield CovidcastRow(
+        details.source,
+        details.signal,
+        details.time_type,
+        details.geo_type,
+        details.time_value,
+        csv_row_values.geo_value,
+        csv_row_values.value,
+        csv_row_values.stderr,
+        csv_row_values.sample_size,
+        csv_row_values.missing_value,
+        csv_row_values.missing_stderr,
+        csv_row_values.missing_sample_size,
+        details.issue,
+        details.lag,
+        # These four fields are unused by database acquisition
+        # TODO: These will be used when CovidcastRow is updated.
+        # id=None,
+        # direction=None,
+        # direction_updated_timestamp=0,
+        # value_updated_timestamp=0,
+      )
diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py
index 34cbad663..842e820c9 100644
--- a/src/acquisition/covidcast/csv_to_database.py
+++ b/src/acquisition/covidcast/csv_to_database.py
@@ -4,10 +4,12 @@
 import argparse
 import os
 import time
+from logging import Logger
+from typing import Callable, Iterable, Optional, Tuple

 # first party
-from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter
-from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow, DBLoadStateException
+from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails
+from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
 from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
 from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

@@ -28,17 +30,19 @@ def get_argument_parser():
     help="filename for log output (defaults to stdout)")
   return parser

-def collect_files(data_dir, specific_issue_date,csv_importer_impl=CsvImporter):
+
+def collect_files(data_dir: str, specific_issue_date: bool):
   """Fetch path and data profile details for each file to upload."""
   logger= get_structured_logger('collect_files')
   if specific_issue_date:
-    results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir))
+    results = list(CsvImporter.find_issue_specific_csv_files(data_dir))
   else:
-    results = list(csv_importer_impl.find_csv_files(os.path.join(data_dir, 'receiving')))
+    results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving')))
   logger.info(f'found {len(results)} files')
   return results

-def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver):
+
+def make_handlers(data_dir: str, specific_issue_date: bool):
   if specific_issue_date:
     # issue-specific uploads are always one-offs, so we can leave all
     # files in place without worrying about cleaning up
@@ -47,7 +51,7 @@ def handle_failed(path_src, filename, source, logger):
     def handle_successful(path_src, filename, source, logger):
       logger.info(event='archiving as successful',file=filename)
-      file_archiver_impl.archive_inplace(path_src, filename)
+      FileArchiver.archive_inplace(path_src, filename)

   else:
     # normal automation runs require some shuffling to remove files
     # from receiving and place them in the archive
@@ -59,22 +63,24 @@ def handle_failed(path_src, filename, source, logger):
       logger.info(event='archiving as failed - ', detail=source, file=filename)
       path_dst = os.path.join(archive_failed_dir, source)
       compress = False
-      file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
+      FileArchiver.archive_file(path_src, path_dst, filename, compress)

     # helper to archive a successful file with compression
     def handle_successful(path_src, filename, source, logger):
       logger.info(event='archiving as successful',file=filename)
       path_dst = os.path.join(archive_successful_dir, source)
       compress = True
-      file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
+      FileArchiver.archive_file(path_src, path_dst, filename, compress)
+
   return handle_successful, handle_failed

+
 def upload_archive(
-    path_details,
-    database,
-    handlers,
-    logger,
-    csv_importer_impl=CsvImporter):
+    path_details: Iterable[Tuple[str, Optional[PathDetails]]],
+    database: Database,
+    handlers: Tuple[Callable],
+    logger: Logger
+  ):
   """Upload CSVs to the database and archive them using the specified handlers.

   :path_details: output from CsvImporter.find*_csv_files
@@ -89,20 +95,16 @@ def upload_archive(
   total_modified_row_count = 0
   # iterate over each file
   for path, details in path_details:
-    logger.info(event='handling',dest=path)
+    logger.info(event='handling', dest=path)
     path_src, filename = os.path.split(path)

+    # file path or name was invalid, source is unknown
     if not details:
-      # file path or name was invalid, source is unknown
       archive_as_failed(path_src, filename, 'unknown',logger)
       continue

-    (source, signal, time_type, geo_type, time_value, issue, lag) = details
-
-    csv_rows = csv_importer_impl.load_csv(path, geo_type)
-
-    cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
-    rows_list = list(cc_rows)
+    csv_rows = CsvImporter.load_csv(path, details)
+    rows_list = list(csv_rows)
     all_rows_valid = rows_list and all(r is not None for r in rows_list)
     if all_rows_valid:
       try:
@@ -111,12 +113,13 @@ def upload_archive(
         logger.info(
           "Inserted database rows",
           row_count = modified_row_count,
-          source = source,
-          signal = signal,
-          geo_type = geo_type,
-          time_value = time_value,
-          issue = issue,
-          lag = lag)
+          source = details.source,
+          signal = details.signal,
+          geo_type = details.geo_type,
+          time_value = details.time_value,
+          issue = details.issue,
+          lag = details.lag
+        )
         if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted
           total_modified_row_count += (modified_row_count if modified_row_count else 0)
         database.commit()
@@ -131,40 +134,37 @@ def upload_archive(

     # archive the current file based on validation results
     if all_rows_valid:
-      archive_as_successful(path_src, filename, source, logger)
+      archive_as_successful(path_src, filename, details.source, logger)
     else:
-      archive_as_failed(path_src, filename, source,logger)
+      archive_as_failed(path_src, filename, details.source, logger)

   return total_modified_row_count


-def main(
-    args,
-    database_impl=Database,
-    collect_files_impl=collect_files,
-    upload_archive_impl=upload_archive):
+def main(args):
   """Find, parse, and upload covidcast signals."""

   logger = get_structured_logger("csv_ingestion", filename=args.log_file)
   start_time = time.time()

   # shortcut escape without hitting db if nothing to do
-  path_details = collect_files_impl(args.data_dir, args.specific_issue_date)
+  path_details = collect_files(args.data_dir, args.specific_issue_date)
   if not path_details:
     logger.info('nothing to do; exiting...')
     return

   logger.info("Ingesting CSVs", csv_count = len(path_details))

-  database = database_impl()
+  database = Database()
   database.connect()

   try:
-    modified_row_count = upload_archive_impl(
+    modified_row_count = upload_archive(
       path_details,
       database,
       make_handlers(args.data_dir, args.specific_issue_date),
-      logger)
+      logger
+    )
     logger.info("Finished inserting/updating database rows", row_count = modified_row_count)
   finally:
     database.do_analyze()
@@ -175,5 +175,6 @@ def main(
     "Ingested CSVs into database",
     total_runtime_in_seconds=round(time.time() - start_time, 2))

+
 if __name__ == '__main__':
   main(get_argument_parser().parse_args())
diff --git a/src/acquisition/covidcast_nowcast/load_sensors.py b/src/acquisition/covidcast_nowcast/load_sensors.py
index f443bbd48..73ce7eee5 100644
--- a/src/acquisition/covidcast_nowcast/load_sensors.py
+++ b/src/acquisition/covidcast_nowcast/load_sensors.py
@@ -6,7 +6,7 @@
 import sqlalchemy

 import delphi.operations.secrets as secrets
-from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter
+from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails

 SENSOR_CSV_PATH = "/common/covidcast_nowcast/receiving/"
 SUCCESS_DIR = "archive/successful"
@@ -52,7 +52,7 @@ def main(csv_path: str = SENSOR_CSV_PATH) -> None:
             _move_after_processing(filepath, success=True)


-def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
+def load_and_prepare_file(filepath: str, attributes: PathDetails) -> pd.DataFrame:
    """
    Read CSV file into a DataFrame and add relevant attributes as new columns to match DB table.

@@ -68,15 +68,14 @@ def load_and_prepare_file(filepath: str, attributes: tuple) -> pd.DataFrame:
    -------
    DataFrame with additional attributes added as columns based on filename and current date.
""" - source, signal, time_type, geo_type, time_value, issue_value, lag_value = attributes data = pd.read_csv(filepath, dtype=CSV_DTYPES) - data["source"] = source - data["signal"] = signal - data["time_type"] = time_type - data["geo_type"] = geo_type - data["time_value"] = time_value - data["issue"] = issue_value - data["lag"] = lag_value + data["source"] = attributes.source + data["signal"] = attributes.signal + data["time_type"] = attributes.time_type + data["geo_type"] = attributes.geo_type + data["time_value"] = attributes.time_value + data["issue"] = attributes.issue + data["lag"] = attributes.lag data["value_updated_timestamp"] = int(time.time()) return data diff --git a/tests/acquisition/covidcast/test_csv_importer.py b/tests/acquisition/covidcast/test_csv_importer.py index 3d3c22ecc..0906febd1 100644 --- a/tests/acquisition/covidcast/test_csv_importer.py +++ b/tests/acquisition/covidcast/test_csv_importer.py @@ -2,8 +2,7 @@ # standard library import unittest -from unittest.mock import MagicMock -from unittest.mock import patch +from unittest.mock import MagicMock, patch from datetime import date import numpy as np @@ -13,7 +12,7 @@ from delphi_utils import Nans from delphi.utils.epiweek import delta_epiweeks -from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, CsvRowValue +from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, CsvRowValue, PathDetails # py3tester coverage target __test_target__ = 'delphi.epidata.acquisition.covidcast.csv_importer' @@ -32,6 +31,7 @@ def test_is_sane_day(self): self.assertFalse(CsvImporter.is_sane_day(20200199)) self.assertFalse(CsvImporter.is_sane_day(202015)) + def test_is_sane_week(self): """Sanity check some weeks.""" @@ -42,37 +42,38 @@ def test_is_sane_week(self): self.assertFalse(CsvImporter.is_sane_week(202054)) self.assertFalse(CsvImporter.is_sane_week(20200418)) + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.glob") @patch("os.path.isdir") - def test_find_issue_specific_csv_files(self,os_isdir_mock): + def test_find_issue_specific_csv_files(self, mock_os_isdir: MagicMock, mock_glob: MagicMock): """Recursively explore and find issue specific CSV files.""" # check valid path path_prefix='prefix/to/the/data/issue_20200408' - os_isdir_mock.return_value=True + mock_os_isdir.return_value=True issue_path=path_prefix+'ght/20200408_state_rawsearch.csv' - mock_glob = MagicMock() - mock_glob.glob.side_effect = ([path_prefix], [issue_path]) + mock_glob.side_effect = ([path_prefix], [issue_path]) #check if the day is a valid day. 
     issuedir_match= CsvImporter.PATTERN_ISSUE_DIR.match(path_prefix.lower())
     issue_date_value = int(issuedir_match.group(2))
     self.assertTrue(CsvImporter.is_sane_day(issue_date_value))

-    found = set(CsvImporter.find_issue_specific_csv_files(path_prefix, glob=mock_glob))
-    self.assertTrue(len(found)>0)
+    found = set(CsvImporter.find_issue_specific_csv_files(path_prefix))
+    self.assertTrue(len(found) > 0)

     # check unvalid path:
     path_prefix_invalid='invalid/prefix/to/the/data/issue_20200408'
-    os_isdir_mock.return_value=False
+    mock_os_isdir.return_value=False
     issue_path_invalid=path_prefix_invalid+'ght/20200408_state_rawsearch.csv'
-    mock_glob_invalid = MagicMock()
-    mock_glob_invalid.glob.side_effect = ([path_prefix_invalid], [issue_path_invalid])
+    mock_glob.side_effect = ([path_prefix_invalid], [issue_path_invalid])

-    found = set(CsvImporter.find_issue_specific_csv_files(path_prefix_invalid, glob=mock_glob_invalid))
+    found = set(CsvImporter.find_issue_specific_csv_files(path_prefix_invalid))
     self.assertFalse(len(found)>0)

-  def test_find_csv_files(self):
+
+  @patch("delphi.epidata.acquisition.covidcast.csv_importer.glob")
+  def test_find_csv_files(self, mock_glob: MagicMock):
     """Recursively explore and find CSV files."""

     path_prefix = 'prefix/to/the/data/'
@@ -96,19 +97,18 @@ def test_find_csv_files(self):
       # ignored
       path_prefix + 'ignored/README.md',
     ]
-    mock_glob = MagicMock()
-    mock_glob.glob.return_value = glob_paths
+    mock_glob.return_value = glob_paths

-    found = set(CsvImporter.find_csv_files(path_prefix, glob=mock_glob))
+    found = set(CsvImporter.find_csv_files(path_prefix))

     expected_issue_day=int(date.today().strftime("%Y%m%d"))
     expected_issue_week=int(str(epi.Week.fromdate(date.today())))
     time_value_day = 20200408
     expected = set([
-      (glob_paths[0], ('fb_survey', 'cli', 'week', 'county', 202015, expected_issue_week, delta_epiweeks(202015, expected_issue_week))),
-      (glob_paths[1], ('ght', 'rawsearch', 'day', 'state', time_value_day, expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days)),
-      (glob_paths[2], ('valid', 'sig', 'day', 'nation', time_value_day, expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days)),
-      (glob_paths[3], ('valid', 'sig', 'day', 'hhs', time_value_day, expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days)),
+      (glob_paths[0], PathDetails(expected_issue_week, delta_epiweeks(202015, expected_issue_week), 'fb_survey', 'cli', 'week', 202015, 'county')),
+      (glob_paths[1], PathDetails(expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days, 'ght', 'rawsearch', 'day', time_value_day, 'state')),
+      (glob_paths[2], PathDetails(expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days, 'valid', 'sig', 'day', time_value_day, 'nation')),
+      (glob_paths[3], PathDetails(expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days, 'valid', 'sig', 'day', time_value_day, 'hhs')),
       (glob_paths[4], None),
       (glob_paths[5], None),
       (glob_paths[6], None),
@@ -116,6 +116,7 @@
     ])
     self.assertEqual(found, expected)

+
   def test_is_header_valid_allows_extra_columns(self):
"""Allow and ignore extra columns in the header.""" @@ -124,6 +125,7 @@ def test_is_header_valid_allows_extra_columns(self): self.assertTrue(CsvImporter.is_header_valid(columns)) self.assertTrue(CsvImporter.is_header_valid(columns | {'foo', 'bar'})) + def test_is_header_valid_does_not_depend_on_column_order(self): """Allow columns to appear in any order.""" @@ -132,6 +134,7 @@ def test_is_header_valid_does_not_depend_on_column_order(self): self.assertTrue(CsvImporter.is_header_valid(columns)) + def test_floaty_int(self): """Parse ints that may look like floats.""" @@ -141,6 +144,7 @@ def test_floaty_int(self): with self.assertRaises(ValueError): CsvImporter.floaty_int('-1.1') + def test_maybe_apply(self): """Apply a function to a value as long as it's not null-like.""" @@ -151,6 +155,7 @@ def test_maybe_apply(self): self.assertIsNone(CsvImporter.maybe_apply(float, '')) self.assertIsNone(CsvImporter.maybe_apply(float, None)) + def test_extract_and_check_row(self): """Apply various sanity checks to a row of data.""" @@ -221,21 +226,23 @@ def make_row( self.assertEqual(values.stderr, field.stderr) self.assertEqual(values.sample_size, field.sample_size) + @patch("pandas.read_csv") def test_load_csv_with_invalid_header(self, mock_read_csv): """Bail loading a CSV when the header is invalid.""" data = {'foo': [1, 2, 3]} filepath = 'path/name.csv' - geo_type = 'state' + details = PathDetails(20200101, 0, "src", "name", "day", 20200101, "state") mock_read_csv.return_value = pd.DataFrame(data) - rows = list(CsvImporter.load_csv(filepath, geo_type)) + rows = list(CsvImporter.load_csv(filepath, details)) self.assertTrue(mock_read_csv.called) self.assertTrue(mock_read_csv.call_args[0][0], filepath) self.assertEqual(rows, [None]) + @patch("pandas.read_csv") def test_load_csv_with_valid_header(self, mock_read_csv): """Yield sanity checked `RowValues` from a valid CSV file.""" @@ -248,10 +255,10 @@ def test_load_csv_with_valid_header(self, mock_read_csv): 'sample_size': ['301', '302', '303', '304'], } filepath = 'path/name.csv' - geo_type = 'state' + details = PathDetails(20200101, 0, "src", "name", "day", 20200101, "state") mock_read_csv.return_value = pd.DataFrame(data=data) - rows = list(CsvImporter.load_csv(filepath, geo_type)) + rows = list(CsvImporter.load_csv(filepath, details)) self.assertTrue(mock_read_csv.called) self.assertTrue(mock_read_csv.call_args[0][0], filepath) @@ -285,10 +292,10 @@ def test_load_csv_with_valid_header(self, mock_read_csv): 'missing_sample_size': [Nans.NOT_MISSING] * 2 + [Nans.REGION_EXCEPTION] * 2 + [None] } filepath = 'path/name.csv' - geo_type = 'state' + details = PathDetails(20200101, 0, "src", "name", "day", 20200101, "state") mock_read_csv.return_value = pd.DataFrame(data) - rows = list(CsvImporter.load_csv(filepath, geo_type)) + rows = list(CsvImporter.load_csv(filepath, details)) self.assertTrue(mock_read_csv.called) self.assertTrue(mock_read_csv.call_args[0][0], filepath) diff --git a/tests/acquisition/covidcast/test_csv_to_database.py b/tests/acquisition/covidcast/test_csv_to_database.py index 3f037f6b1..938070944 100644 --- a/tests/acquisition/covidcast/test_csv_to_database.py +++ b/tests/acquisition/covidcast/test_csv_to_database.py @@ -4,8 +4,9 @@ import argparse from typing import Iterable import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch +from delphi.epidata.acquisition.covidcast.csv_importer import PathDetails from delphi.epidata.acquisition.covidcast.csv_to_database import get_argument_parser, main, 

 # py3tester coverage target
@@ -14,55 +15,62 @@
 class UnitTests(unittest.TestCase):
   """Basic unit tests."""

+  _path_details = [
+    # a good file
+    ('path/a.csv', PathDetails(20200420, 1, 'src_a', 'sig_a', 'day', 20200419, 'hrr')),
+    # a file with a data error
+    ('path/b.csv', PathDetails(202017, 1, 'src_b', 'sig_b', 'week', 202016, 'msa')),
+    # emulate a file that's named incorrectly
+    ('path/c.csv', None)
+  ]

   def test_get_argument_parser(self):
     """Return a parser for command-line arguments."""

     self.assertIsInstance(get_argument_parser(), argparse.ArgumentParser)

-  def _path_details(self):
-    return [
-      # a good file
-      ('path/a.csv', ('src_a', 'sig_a', 'day', 'hrr', 20200419, 20200420, 1)),
-      # a file with a data error
-      ('path/b.csv', ('src_b', 'sig_b', 'week', 'msa', 202016, 202017, 1)),
-      # emulate a file that's named incorrectly
-      ('path/c.csv', None)
-    ]

-  def test_collect_files(self):
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.CsvImporter")
+  def test_collect_files(self, mock_csv_importer: MagicMock):
     """Scan the data directory."""

-    mock_csv_importer = MagicMock()
-    mock_csv_importer.find_csv_files.return_value = self._path_details()
-    collect_files(
-      "fake_data_dir",
-      False, # no specific issue
-      csv_importer_impl=mock_csv_importer)
+    mock_csv_importer.find_csv_files.return_value = self._path_details
+    collect_files("fake_data_dir", False) # no specific issue

     self.assertEqual(mock_csv_importer.find_csv_files.call_count, 1)
-
-  def test_upload_archive(self):
+
+
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.CsvImporter")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.FileArchiver")
+  def test_upload_archive(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock):
     """Upload to the database, and archive."""

-    def make_row(value):
+    def make_row(value: float, details: PathDetails):
       return MagicMock(
+        source=details.source,
+        signal=details.signal,
+        time_type=details.time_type,
+        geo_type=details.geo_type,
+        time_value=details.time_value,
+        issue=details.issue,
+        lag=details.lag,
         geo_value=value,
         value=value,
         stderr=value,
         sample_size=value,
       )

-    def load_csv_impl(path, *args):
+    def load_csv_impl(path, details):
       if path == 'path/a.csv':
         # no validation errors
-        yield make_row('a1')
-        yield make_row('a2')
-        yield make_row('a3')
+        yield make_row('a1', details)
+        yield make_row('a2', details)
+        yield make_row('a3', details)
       elif path == 'path/b.csv':
         # one validation error
-        yield make_row('b1')
+        yield make_row('b1', details)
         yield None
-        yield make_row('b3')
+        yield make_row('b3', details)
       else:
         # fail the test for any other path
         raise Exception('unexpected path')
@@ -71,20 +79,16 @@ def iter_len(l: Iterable) -> int:
       return len(list(l))

     data_dir = 'data_dir'
-    mock_database = MagicMock()
     mock_database.insert_or_update_bulk = MagicMock(wraps=iter_len)
-    mock_csv_importer = MagicMock()
     mock_csv_importer.load_csv = load_csv_impl
-    mock_file_archiver = MagicMock()
     mock_logger = MagicMock()

     modified_row_count = upload_archive(
-      self._path_details(),
+      self._path_details,
       mock_database,
-      make_handlers(data_dir, False,
-        file_archiver_impl=mock_file_archiver),
-      mock_logger,
-      csv_importer_impl=mock_csv_importer)
+      make_handlers(data_dir, False),
+      mock_logger
+    )
     self.assertEqual(modified_row_count, 3)

     # verify that appropriate rows were added to the database
@@ -93,6 +97,7 @@ def iter_len(l: Iterable) -> int:

     actual_args = [[(a.source, a.signal, a.time_type, a.geo_type, a.time_value, a.geo_value, a.value, a.stderr, a.sample_size, a.issue, a.lag) for a in call.args[0]] for call in call_args_list]
+
     expected_args = [
       [('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a1', 'a1', 'a1', 'a1', 20200420, 1),
       ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a2', 'a2', 'a2', 'a2', 20200420, 1),
@@ -111,109 +116,97 @@ def iter_len(l: Iterable) -> int:
     ]
     self.assertEqual(actual_args, expected_args)

-  def test_main_successful(self):
+
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.upload_archive")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.collect_files")
+  def test_main_successful(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock):
     """Run the main program successfully, then commit changes."""

     # TODO: use an actual argparse object for the args instead of a MagicMock
     args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False)
-    mock_database = MagicMock()
-    mock_database.count_all_rows.return_value = 0
-    fake_database_impl = lambda: mock_database
-    mock_collect_files = MagicMock()
+    # `return_value` because we mocked the class constructor
+    mock_database.return_value.count_all_rows.return_value = 0
     mock_collect_files.return_value = [("a",False)]
-    mock_upload_archive = MagicMock()
-
-    main(
-      args,
-      database_impl=fake_database_impl,
-      collect_files_impl=mock_collect_files,
-      upload_archive_impl=mock_upload_archive)
+    main(args)

     self.assertTrue(mock_collect_files.called)
     self.assertEqual(mock_collect_files.call_args[0][0], 'data')

     self.assertTrue(mock_upload_archive.called)
     self.assertEqual(mock_upload_archive.call_args[0][0], [("a",False)])

-    self.assertTrue(mock_database.connect.called)
-    self.assertTrue(mock_database.disconnect.called)
-    self.assertTrue(mock_database.disconnect.call_args[0][0])
+    self.assertTrue(mock_database.return_value.connect.called)
+    self.assertTrue(mock_database.return_value.disconnect.called)
+    self.assertTrue(mock_database.return_value.disconnect.call_args[0][0])
+

-  def test_main_unsuccessful(self):
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.upload_archive", side_effect=Exception('testing'))
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.collect_files")
+  def test_main_unsuccessful(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock):
     """Run the main program with failure, then commit changes."""

     # TODO: use an actual argparse object for the args instead of a MagicMock
     args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False)
-    mock_database = MagicMock()
-    mock_database.count_all_rows.return_value = 0
-    fake_database_impl = lambda: mock_database
-    mock_upload_archive = MagicMock(side_effect=Exception('testing'))
-    mock_collect_files = MagicMock()
-    mock_collect_files.return_value=[("a",False)]
+    mock_database.return_value.count_all_rows.return_value = 0
+    mock_collect_files.return_value = [("a",False)]

     with self.assertRaises(Exception):
-      main(
-        args,
-        database_impl=fake_database_impl,
-        collect_files_impl=mock_collect_files,
-        upload_archive_impl=mock_upload_archive)
+      main(args)

     self.assertTrue(mock_upload_archive.called)
     self.assertEqual(mock_upload_archive.call_args[0][0], [("a",False)])

-    self.assertTrue(mock_database.connect.called)
-    self.assertTrue(mock_database.disconnect.called)
-    self.assertTrue(mock_database.disconnect.call_args[0][0])
+    self.assertTrue(mock_database.return_value.connect.called)
+    self.assertTrue(mock_database.return_value.disconnect.called)
+    self.assertTrue(mock_database.return_value.disconnect.call_args[0][0])
+

-  def test_main_early_exit(self):
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.upload_archive")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.collect_files")
+  def test_main_early_exit(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock):
     """Run the main program with an empty receiving directory."""

     # TODO: use an actual argparse object for the args instead of a MagicMock
     args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False)
-    mock_database = MagicMock()
     mock_database.count_all_rows.return_value = 0
-    fake_database_impl = lambda: mock_database
-    mock_collect_files = MagicMock()
     mock_collect_files.return_value = []
-    mock_upload_archive = MagicMock()

-    main(
-      args,
-      database_impl=fake_database_impl,
-      collect_files_impl=mock_collect_files,
-      upload_archive_impl=mock_upload_archive)
+    main(args)

     self.assertTrue(mock_collect_files.called)
     self.assertEqual(mock_collect_files.call_args[0][0], 'data')

     self.assertFalse(mock_upload_archive.called)

-    self.assertFalse(mock_database.connect.called)
-    self.assertFalse(mock_database.disconnect.called)
+    self.assertFalse(mock_database.return_value.connect.called)
+    self.assertFalse(mock_database.return_value.disconnect.called)
+

-  def test_database_exception_is_handled(self):
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.CsvImporter")
+  @patch("delphi.epidata.acquisition.covidcast.csv_to_database.FileArchiver")
+  def test_database_exception_is_handled(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock):
     """Gracefully handle database exceptions."""

     data_dir = 'data_dir'
-    mock_database = MagicMock()
     mock_database.insert_or_update_bulk.side_effect = Exception('testing')
-    mock_csv_importer = MagicMock()
     mock_csv_importer.find_csv_files.return_value = [
-      ('path/file.csv', ('src', 'sig', 'day', 'hrr', 20200423, 20200424, 1)),
+      ('path/file.csv', PathDetails(20200424, 1, 'src', 'sig', 'day', 20200423, 'hrr')),
     ]
     mock_csv_importer.load_csv.return_value = [
       MagicMock(geo_value='geo', value=1, stderr=1, sample_size=1),
     ]
-    mock_file_archiver = MagicMock()
     mock_logger = MagicMock()

     upload_archive(
-      collect_files(data_dir, False, csv_importer_impl=mock_csv_importer),
+      collect_files(data_dir, False),
       mock_database,
-      make_handlers(data_dir, False, file_archiver_impl=mock_file_archiver),
-      mock_logger,
-      csv_importer_impl=mock_csv_importer,
-    )
+      make_handlers(data_dir, False),
+      mock_logger
+    )

     # verify that insertions were attempted
     self.assertTrue(mock_database.insert_or_update_bulk.called)
diff --git a/tests/acquisition/covidcast_nowcast/test_load_sensors.py b/tests/acquisition/covidcast_nowcast/test_load_sensors.py
index 0fe96ab17..9b0c5181a 100644
--- a/tests/acquisition/covidcast_nowcast/test_load_sensors.py
+++ b/tests/acquisition/covidcast_nowcast/test_load_sensors.py
@@ -9,6 +9,7 @@
 import pandas as pd

 # first party
+from delphi.epidata.acquisition.covidcast.csv_importer import PathDetails
 from delphi.epidata.acquisition.covidcast_nowcast.load_sensors import main, load_and_prepare_file

 # py3tester coverage target
@@ -20,13 +21,15 @@ class UpdateTests(unittest.TestCase):

    @mock.patch('time.time', mock.MagicMock(return_value=12345))
    def test_load_and_prepare_file(self):
-        test_attributes = ("test_source",
-                           "test_signal",
-                           "test_time_type",
-                           "test_geo_type",
-                           20201231,
-                           20210102,
-                           3)
+        test_attributes = PathDetails(
+            20210102,
+            3,
+            "test_source",
+            "test_signal",
+            "test_time_type",
+            20201231,
+            "test_geo_type",
+        )

        test_df = load_and_prepare_file(StringIO("sensor_name,geo_value,value\ntestname,01001,1.5"), test_attributes)
        pd.testing.assert_frame_equal(test_df,