diff --git a/integrations/acquisition/covidcast/test_csv_uploading.py b/integrations/acquisition/covidcast/test_csv_uploading.py index f975ecfa0..d0ec82657 100644 --- a/integrations/acquisition/covidcast/test_csv_uploading.py +++ b/integrations/acquisition/covidcast/test_csv_uploading.py @@ -14,11 +14,11 @@ # first party from delphi_utils import Nans from delphi.epidata.client.delphi_epidata import Epidata -from delphi.epidata.acquisition.covidcast.csv_to_database import main +from delphi.epidata.acquisition.covidcast.csv_importer import main import delphi.operations.secrets as secrets # py3tester coverage target (equivalent to `import *`) -__test_target__ = 'delphi.epidata.acquisition.covidcast.csv_to_database' +__test_target__ = 'delphi.epidata.acquisition.covidcast.csv_importer' class CsvUploadingTests(unittest.TestCase): diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 0fa936802..2a12bec58 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -1,22 +1,29 @@ -"""Collects and reads covidcast data from a set of local CSV files.""" +""" +Collects and reads covidcast data from a set of local CSV files. -# standard library +Imports covidcast CSVs and stores them in the epidata database. +""" + +import argparse import os import re +import time from dataclasses import dataclass from datetime import date from glob import glob -from typing import Iterator, NamedTuple, Optional, Tuple +from logging import Logger +from typing import Callable, Iterable, List, NamedTuple, Optional, Tuple -# third party import epiweeks as epi +import numpy as np import pandas as pd - -# first party -from delphi_utils import Nans -from delphi.utils.epiweek import delta_epiweeks from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow +from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException +from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi.utils.epiweek import delta_epiweeks +from delphi_utils import Nans + DataFrameRow = NamedTuple('DFRow', [ ('geo_id', str), @@ -49,6 +56,24 @@ class CsvRowValue: missing_stderr: int missing_sample_size: int +class GeoIdSanityCheckException(ValueError): + + def __init__(self, message, geo_id=None): + self.message = message + self.geo_id = geo_id + +class GeoTypeSanityCheckException(ValueError): + + def __init__(self, message, geo_type=None): + self.message = message + self.geo_type = geo_type + +class ValueSanityCheckException(ValueError): + + def __init__(self, message, value=None): + self.message = message + self.value = value + class CsvImporter: """Finds and parses covidcast CSV files.""" @@ -208,83 +233,11 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() def is_header_valid(columns): """Return whether the given pandas columns contains the required fields.""" - return set(columns) >= CsvImporter.REQUIRED_COLUMNS - - - @staticmethod - def floaty_int(value: str) -> int: - """Cast a string to an int, even if it looks like a float. - - For example, "-1" and "-1.0" should both result in -1. Non-integer floats - will cause `ValueError` to be reaised. 
- """ - - float_value = float(value) - if not float_value.is_integer(): - raise ValueError('not an int: "%s"' % str(value)) - return int(float_value) - - - @staticmethod - def maybe_apply(func, quantity): - """Apply the given function to the given quantity if not null-ish.""" - if str(quantity).lower() in ('inf', '-inf'): - raise ValueError("Quantity given was an inf.") - elif str(quantity).lower() in ('', 'na', 'nan', 'none'): - return None - else: - return func(quantity) - - - @staticmethod - def validate_quantity(row, attr_quantity): - """Take a row and validate a given associated quantity (e.g., val, se, stderr). - - Returns either a float, a None, or "Error". - """ - try: - quantity = CsvImporter.maybe_apply(float, getattr(row, attr_quantity)) - return quantity - except (ValueError, AttributeError): - # val was a string or another data - return "Error" - - - @staticmethod - def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=None): - """Take a row and validate the missing code associated with - a quantity (e.g., val, se, stderr). - - Returns either a nan code for assignment to the missing quantity - or a None to signal an error with the missing code. We decline - to infer missing codes except for a very simple cases; the default - is to produce an error so that the issue can be fixed in indicators. - """ - logger = get_structured_logger('load_csv') if logger is None else logger - missing_entry = getattr(row, "missing_" + attr_name, None) - - try: - missing_entry = CsvImporter.floaty_int(missing_entry) # convert from string to float to int - except (ValueError, TypeError): - missing_entry = None - - if missing_entry is None and attr_quantity is not None: - return Nans.NOT_MISSING.value - if missing_entry is None and attr_quantity is None: - return Nans.OTHER.value - - if missing_entry != Nans.NOT_MISSING.value and attr_quantity is not None: - logger.warning(event = f"missing_{attr_name} column contradicting {attr_name} presence.", detail = (str(row)), file = filepath) - return Nans.NOT_MISSING.value - if missing_entry == Nans.NOT_MISSING.value and attr_quantity is None: - logger.warning(event = f"missing_{attr_name} column contradicting {attr_name} presence.", detail = (str(row)), file = filepath) - return Nans.OTHER.value - - return missing_entry + return CsvImporter.REQUIRED_COLUMNS.issubset(set(columns)) @staticmethod - def extract_and_check_row(row: DataFrameRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]: + def extract_and_check_row(geo_type: str, table: pd.DataFrame, details: PathDetails) -> pd.DataFrame: """Extract and return `CsvRowValue` from a CSV row, with sanity checks. Also returns the name of the field which failed sanity check, or None. @@ -293,80 +246,92 @@ def extract_and_check_row(row: DataFrameRow, geo_type: str, filepath: Optional[s geo_type: the geographic resolution of the file """ - # use consistent capitalization (e.g. 
for states) - try: - geo_id = row.geo_id.lower() - except AttributeError: - # geo_id was `None` - return (None, 'geo_id') + def validate_geo_code(fail_mask: pd.Series, geo_type: str): + validation_fails = table[fail_mask] + if not validation_fails.empty: + first_fail = validation_fails.iloc[0] + raise GeoIdSanityCheckException(f'Invalid geo_id for {geo_type}', geo_id=first_fail["geo_id"]) + + def validate_quantity(column: pd.Series): + """Validate that a quantity column contains no infinite or negative values.""" + infinities = column[column.isin([float('inf'), float('-inf')])] + if not infinities.empty: + first_fail = infinities.iloc[0] + raise ValueSanityCheckException(f'Invalid infinite value in {column.name}: {first_fail}', first_fail) + + negative_values = column[column.lt(0)] + if not negative_values.empty: + first_fail = negative_values.iloc[0] + raise ValueSanityCheckException(f'Invalid negative value in {column.name}: {first_fail}', first_fail) + + return column + + def validate_missing_code(missing_code: pd.Series, column: pd.Series): + """Reconcile the missing codes associated with a quantity column (e.g., val, se, sample_size). + + Missing codes are inferred only in the very simple cases where the presence or absence of the quantity makes them unambiguous; codes that contradict the data are corrected and a warning is logged so that the issue can be fixed in the indicators. Returns the corrected missing-code column. + """ + logger = get_structured_logger('validate_missing_code') + + missing_code[missing_code.isna() & column.notna()] = Nans.NOT_MISSING.value + missing_code[missing_code.isna() & column.isna()] = Nans.OTHER.value + + contradict_mask = missing_code.ne(Nans.NOT_MISSING.value) & column.notna() + if contradict_mask.any(): + first_fail = missing_code[contradict_mask].iloc[0] + logger.warning(f'Correcting contradicting missing code: {first_fail} in {details.source}:{details.signal} {details.time_value} {details.geo_type}') + missing_code[contradict_mask] = Nans.NOT_MISSING.value + + contradict_mask = missing_code.eq(Nans.NOT_MISSING.value) & column.isna() + if contradict_mask.any(): + first_fail = missing_code[contradict_mask].iloc[0] + logger.warning(f'Correcting contradicting missing code: {first_fail} in {details.source}:{details.signal} {details.time_value} {details.geo_type}') + missing_code[contradict_mask] = Nans.OTHER.value + + return missing_code - if geo_type in ('hrr', 'msa', 'dma', 'hhs'): - # these particular ids are prone to be written as ints -- and floats - try: - geo_id = str(CsvImporter.floaty_int(geo_id)) - except ValueError: - # expected a number, but got a string - return (None, 'geo_id') + # use consistent capitalization (e.g.
for states) + table['geo_id'] = table['geo_id'].str.lower() # sanity check geo_id with respect to geo_type if geo_type == 'county': - if len(geo_id) != 5 or not '01000' <= geo_id <= '80000': - return (None, 'geo_id') - + fail_mask = (table['geo_id'].str.len() != 5) | ~table['geo_id'].between('01000', '80000') elif geo_type == 'hrr': - if not 1 <= int(geo_id) <= 500: - return (None, 'geo_id') - + fail_mask = ~table['geo_id'].astype(int).between(1, 500) elif geo_type == 'msa': - if len(geo_id) != 5 or not '10000' <= geo_id <= '99999': - return (None, 'geo_id') - + fail_mask = (table['geo_id'].str.len() != 5) | ~table['geo_id'].between('10000', '99999') elif geo_type == 'dma': - if not 450 <= int(geo_id) <= 950: - return (None, 'geo_id') - + fail_mask = ~table['geo_id'].astype(int).between(450, 950) elif geo_type == 'state': - # note that geo_id is lowercase - if len(geo_id) != 2 or not 'aa' <= geo_id <= 'zz': - return (None, 'geo_id') - + fail_mask = (table['geo_id'].str.len() != 2) | ~table['geo_id'].between('aa', 'zz') elif geo_type == 'hhs': - if not 1 <= int(geo_id) <= 10: - return (None, 'geo_id') - + fail_mask = ~table['geo_id'].astype(int).between(1, 10) elif geo_type == 'nation': - # geo_id is lowercase - if len(geo_id) != 2 or not 'aa' <= geo_id <= 'zz': - return (None, 'geo_id') - + fail_mask = table['geo_id'] != 'us' else: - return (None, 'geo_type') + raise GeoTypeSanityCheckException(f'Invalid geo_type: {geo_type}') + + validate_geo_code(fail_mask, geo_type) # Validate row values - value = CsvImporter.validate_quantity(row, "value") - # value was a string or another dtype - if value == "Error": - return (None, 'value') - stderr = CsvImporter.validate_quantity(row, "stderr") - # stderr is a string, another dtype, or negative - if stderr == "Error" or (stderr is not None and stderr < 0): - return (None, 'stderr') - sample_size = CsvImporter.validate_quantity(row, "sample_size") - # sample_size is a string, another dtype, or negative - if sample_size == "Error" or (sample_size is not None and sample_size < 0): - return (None, 'sample_size') - - # Validate and write missingness codes - missing_value = CsvImporter.validate_missing_code(row, value, "value", filepath) - missing_stderr = CsvImporter.validate_missing_code(row, stderr, "stderr", filepath) - missing_sample_size = CsvImporter.validate_missing_code(row, sample_size, "sample_size", filepath) - - # return extracted and validated row values - return (CsvRowValue(geo_id, value, stderr, sample_size, missing_value, missing_stderr, missing_sample_size), None) + table['value'] = validate_quantity(table['value']) + table['stderr'] = validate_quantity(table['stderr']) + table['sample_size'] = validate_quantity(table['sample_size']) + + # Validate and fix missingness codes + table['missing_value'] = validate_missing_code(table['missing_value'], table['value']) + table['missing_stderr'] = validate_missing_code(table['missing_stderr'], table['stderr']) + table['missing_sample_size'] = validate_missing_code(table['missing_sample_size'], table['sample_size']) + + return table @staticmethod - def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[CovidcastRow]]: + def load_csv(filepath: str, details: PathDetails) -> Optional[List[CovidcastRow]]: """Load, validate, and yield data as `RowValues` from a CSV file. 
filepath: the CSV file to be loaded @@ -379,38 +344,218 @@ def load_csv(filepath: str, details: PathDetails) -> Optional[Covidcast try: table = pd.read_csv(filepath, dtype=CsvImporter.DTYPES) - except ValueError as e: - logger.warning(event='Failed to open CSV with specified dtypes, switching to str', detail=str(e), file=filepath) - table = pd.read_csv(filepath, dtype='str') + except pd.errors.DtypeWarning as e: + logger.warning(event='Failed to open CSV with specified dtypes', detail=str(e), file=filepath) + return None + except pd.errors.EmptyDataError as e: + logger.warning(event='Empty data or header encountered', detail=str(e), file=filepath) + return None if not CsvImporter.is_header_valid(table.columns): logger.warning(event='invalid header', detail=table.columns, file=filepath) - yield None - return + return None table.rename(columns={"val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}, inplace=True) + + for key in ["missing_value", "missing_stderr", "missing_sample_size"]: + if key not in table.columns: + table[key] = np.nan - for row in table.itertuples(index=False): - csv_row_values, error = CsvImporter.extract_and_check_row(row, details.geo_type, filepath) - - if error: - logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath) - yield None - continue - - yield CovidcastRow( - details.source, - details.signal, - details.time_type, - details.geo_type, - details.time_value, - csv_row_values.geo_value, - csv_row_values.value, - csv_row_values.stderr, - csv_row_values.sample_size, - csv_row_values.missing_value, - csv_row_values.missing_stderr, - csv_row_values.missing_sample_size, - details.issue, - details.lag, - ) + try: + table = CsvImporter.extract_and_check_row(details.geo_type, table, details) + except GeoIdSanityCheckException as err: + row = table.loc[table['geo_id'] == err.geo_id] + logger.warning(event='invalid value for row', detail=(row.to_csv(header=False, index=False, na_rep='NA')), file=filepath) + return None + except GeoTypeSanityCheckException as err: + logger.warning(event='invalid value for row', detail=err, file=filepath) + return None + except ValueSanityCheckException as err: + row = table.loc[table['value'] == err.value] + logger.warning(event='invalid value for row', detail=(row.to_csv(header=False, index=False, na_rep='NA')), file=filepath) + return None + except Exception as err: + logger.warning(event='unknown error occurred in extract_and_check_row', detail=err, file=filepath) + return None + return [ + CovidcastRow( + source=details.source, + signal=details.signal, + time_type=details.time_type, + geo_type=details.geo_type, + time_value=details.time_value, + geo_value=row.geo_id, + value=row.value if pd.notna(row.value) else None, + stderr=row.stderr if pd.notna(row.stderr) else None, + sample_size=row.sample_size if pd.notna(row.sample_size) else None, + missing_value=int(row.missing_value), + missing_stderr=int(row.missing_stderr), + missing_sample_size=int(row.missing_sample_size), + issue=details.issue, + lag=details.lag + ) for row in table.itertuples(index=False) + ] + + +def collect_files(data_dir: str, specific_issue_date: bool): + """Fetch path and data profile details for each file to upload.""" + logger = get_structured_logger('collect_files') + if specific_issue_date: + results = list(CsvImporter.find_issue_specific_csv_files(data_dir)) + else: + results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving'))) + logger.info(f'found
{len(results)} files') + return results + + +def make_handlers(data_dir: str, specific_issue_date: bool): + if specific_issue_date: + # issue-specific uploads are always one-offs, so we can leave all + # files in place without worrying about cleaning up + def handle_failed(path_src, filename, source, logger): + logger.info(event='leaving failed file alone', dest=source, file=filename) + + def handle_successful(path_src, filename, source, logger): + logger.info(event='archiving as successful',file=filename) + FileArchiver.archive_inplace(path_src, filename) + else: + # normal automation runs require some shuffling to remove files + # from receiving and place them in the archive + archive_successful_dir = os.path.join(data_dir, 'archive', 'successful') + archive_failed_dir = os.path.join(data_dir, 'archive', 'failed') + + # helper to archive a failed file without compression + def handle_failed(path_src, filename, source, logger): + logger.info(event='archiving as failed - ', detail=source, file=filename) + path_dst = os.path.join(archive_failed_dir, source) + compress = False + FileArchiver.archive_file(path_src, path_dst, filename, compress) + + # helper to archive a successful file with compression + def handle_successful(path_src, filename, source, logger): + logger.info(event='archiving as successful',file=filename) + path_dst = os.path.join(archive_successful_dir, source) + compress = True + FileArchiver.archive_file(path_src, path_dst, filename, compress) + + return handle_successful, handle_failed + + +def upload_archive( + path_details: Iterable[Tuple[str, Optional[PathDetails]]], + database: Database, + handlers: Tuple[Callable], + logger: Logger + ): + """Upload CSVs to the database and archive them using the specified handlers. + + :path_details: output from CsvImporter.find*_csv_files + + :database: an open connection to the epidata database + + :handlers: functions for archiving (successful, failed) files + + :return: the number of modified rows + """ + archive_as_successful, archive_as_failed = handlers + total_modified_row_count = 0 + # iterate over each file + for path, details in path_details: + logger.info(event='handling', dest=path) + path_src, filename = os.path.split(path) + + # file path or name was invalid, source is unknown + if not details: + archive_as_failed(path_src, filename, 'unknown',logger) + continue + + all_rows_valid = True + csv_rows = CsvImporter.load_csv(path, details) + if csv_rows: + try: + modified_row_count = database.insert_or_update_bulk(csv_rows) + logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}") + logger.info( + "Inserted database rows", + row_count = modified_row_count, + source = details.source, + signal = details.signal, + geo_type = details.geo_type, + time_value = details.time_value, + issue = details.issue, + lag = details.lag + ) + if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted + total_modified_row_count += (modified_row_count if modified_row_count else 0) + database.commit() + except DBLoadStateException as e: + # if the db is in a state that is not fit for loading new data, + # then we should stop processing any more files + raise e + except Exception as e: + all_rows_valid = False + logger.exception('exception while inserting rows', exc_info=e) + database.rollback() + + # archive the current file based on validation results + if csv_rows and all_rows_valid: + archive_as_successful(path_src, filename, details.source, logger) + else: + 
archive_as_failed(path_src, filename, details.source, logger) + + return total_modified_row_count + + +def main(args): + """Find, parse, and upload covidcast signals.""" + + logger = get_structured_logger("csv_ingestion", filename=args.log_file) + start_time = time.time() + + # shortcut escape without hitting db if nothing to do + path_details = collect_files(args.data_dir, args.specific_issue_date) + if not path_details: + logger.info('nothing to do; exiting...') + return + + logger.info("Ingesting CSVs", csv_count = len(path_details)) + + database = Database() + database.connect() + + try: + modified_row_count = upload_archive( + path_details, + database, + make_handlers(args.data_dir, args.specific_issue_date), + logger + ) + logger.info("Finished inserting/updating database rows", row_count = modified_row_count) + finally: + database.do_analyze() + # unconditionally commit database changes since CSVs have been archived + database.disconnect(True) + + logger.info( + "Ingested CSVs into database", + total_runtime_in_seconds=round(time.time() - start_time, 2)) + + +def get_argument_parser(): + """Define command line arguments.""" + + parser = argparse.ArgumentParser() + parser.add_argument( + '--data_dir', + help='top-level directory where CSVs are stored') + parser.add_argument( + '--specific_issue_date', + action='store_true', + help='indicates argument is where issuedate-specific subdirectories can be found.') + parser.add_argument( + '--log_file', + help="filename for log output (defaults to stdout)") + return parser + +if __name__ == '__main__': + main(get_argument_parser().parse_args()) diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py deleted file mode 100644 index 842e820c9..000000000 --- a/src/acquisition/covidcast/csv_to_database.py +++ /dev/null @@ -1,180 +0,0 @@ -"""Imports covidcast CSVs and stores them in the epidata database.""" - -# standard library -import argparse -import os -import time -from logging import Logger -from typing import Callable, Iterable, Optional, Tuple - -# first party -from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails -from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException -from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger - - -def get_argument_parser(): - """Define command line arguments.""" - - parser = argparse.ArgumentParser() - parser.add_argument( - '--data_dir', - help='top-level directory where CSVs are stored') - parser.add_argument( - '--specific_issue_date', - action='store_true', - help='indicates argument is where issuedate-specific subdirectories can be found.') - parser.add_argument( - '--log_file', - help="filename for log output (defaults to stdout)") - return parser - - -def collect_files(data_dir: str, specific_issue_date: bool): - """Fetch path and data profile details for each file to upload.""" - logger= get_structured_logger('collect_files') - if specific_issue_date: - results = list(CsvImporter.find_issue_specific_csv_files(data_dir)) - else: - results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving'))) - logger.info(f'found {len(results)} files') - return results - - -def make_handlers(data_dir: str, specific_issue_date: bool): - if specific_issue_date: - # issue-specific uploads are always one-offs, so we can leave all - # files in place without worrying about cleaning up - def 
handle_failed(path_src, filename, source, logger): - logger.info(event='leaving failed file alone', dest=source, file=filename) - - def handle_successful(path_src, filename, source, logger): - logger.info(event='archiving as successful',file=filename) - FileArchiver.archive_inplace(path_src, filename) - else: - # normal automation runs require some shuffling to remove files - # from receiving and place them in the archive - archive_successful_dir = os.path.join(data_dir, 'archive', 'successful') - archive_failed_dir = os.path.join(data_dir, 'archive', 'failed') - - # helper to archive a failed file without compression - def handle_failed(path_src, filename, source, logger): - logger.info(event='archiving as failed - ', detail=source, file=filename) - path_dst = os.path.join(archive_failed_dir, source) - compress = False - FileArchiver.archive_file(path_src, path_dst, filename, compress) - - # helper to archive a successful file with compression - def handle_successful(path_src, filename, source, logger): - logger.info(event='archiving as successful',file=filename) - path_dst = os.path.join(archive_successful_dir, source) - compress = True - FileArchiver.archive_file(path_src, path_dst, filename, compress) - - return handle_successful, handle_failed - - -def upload_archive( - path_details: Iterable[Tuple[str, Optional[PathDetails]]], - database: Database, - handlers: Tuple[Callable], - logger: Logger - ): - """Upload CSVs to the database and archive them using the specified handlers. - - :path_details: output from CsvImporter.find*_csv_files - - :database: an open connection to the epidata database - - :handlers: functions for archiving (successful, failed) files - - :return: the number of modified rows - """ - archive_as_successful, archive_as_failed = handlers - total_modified_row_count = 0 - # iterate over each file - for path, details in path_details: - logger.info(event='handling', dest=path) - path_src, filename = os.path.split(path) - - # file path or name was invalid, source is unknown - if not details: - archive_as_failed(path_src, filename, 'unknown',logger) - continue - - csv_rows = CsvImporter.load_csv(path, details) - rows_list = list(csv_rows) - all_rows_valid = rows_list and all(r is not None for r in rows_list) - if all_rows_valid: - try: - modified_row_count = database.insert_or_update_bulk(rows_list) - logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}") - logger.info( - "Inserted database rows", - row_count = modified_row_count, - source = details.source, - signal = details.signal, - geo_type = details.geo_type, - time_value = details.time_value, - issue = details.issue, - lag = details.lag - ) - if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted - total_modified_row_count += (modified_row_count if modified_row_count else 0) - database.commit() - except DBLoadStateException as e: - # if the db is in a state that is not fit for loading new data, - # then we should stop processing any more files - raise e - except Exception as e: - all_rows_valid = False - logger.exception('exception while inserting rows', exc_info=e) - database.rollback() - - # archive the current file based on validation results - if all_rows_valid: - archive_as_successful(path_src, filename, details.source, logger) - else: - archive_as_failed(path_src, filename, details.source, logger) - - return total_modified_row_count - - -def main(args): - """Find, parse, and upload covidcast signals.""" - - logger = 
get_structured_logger("csv_ingestion", filename=args.log_file) - start_time = time.time() - - # shortcut escape without hitting db if nothing to do - path_details = collect_files(args.data_dir, args.specific_issue_date) - if not path_details: - logger.info('nothing to do; exiting...') - return - - logger.info("Ingesting CSVs", csv_count = len(path_details)) - - database = Database() - database.connect() - - try: - modified_row_count = upload_archive( - path_details, - database, - make_handlers(args.data_dir, args.specific_issue_date), - logger - ) - logger.info("Finished inserting/updating database rows", row_count = modified_row_count) - finally: - database.do_analyze() - # unconditionally commit database changes since CSVs have been archived - database.disconnect(True) - - logger.info( - "Ingested CSVs into database", - total_runtime_in_seconds=round(time.time() - start_time, 2)) - - -if __name__ == '__main__': - main(get_argument_parser().parse_args()) diff --git a/tests/acquisition/covidcast/test_csv_importer.py b/tests/acquisition/covidcast/test_csv_importer.py index 0906febd1..f431bfff7 100644 --- a/tests/acquisition/covidcast/test_csv_importer.py +++ b/tests/acquisition/covidcast/test_csv_importer.py @@ -1,26 +1,44 @@ """Unit tests for csv_importer.py.""" -# standard library +import argparse import unittest -from unittest.mock import MagicMock, patch from datetime import date -import numpy as np +from typing import Iterable +from unittest.mock import MagicMock, patch -# third party -import pandas as pd import epiweeks as epi - -from delphi_utils import Nans +import numpy as np +import pandas as pd +import pytest +from delphi.epidata.acquisition.covidcast.csv_importer import ( + CsvImporter, + PathDetails, + collect_files, + get_argument_parser, + main, + make_handlers, + upload_archive, + GeoIdSanityCheckException, + GeoTypeSanityCheckException, + ValueSanityCheckException +) +from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow from delphi.utils.epiweek import delta_epiweeks -from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, CsvRowValue, PathDetails - -# py3tester coverage target -__test_target__ = 'delphi.epidata.acquisition.covidcast.csv_importer' +from delphi_utils import Nans class UnitTests(unittest.TestCase): """Basic unit tests.""" + _path_details = [ + # a good file + ('path/a.csv', PathDetails(20200420, 1, 'src_a', 'sig_a', 'day', 20200419, 'hrr')), + # a file with a data error + ('path/b.csv', PathDetails(202017, 1, 'src_b', 'sig_b', 'week', 202016, 'msa')), + # emulate a file that's named incorrectly + ('path/c.csv', None) + ] + def test_is_sane_day(self): """Sanity check some dates.""" @@ -135,96 +153,84 @@ def test_is_header_valid_does_not_depend_on_column_order(self): self.assertTrue(CsvImporter.is_header_valid(columns)) - def test_floaty_int(self): - """Parse ints that may look like floats.""" - - self.assertEqual(CsvImporter.floaty_int('-1'), -1) - self.assertEqual(CsvImporter.floaty_int('-1.0'), -1) - - with self.assertRaises(ValueError): - CsvImporter.floaty_int('-1.1') - - - def test_maybe_apply(self): - """Apply a function to a value as long as it's not null-like.""" - - self.assertEqual(CsvImporter.maybe_apply(float, '3.14'), 3.14) - self.assertEqual(CsvImporter.maybe_apply(int, '1'), 1) - self.assertIsNone(CsvImporter.maybe_apply(int, 'NA')) - self.assertIsNone(CsvImporter.maybe_apply(int, 'NaN')) - self.assertIsNone(CsvImporter.maybe_apply(float, '')) - self.assertIsNone(CsvImporter.maybe_apply(float, None)) - - 
def test_extract_and_check_row(self): """Apply various sanity checks to a row of data.""" def make_row( - geo_type='state', - geo_id='vi', - value='1.23', - stderr='4.56', - sample_size='100.5', - missing_value=str(float(Nans.NOT_MISSING)), - missing_stderr=str(float(Nans.NOT_MISSING)), - missing_sample_size=str(float(Nans.NOT_MISSING))): - row = MagicMock( - geo_id=geo_id, - value=value, - stderr=stderr, - sample_size=sample_size, - missing_value=missing_value, - missing_stderr=missing_stderr, - missing_sample_size=missing_sample_size, - spec=["geo_id", "value", "stderr", "sample_size", - "missing_value", "missing_stderr", "missing_sample_size"]) - return geo_type, row - - # cases to test each failure mode - failure_cases = [ - (make_row(geo_type='county', geo_id='1234'), 'geo_id'), - (make_row(geo_type='county', geo_id='00000'), 'geo_id'), - (make_row(geo_type='hrr', geo_id='600'), 'geo_id'), - (make_row(geo_type='msa', geo_id='1234'), 'geo_id'), - (make_row(geo_type='msa', geo_id='01234'), 'geo_id'), - (make_row(geo_type='dma', geo_id='400'), 'geo_id'), - (make_row(geo_type='state', geo_id='48'), 'geo_id'), - (make_row(geo_type='state', geo_id='iowa'), 'geo_id'), - (make_row(geo_type='nation', geo_id='0000'), 'geo_id'), - (make_row(geo_type='hhs', geo_id='0'), 'geo_id'), - (make_row(geo_type='province', geo_id='ab'), 'geo_type'), - (make_row(stderr='-1'), 'stderr'), - (make_row(geo_type=None), 'geo_type'), - (make_row(geo_id=None), 'geo_id'), - (make_row(value='inf'), 'value'), - (make_row(stderr='inf'), 'stderr'), - (make_row(sample_size='inf'), 'sample_size'), - (make_row(geo_type='hrr', geo_id='hrr001'), 'geo_id'), - (make_row(value='value'), 'value'), - (make_row(stderr='stderr'), 'stderr'), - (make_row(sample_size='sample_size'), 'sample_size'), - ] - - for ((geo_type, row), field) in failure_cases: - values, error = CsvImporter.extract_and_check_row(row, geo_type) - self.assertIsNone(values) - self.assertEqual(error, field) - - success_cases = [ - (make_row(), CsvRowValue('vi', 1.23, 4.56, 100.5, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING)), - (make_row(value=None, stderr=np.nan, sample_size='', missing_value=str(float(Nans.DELETED)), missing_stderr=str(float(Nans.DELETED)), missing_sample_size=str(float(Nans.DELETED))), CsvRowValue('vi', None, None, None, Nans.DELETED, Nans.DELETED, Nans.DELETED)), - (make_row(stderr='', sample_size='NA', missing_stderr=str(float(Nans.OTHER)), missing_sample_size=str(float(Nans.OTHER))), CsvRowValue('vi', 1.23, None, None, Nans.NOT_MISSING, Nans.OTHER, Nans.OTHER)), - (make_row(sample_size=None, missing_value='missing_value', missing_stderr=str(float(Nans.OTHER)), missing_sample_size=str(float(Nans.NOT_MISSING))), CsvRowValue('vi', 1.23, 4.56, None, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER)), - ] - - for ((geo_type, row), field) in success_cases: - values, error = CsvImporter.extract_and_check_row(row, geo_type) - self.assertIsNone(error) - self.assertIsInstance(values, CsvRowValue) - self.assertEqual(values.geo_value, field.geo_value) - self.assertEqual(values.value, field.value) - self.assertEqual(values.stderr, field.stderr) - self.assertEqual(values.sample_size, field.sample_size) + geo_id='vi', + value=1.23, + stderr=4.56, + sample_size=100.5, + missing_value=Nans.NOT_MISSING, + missing_stderr=Nans.NOT_MISSING, + missing_sample_size=Nans.NOT_MISSING + ): + row = pd.DataFrame({ + "geo_id": [geo_id], + "value": [value], + "stderr": [stderr], + "sample_size": [sample_size], + "missing_value": [missing_value], + "missing_stderr": 
[missing_stderr], + "missing_sample_size": [missing_sample_size] + }) + return row + + details = PathDetails(20200408, 0, 'src', 'sig', 'day', 20200408, 'state') + + # Failure cases. + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('county', make_row(geo_id='1234'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('county', make_row(geo_id='00000'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('hrr', make_row(geo_id='600'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('msa', make_row(geo_id='1234'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('msa', make_row(geo_id='01234'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('dma', make_row(geo_id='400'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('state', make_row(geo_id='48'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('state', make_row(geo_id='iowa'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('nation', make_row(geo_id='0000'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('hhs', make_row(geo_id='0'), details) + with pytest.raises(GeoIdSanityCheckException): + CsvImporter.extract_and_check_row('county', make_row(geo_id=None), details) + + with pytest.raises(Exception): + CsvImporter.extract_and_check_row('hrr', make_row(geo_id='hrr001'), details) + + with pytest.raises(GeoTypeSanityCheckException): + CsvImporter.extract_and_check_row('province', make_row(geo_id='ab'), details) + with pytest.raises(GeoTypeSanityCheckException): + CsvImporter.extract_and_check_row(None, make_row(), details) + + with pytest.raises(ValueSanityCheckException): + CsvImporter.extract_and_check_row('state', make_row(stderr=-1), details) + with pytest.raises(ValueSanityCheckException): + CsvImporter.extract_and_check_row('state', make_row(value=float('inf')), details) + with pytest.raises(ValueSanityCheckException): + CsvImporter.extract_and_check_row('state', make_row(stderr=float('inf')), details) + with pytest.raises(ValueSanityCheckException): + CsvImporter.extract_and_check_row('state', make_row(sample_size=float('inf')), details) + + # Success cases with NANs. 
+ table = CsvImporter.extract_and_check_row('state', make_row(), details) + assert table.compare(make_row('vi', 1.23, 4.56, 100.5, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING)).empty + + table = CsvImporter.extract_and_check_row('state', make_row(value=None, stderr=np.nan, sample_size=None, missing_value=Nans.DELETED, missing_stderr=Nans.DELETED, missing_sample_size=Nans.DELETED), details) + assert table.compare(make_row('vi', None, None, None, Nans.DELETED, Nans.DELETED, Nans.DELETED)).empty + + table = CsvImporter.extract_and_check_row('state', make_row(stderr=None, sample_size=np.nan, missing_stderr=Nans.OTHER, missing_sample_size=Nans.OTHER), details) + assert table.compare(make_row('vi', 1.23, None, None, Nans.NOT_MISSING, Nans.OTHER, Nans.OTHER)).empty + + table = CsvImporter.extract_and_check_row('state', make_row(sample_size=None, missing_value=Nans.NOT_MISSING, missing_stderr=Nans.OTHER, missing_sample_size=Nans.NOT_MISSING), details) + assert table.compare(make_row('vi', 1.23, 4.56, None, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER)).empty @patch("pandas.read_csv") @@ -236,18 +242,18 @@ def test_load_csv_with_invalid_header(self, mock_read_csv): details = PathDetails(20200101, 0, "src", "name", "day", 20200101, "state") mock_read_csv.return_value = pd.DataFrame(data) - rows = list(CsvImporter.load_csv(filepath, details)) + rows = CsvImporter.load_csv(filepath, details) self.assertTrue(mock_read_csv.called) self.assertTrue(mock_read_csv.call_args[0][0], filepath) - self.assertEqual(rows, [None]) + self.assertEqual(rows, None) @patch("pandas.read_csv") def test_load_csv_with_valid_header(self, mock_read_csv): - """Yield sanity checked `RowValues` from a valid CSV file.""" + """Yield sanity checked `CsvRowValues` from a valid CSV file.""" - # one invalid geo_id, but otherwise valid + # valid header, but invalid geo_id data = { 'geo_id': ['ca', 'tx', 'fl', '123'], 'val': ['1.1', '1.2', '1.3', '1.4'], @@ -258,35 +264,35 @@ def test_load_csv_with_valid_header(self, mock_read_csv): details = PathDetails(20200101, 0, "src", "name", "day", 20200101, "state") mock_read_csv.return_value = pd.DataFrame(data=data) - rows = list(CsvImporter.load_csv(filepath, details)) + rows = CsvImporter.load_csv(filepath, details) self.assertTrue(mock_read_csv.called) self.assertTrue(mock_read_csv.call_args[0][0], filepath) - self.assertEqual(len(rows), 4) - - self.assertEqual(rows[0].geo_value, 'ca') - self.assertEqual(rows[0].value, 1.1) - self.assertEqual(rows[0].stderr, 2.1) - self.assertEqual(rows[0].sample_size, 301) - - self.assertEqual(rows[1].geo_value, 'tx') - self.assertEqual(rows[1].value, 1.2) - self.assertEqual(rows[1].stderr, 2.2) - self.assertEqual(rows[1].sample_size, 302) - - self.assertEqual(rows[2].geo_value, 'fl') - self.assertEqual(rows[2].value, 1.3) - self.assertEqual(rows[2].stderr, 2.3) - self.assertEqual(rows[2].sample_size, 303) - - self.assertIsNone(rows[3]) + self.assertEqual(rows, None) # now with missing values! 
+ def make_covidcast_row(geo_value, value, stderr, sample_size, missing_value, missing_stderr, missing_sample_size): + return CovidcastRow( + source=details.source, + signal=details.signal, + time_type=details.time_type, + geo_type=details.geo_type, + issue=details.issue, + lag=details.lag, + time_value=details.time_value, + geo_value=geo_value, + value=value, + stderr=stderr, + sample_size=sample_size, + missing_value=missing_value, + missing_stderr=missing_stderr, + missing_sample_size=missing_sample_size, + ) data = { 'geo_id': ['ca', 'tx', 'fl', 'ak', 'wa'], - 'val': [np.nan, '1.2', '1.3', '1.4', '1.5'], - 'se': ['2.1', "na", '2.3', '2.4', '2.5'], - 'sample_size': ['301', '302', None, '304', None], + 'val': [np.nan, 1.2, 1.3, 1.4, 1.5], + 'se': [2.1, None, 2.3, 2.4, 2.5], + 'sample_size': [301, 302, None, 304, None], 'missing_value': [Nans.NOT_APPLICABLE] + [Nans.NOT_MISSING] * 3 + [None], 'missing_stderr': [Nans.NOT_MISSING, Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.NOT_MISSING] + [None], 'missing_sample_size': [Nans.NOT_MISSING] * 2 + [Nans.REGION_EXCEPTION] * 2 + [None] @@ -295,48 +301,252 @@ def test_load_csv_with_valid_header(self, mock_read_csv): details = PathDetails(20200101, 0, "src", "name", "day", 20200101, "state") mock_read_csv.return_value = pd.DataFrame(data) - rows = list(CsvImporter.load_csv(filepath, details)) + rows = CsvImporter.load_csv(filepath, details) self.assertTrue(mock_read_csv.called) self.assertTrue(mock_read_csv.call_args[0][0], filepath) self.assertEqual(len(rows), 5) - self.assertEqual(rows[0].geo_value, 'ca') - self.assertIsNone(rows[0].value) - self.assertEqual(rows[0].stderr, 2.1) - self.assertEqual(rows[0].sample_size, 301) - self.assertEqual(rows[0].missing_value, Nans.NOT_APPLICABLE) - self.assertEqual(rows[0].missing_stderr, Nans.NOT_MISSING) - self.assertEqual(rows[0].missing_sample_size, Nans.NOT_MISSING) - - self.assertEqual(rows[1].geo_value, 'tx') - self.assertEqual(rows[1].value, 1.2) - self.assertIsNone(rows[1].stderr) - self.assertEqual(rows[1].sample_size, 302) - self.assertEqual(rows[1].missing_value, Nans.NOT_MISSING) - self.assertEqual(rows[1].missing_stderr, Nans.REGION_EXCEPTION) - self.assertEqual(rows[1].missing_sample_size, Nans.NOT_MISSING) - - self.assertEqual(rows[2].geo_value, 'fl') - self.assertEqual(rows[2].value, 1.3) - self.assertEqual(rows[2].stderr, 2.3) - self.assertIsNone(rows[2].sample_size) - self.assertEqual(rows[2].missing_value, Nans.NOT_MISSING) - self.assertEqual(rows[2].missing_stderr, Nans.NOT_MISSING) - self.assertEqual(rows[2].missing_sample_size, Nans.REGION_EXCEPTION) - - self.assertEqual(rows[3].geo_value, 'ak') - self.assertEqual(rows[3].value, 1.4) - self.assertEqual(rows[3].stderr, 2.4) - self.assertEqual(rows[3].sample_size, 304) - self.assertEqual(rows[3].missing_value, Nans.NOT_MISSING) - self.assertEqual(rows[3].missing_stderr, Nans.NOT_MISSING) - self.assertEqual(rows[3].missing_sample_size, Nans.NOT_MISSING) - - self.assertEqual(rows[4].geo_value, 'wa') - self.assertEqual(rows[4].value, 1.5) - self.assertEqual(rows[4].stderr, 2.5) - self.assertEqual(rows[4].sample_size, None) - self.assertEqual(rows[4].missing_value, Nans.NOT_MISSING) - self.assertEqual(rows[4].missing_stderr, Nans.NOT_MISSING) - self.assertEqual(rows[4].missing_sample_size, Nans.OTHER) \ No newline at end of file + self.assertEqual(rows[0], make_covidcast_row( + geo_value='ca', + value=None, + stderr=2.1, + sample_size=301, + missing_value=Nans.NOT_APPLICABLE, + missing_stderr=Nans.NOT_MISSING, + 
missing_sample_size=Nans.NOT_MISSING + )) + + self.assertEqual(rows[1], make_covidcast_row( + geo_value='tx', + value=1.2, + stderr=None, + sample_size=302, + missing_value=Nans.NOT_MISSING, + missing_stderr=Nans.REGION_EXCEPTION, + missing_sample_size=Nans.NOT_MISSING, + )) + + self.assertEqual(rows[2], make_covidcast_row( + geo_value='fl', + value=1.3, + stderr=2.3, + sample_size=None, + missing_value=Nans.NOT_MISSING, + missing_stderr=Nans.NOT_MISSING, + missing_sample_size=Nans.REGION_EXCEPTION, + )) + + self.assertEqual(rows[3], make_covidcast_row( + geo_value='ak', + value=1.4, + stderr=2.4, + sample_size=304, + missing_value=Nans.NOT_MISSING, + missing_stderr=Nans.NOT_MISSING, + missing_sample_size=Nans.NOT_MISSING, + )) + + self.assertEqual(rows[4], make_covidcast_row( + geo_value='wa', + value=1.5, + stderr=2.5, + sample_size=None, + missing_value=Nans.NOT_MISSING, + missing_stderr=Nans.NOT_MISSING, + missing_sample_size=Nans.OTHER, + )) + + + def test_get_argument_parser(self): + """Return a parser for command-line arguments.""" + + self.assertIsInstance(get_argument_parser(), argparse.ArgumentParser) + + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.CsvImporter") + def test_collect_files(self, mock_csv_importer: MagicMock): + """Scan the data directory.""" + + mock_csv_importer.find_csv_files.return_value = self._path_details + collect_files("fake_data_dir", False) # no specific issue + self.assertEqual(mock_csv_importer.find_csv_files.call_count, 1) + + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.Database") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.CsvImporter") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.FileArchiver") + def test_upload_archive(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock): + """Upload to the database, and archive.""" + + def make_row(value: float, details: PathDetails): + return MagicMock( + source=details.source, + signal=details.signal, + time_type=details.time_type, + geo_type=details.geo_type, + time_value=details.time_value, + issue=details.issue, + lag=details.lag, + geo_value=value, + value=value, + stderr=value, + sample_size=value, + ) + + def load_csv_impl(path, details): + if path == 'path/a.csv': + # no validation errors + return [ + make_row('a1', details), + make_row('a2', details), + make_row('a3', details) + ] + elif path == 'path/b.csv': + # one validation error + return None + else: + # fail the test for any other path + raise Exception('unexpected path') + + def iter_len(l: Iterable) -> int: + return len(list(l)) + + data_dir = 'data_dir' + mock_database.insert_or_update_bulk = MagicMock(wraps=iter_len) + mock_csv_importer.load_csv = load_csv_impl + mock_logger = MagicMock() + + modified_row_count = upload_archive( + self._path_details, + mock_database, + make_handlers(data_dir, False), + mock_logger + ) + + self.assertEqual(modified_row_count, 3) + # verify that appropriate rows were added to the database + self.assertEqual(mock_database.insert_or_update_bulk.call_count, 1) + call_args_list = mock_database.insert_or_update_bulk.call_args_list + actual_args = [[(a.source, a.signal, a.time_type, a.geo_type, a.time_value, + a.geo_value, a.value, a.stderr, a.sample_size, a.issue, a.lag) + for a in call.args[0]] for call in call_args_list] + + expected_args = [ + [('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a1', 'a1', 'a1', 'a1', 20200420, 1), + ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a2', 'a2', 'a2', 'a2', 20200420, 1), + 
('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a3', 'a3', 'a3', 'a3', 20200420, 1)] + ] + self.assertEqual(actual_args, expected_args) + + # verify that one file was archived as successful (a) and two as failed (b, c) + self.assertEqual(mock_file_archiver.archive_file.call_count, 3) + call_args_list = mock_file_archiver.archive_file.call_args_list + actual_args = [args for (args, kwargs) in call_args_list] + expected_args = [ + ('path', 'data_dir/archive/successful/src_a', 'a.csv', True), + ('path', 'data_dir/archive/failed/src_b', 'b.csv', False), + ('path', 'data_dir/archive/failed/unknown', 'c.csv', False), + ] + self.assertEqual(actual_args, expected_args) + + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.Database") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.upload_archive") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.collect_files") + def test_main_successful(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock): + """Run the main program successfully, then commit changes.""" + + # TODO: use an actual argparse object for the args instead of a MagicMock + args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False) + # `return_value` because we mocked the class constructor + mock_database.return_value.count_all_rows.return_value = 0 + mock_collect_files.return_value = [("a",False)] + + main(args) + self.assertTrue(mock_collect_files.called) + self.assertEqual(mock_collect_files.call_args[0][0], 'data') + + self.assertTrue(mock_upload_archive.called) + self.assertEqual(mock_upload_archive.call_args[0][0], [("a",False)]) + + self.assertTrue(mock_database.return_value.connect.called) + self.assertTrue(mock_database.return_value.disconnect.called) + self.assertTrue(mock_database.return_value.disconnect.call_args[0][0]) + + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.Database") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.upload_archive", side_effect=Exception('testing')) + @patch("delphi.epidata.acquisition.covidcast.csv_importer.collect_files") + def test_main_unsuccessful(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock): + """Run the main program with failure, then commit changes.""" + + # TODO: use an actual argparse object for the args instead of a MagicMock + args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False) + mock_database.return_value.count_all_rows.return_value = 0 + mock_collect_files.return_value = [("a",False)] + + with self.assertRaises(Exception): + main(args) + + self.assertTrue(mock_upload_archive.called) + self.assertEqual(mock_upload_archive.call_args[0][0], [("a",False)]) + + self.assertTrue(mock_database.return_value.connect.called) + self.assertTrue(mock_database.return_value.disconnect.called) + self.assertTrue(mock_database.return_value.disconnect.call_args[0][0]) + + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.Database") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.upload_archive") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.collect_files") + def test_main_early_exit(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock): + """Run the main program with an empty receiving directory.""" + + # TODO: use an actual argparse object for the args instead of a MagicMock + args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False) + mock_database.count_all_rows.return_value = 0 +
mock_collect_files.return_value = [] + + main(args) + + self.assertTrue(mock_collect_files.called) + self.assertEqual(mock_collect_files.call_args[0][0], 'data') + + self.assertFalse(mock_upload_archive.called) + + self.assertFalse(mock_database.return_value.connect.called) + self.assertFalse(mock_database.return_value.disconnect.called) + + + @patch("delphi.epidata.acquisition.covidcast.csv_importer.Database") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.CsvImporter") + @patch("delphi.epidata.acquisition.covidcast.csv_importer.FileArchiver") + def test_database_exception_is_handled(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock): + """Gracefully handle database exceptions.""" + + data_dir = 'data_dir' + mock_database.insert_or_update_bulk.side_effect = Exception('testing') + mock_csv_importer.find_csv_files.return_value = [ + ('path/file.csv', PathDetails(20200424, 1, 'src', 'sig', 'day', 20200423, 'hrr')), + ] + mock_csv_importer.load_csv.return_value = [ + MagicMock(geo_value='geo', value=1, stderr=1, sample_size=1), + ] + mock_logger = MagicMock() + + upload_archive( + collect_files(data_dir, False), + mock_database, + make_handlers(data_dir, False), + mock_logger + ) + + # verify that insertions were attempted + self.assertTrue(mock_database.insert_or_update_bulk.called) + + # verify that the file was archived as having failed + self.assertTrue(mock_file_archiver.archive_file.called) + actual_args = mock_file_archiver.archive_file.call_args[0] + expected_args = ('path', 'data_dir/archive/failed/src', 'file.csv', False) + self.assertEqual(actual_args, expected_args) diff --git a/tests/acquisition/covidcast/test_csv_to_database.py b/tests/acquisition/covidcast/test_csv_to_database.py deleted file mode 100644 index 938070944..000000000 --- a/tests/acquisition/covidcast/test_csv_to_database.py +++ /dev/null @@ -1,218 +0,0 @@ -"""Unit tests for csv_to_database.py.""" - -# standard library -import argparse -from typing import Iterable -import unittest -from unittest.mock import MagicMock, patch - -from delphi.epidata.acquisition.covidcast.csv_importer import PathDetails -from delphi.epidata.acquisition.covidcast.csv_to_database import get_argument_parser, main, collect_files, upload_archive, make_handlers - -# py3tester coverage target -__test_target__ = 'delphi.epidata.acquisition.covidcast.csv_to_database' - - -class UnitTests(unittest.TestCase): - """Basic unit tests.""" - _path_details = [ - # a good file - ('path/a.csv', PathDetails(20200420, 1, 'src_a', 'sig_a', 'day', 20200419, 'hrr')), - # a file with a data error - ('path/b.csv', PathDetails(202017, 1, 'src_b', 'sig_b', 'week', 202016, 'msa')), - # emulate a file that's named incorrectly - ('path/c.csv', None) - ] - - def test_get_argument_parser(self): - """Return a parser for command-line arguments.""" - - self.assertIsInstance(get_argument_parser(), argparse.ArgumentParser) - - - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.CsvImporter") - def test_collect_files(self, mock_csv_importer: MagicMock): - """Scan the data directory.""" - - mock_csv_importer.find_csv_files.return_value = self._path_details - collect_files("fake_data_dir", False) # no specific issue - self.assertEqual(mock_csv_importer.find_csv_files.call_count, 1) - - - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.CsvImporter") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.FileArchiver") 
- def test_upload_archive(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock): - """Upload to the database, and archive.""" - - def make_row(value: float, details: PathDetails): - return MagicMock( - source=details.source, - signal=details.signal, - time_type=details.time_type, - geo_type=details.geo_type, - time_value=details.time_value, - issue=details.issue, - lag=details.lag, - geo_value=value, - value=value, - stderr=value, - sample_size=value, - ) - - def load_csv_impl(path, details): - if path == 'path/a.csv': - # no validation errors - yield make_row('a1', details) - yield make_row('a2', details) - yield make_row('a3', details) - elif path == 'path/b.csv': - # one validation error - yield make_row('b1', details) - yield None - yield make_row('b3', details) - else: - # fail the test for any other path - raise Exception('unexpected path') - - def iter_len(l: Iterable) -> int: - return len(list(l)) - - data_dir = 'data_dir' - mock_database.insert_or_update_bulk = MagicMock(wraps=iter_len) - mock_csv_importer.load_csv = load_csv_impl - mock_logger = MagicMock() - - modified_row_count = upload_archive( - self._path_details, - mock_database, - make_handlers(data_dir, False), - mock_logger - ) - - self.assertEqual(modified_row_count, 3) - # verify that appropriate rows were added to the database - self.assertEqual(mock_database.insert_or_update_bulk.call_count, 1) - call_args_list = mock_database.insert_or_update_bulk.call_args_list - actual_args = [[(a.source, a.signal, a.time_type, a.geo_type, a.time_value, - a.geo_value, a.value, a.stderr, a.sample_size, a.issue, a.lag) - for a in call.args[0]] for call in call_args_list] - - expected_args = [ - [('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a1', 'a1', 'a1', 'a1', 20200420, 1), - ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a2', 'a2', 'a2', 'a2', 20200420, 1), - ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a3', 'a3', 'a3', 'a3', 20200420, 1)] - ] - self.assertEqual(actual_args, expected_args) - - # verify that two files were successful (a, d) and two failed (b, c) - self.assertEqual(mock_file_archiver.archive_file.call_count, 3) - call_args_list = mock_file_archiver.archive_file.call_args_list - actual_args = [args for (args, kwargs) in call_args_list] - expected_args = [ - ('path', 'data_dir/archive/successful/src_a', 'a.csv', True), - ('path', 'data_dir/archive/failed/src_b', 'b.csv', False), - ('path', 'data_dir/archive/failed/unknown', 'c.csv', False), - ] - self.assertEqual(actual_args, expected_args) - - - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.upload_archive") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.collect_files") - def test_main_successful(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock): - """Run the main program successfully, then commit changes.""" - - # TODO: use an actual argparse object for the args instead of a MagicMock - args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False) - # `return_value` because we mocked the class constructor - mock_database.return_value.count_all_rows.return_value = 0 - mock_collect_files.return_value = [("a",False)] - - main(args) - self.assertTrue(mock_collect_files.called) - self.assertEqual(mock_collect_files.call_args[0][0], 'data') - - self.assertTrue(mock_upload_archive.called) - self.assertEqual(mock_upload_archive.call_args[0][0], [("a",False)]) - - 
self.assertTrue(mock_database.return_value.connect.called) - self.assertTrue(mock_database.return_value.disconnect.called) - self.assertTrue(mock_database.return_value.disconnect.call_args[0][0]) - - - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.upload_archive", side_effect=Exception('testing')) - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.collect_files") - def test_main_unsuccessful(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock): - """Run the main program with failure, then commit changes.""" - - # TODO: use an actual argparse object for the args instead of a MagicMock - args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False) - mock_database.return_value.count_all_rows.return_value = 0 - mock_collect_files.return_value = [("a",False)] - - with self.assertRaises(Exception): - main(args) - - self.assertTrue(mock_upload_archive.called) - self.assertEqual(mock_upload_archive.call_args[0][0], [("a",False)]) - - self.assertTrue(mock_database.return_value.connect.called) - self.assertTrue(mock_database.return_value.disconnect.called) - self.assertTrue(mock_database.return_value.disconnect.call_args[0][0]) - - - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.upload_archive") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.collect_files") - def test_main_early_exit(self, mock_collect_files: MagicMock, mock_upload_archive: MagicMock, mock_database: MagicMock): - """Run the main program with an empty receiving directory.""" - - # TODO: use an actual argparse object for the args instead of a MagicMock - args = MagicMock(log_file=None, data_dir='data', specific_issue_date=False) - mock_database.count_all_rows.return_value = 0 - mock_collect_files.return_value = [] - - main(args) - - self.assertTrue(mock_collect_files.called) - self.assertEqual(mock_collect_files.call_args[0][0], 'data') - - self.assertFalse(mock_upload_archive.called) - - self.assertFalse(mock_database.return_value.connect.called) - self.assertFalse(mock_database.return_value.disconnect.called) - - - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.Database") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.CsvImporter") - @patch("delphi.epidata.acquisition.covidcast.csv_to_database.FileArchiver") - def test_database_exception_is_handled(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock): - """Gracefully handle database exceptions.""" - - data_dir = 'data_dir' - mock_database.insert_or_update_bulk.side_effect = Exception('testing') - mock_csv_importer.find_csv_files.return_value = [ - ('path/file.csv', PathDetails(20200424, 1, 'src', 'sig', 'day', 20200423, 'hrr')), - ] - mock_csv_importer.load_csv.return_value = [ - MagicMock(geo_value='geo', value=1, stderr=1, sample_size=1), - ] - mock_logger = MagicMock() - - upload_archive( - collect_files(data_dir, False), - mock_database, - make_handlers(data_dir, False), - mock_logger - ) - - # verify that insertions were attempted - self.assertTrue(mock_database.insert_or_update_bulk.called) - - # verify that the file was archived as having failed - self.assertTrue(mock_file_archiver.archive_file.called) - actual_args = mock_file_archiver.archive_file.call_args[0] - expected_args = ('path', 'data_dir/archive/failed/src', 'file.csv', False) - 
self.assertEqual(actual_args, expected_args)
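
A minimal usage sketch of the consolidated entry point introduced by this patch; the flags are the ones defined by get_argument_parser() above, while the concrete directory and log-file paths are illustrative assumptions rather than values taken from the diff:

    from delphi.epidata.acquisition.covidcast.csv_importer import get_argument_parser, main

    # Flags mirror get_argument_parser(); the paths below are placeholder examples only (assumptions).
    args = get_argument_parser().parse_args([
        "--data_dir", "/common/covidcast",
        "--log_file", "/var/log/csv_ingestion.log",
    ])
    main(args)  # scans <data_dir>/receiving, inserts rows, then archives each CSV as successful or failed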