
[Refactor] Further cleans of csv_importer #1072

Merged
merged 5 commits into from Jan 27, 2023
75 changes: 58 additions & 17 deletions src/acquisition/covidcast/csv_importer.py
@@ -1,20 +1,26 @@
"""Collects and reads covidcast data from a set of local CSV files."""

# standard library
from dataclasses import dataclass
from datetime import date
import glob
import os
import re
from dataclasses import dataclass
from datetime import date
from glob import glob
from typing import Iterator, NamedTuple, Optional, Tuple
Comment on lines +6 to +9
Contributor

suggestion: keep these in alphabetical order

we use pylint to enforce alphabetical import order in covidcast-indicators, and it would be good to be consistent across repositories.

Contributor Author
@dshemetov Jan 25, 2023

explanation:

So I'm using a Python import sort extension in my IDE and it likes to:

  • have three sections separated by a blank line: Python standard library, third party, and first party
  • place bare import X statements above from X import Y statements
  • sort alphabetically within each section

So that's the convention I followed here (a minimal sketch follows below). I think we were sort of doing the first convention already, not really the second, and doing the third.

question: do you like the second requirement? should I go with just 1 and 3?
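
A minimal sketch of that three-section layout, using module names drawn from this file purely for illustration (it assumes the third- and first-party packages are installed):

# standard library
import glob
import os
import re
from dataclasses import dataclass
from datetime import date

# third party
import epiweeks as epi
import pandas as pd

# first party
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger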

Contributor Author

I feel like this is an open-ended question that we can address in a more comprehensive push for broader linting/formatting goals. Gonna keep it as is for now.


# third party
import pandas as pd
import epiweeks as epi
import pandas as pd

# first party
from delphi_utils import Nans
from delphi.utils.epiweek import delta_epiweeks
from .logger import get_structured_logger
from delphi.epidata.acquisition.covidcast.database import CovidcastRow
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
Contributor

praise: fully-qualified name ftw!

this will make our lives easier if/when we eventually convert the delphi-epidata codebase to actual python modules, or if/when we switch to a single logger behavior definition everywhere


DFRow = NamedTuple('DFRow', [('geo_id', str), ('value', float), ('stderr', float), ('sample_size', float), ('missing_value', int), ('missing_stderr', int), ('missing_sample_size', int)])
PathDetails = NamedTuple('PathDetails', [('issue', int), ('lag', int), ('source', str), ('signal', str), ('time_type', str), ('time_value', int), ('geo_type', str)])


@dataclass
class CsvRowValue:
@@ -27,6 +33,7 @@ class CsvRowValue:
missing_stderr: int
missing_sample_size: int


class CsvImporter:
"""Finds and parses covidcast CSV files."""

@@ -60,6 +67,7 @@ class CsvImporter:
"missing_sample_size": "Int64"
}


@staticmethod
def is_sane_day(value):
"""Return whether `value` is a sane (maybe not valid) YYYYMMDD date.
@@ -76,6 +84,7 @@ def is_sane_day(value):
return False
return date(year=year,month=month,day=day)


@staticmethod
def is_sane_week(value):
"""Return whether `value` is a sane (maybe not valid) YYYYWW epiweek.
@@ -91,22 +100,24 @@ def is_sane_week(value):
return False
return value


@staticmethod
def find_issue_specific_csv_files(scan_dir, glob=glob):
def find_issue_specific_csv_files(scan_dir):
logger = get_structured_logger('find_issue_specific_csv_files')
for path in sorted(glob.glob(os.path.join(scan_dir, '*'))):
for path in sorted(glob(os.path.join(scan_dir, '*'))):
issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower())
if issuedir_match and os.path.isdir(path):
issue_date_value = int(issuedir_match.group(2))
issue_date = CsvImporter.is_sane_day(issue_date_value)
if issue_date:
logger.info(event='processing csv files from issue', detail=issue_date, file=path)
yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob)
yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)))
else:
logger.warning(event='invalid issue directory day', detail=issue_date_value, file=path)


@staticmethod
def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob):
def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()))):
"""Recursively search for and yield covidcast-format CSV files.

scan_dir: the directory to scan (recursively)
@@ -122,11 +133,11 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
issue_value=-1
lag_value=-1

for path in sorted(glob.glob(os.path.join(scan_dir, '*', '*'))):

for path in sorted(glob(os.path.join(scan_dir, '*', '*'))):
# safe to ignore this file
if not path.lower().endswith('.csv'):
# safe to ignore this file
continue

# match a daily or weekly naming pattern
daily_match = CsvImporter.PATTERN_DAILY.match(path.lower())
weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower())
@@ -174,14 +185,16 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
yield (path, None)
continue

yield (path, (source, signal, time_type, geo_type, time_value, issue_value, lag_value))
yield (path, PathDetails(issue_value, lag_value, source, signal, time_type, time_value, geo_type))


@staticmethod
def is_header_valid(columns):
"""Return whether the given pandas columns contains the required fields."""

return set(columns) >= CsvImporter.REQUIRED_COLUMNS


@staticmethod
def floaty_int(value: str) -> int:
"""Cast a string to an int, even if it looks like a float.
@@ -195,6 +208,7 @@ def floaty_int(value: str) -> int:
raise ValueError('not an int: "%s"' % str(value))
return int(float_value)


@staticmethod
def maybe_apply(func, quantity):
"""Apply the given function to the given quantity if not null-ish."""
@@ -205,6 +219,7 @@ def maybe_apply(func, quantity):
else:
return func(quantity)


@staticmethod
def validate_quantity(row, attr_quantity):
"""Take a row and validate a given associated quantity (e.g., val, se, stderr).
@@ -218,6 +233,7 @@ def validate_quantity(row, attr_quantity):
# val was a string or another data
return "Error"


@staticmethod
def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=None):
"""Take a row and validate the missing code associated with
@@ -250,8 +266,9 @@ def validate_missing_code(row, attr_quantity, attr_name, filepath=None, logger=N

return missing_entry


@staticmethod
def extract_and_check_row(row, geo_type, filepath=None):
def extract_and_check_row(row: DFRow, geo_type: str, filepath: Optional[str] = None) -> Tuple[Optional[CsvRowValue], Optional[str]]:
"""Extract and return `CsvRowValue` from a CSV row, with sanity checks.

Also returns the name of the field which failed sanity check, or None.
@@ -331,8 +348,9 @@ def extract_and_check_row(row, geo_type, filepath=None):
# return extracted and validated row values
return (CsvRowValue(geo_id, value, stderr, sample_size, missing_value, missing_stderr, missing_sample_size), None)


@staticmethod
def load_csv(filepath, geo_type):
def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[CovidcastRow]]:
"""Load, validate, and yield data as `RowValues` from a CSV file.

filepath: the CSV file to be loaded
@@ -357,9 +375,32 @@ def load_csv(filepath, geo_type):
table.rename(columns={"val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}, inplace=True)

for row in table.itertuples(index=False):
row_values, error = CsvImporter.extract_and_check_row(row, geo_type, filepath)
csv_row_values, error = CsvImporter.extract_and_check_row(row, details.geo_type, filepath)

if error:
logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath)
yield None
continue
yield row_values

yield CovidcastRow(
details.source,
details.signal,
details.time_type,
details.geo_type,
details.time_value,
csv_row_values.geo_value,
csv_row_values.value,
csv_row_values.stderr,
csv_row_values.sample_size,
csv_row_values.missing_value,
csv_row_values.missing_stderr,
csv_row_values.missing_sample_size,
details.issue,
details.lag,
# These four fields are unused by database acquisition
# TODO: These will be used when CovidcastRow is updated.
# id=None,
# direction=None,
# direction_updated_timestamp=0,
# value_updated_timestamp=0,
)
79 changes: 40 additions & 39 deletions src/acquisition/covidcast/csv_to_database.py
@@ -4,10 +4,12 @@
import argparse
import os
import time
from logging import Logger
from typing import Callable, Iterable, Optional, Tuple

# first party
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow, DBLoadStateException
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails
from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException
from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

@@ -28,17 +30,19 @@ def get_argument_parser():
help="filename for log output (defaults to stdout)")
return parser

def collect_files(data_dir, specific_issue_date,csv_importer_impl=CsvImporter):

def collect_files(data_dir: str, specific_issue_date: bool):
"""Fetch path and data profile details for each file to upload."""
logger= get_structured_logger('collect_files')
if specific_issue_date:
results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir))
results = list(CsvImporter.find_issue_specific_csv_files(data_dir))
else:
results = list(csv_importer_impl.find_csv_files(os.path.join(data_dir, 'receiving')))
results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving')))
logger.info(f'found {len(results)} files')
return results

def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver):

def make_handlers(data_dir: str, specific_issue_date: bool):
if specific_issue_date:
# issue-specific uploads are always one-offs, so we can leave all
# files in place without worrying about cleaning up
@@ -47,7 +51,7 @@ def handle_failed(path_src, filename, source, logger):

def handle_successful(path_src, filename, source, logger):
logger.info(event='archiving as successful',file=filename)
file_archiver_impl.archive_inplace(path_src, filename)
FileArchiver.archive_inplace(path_src, filename)
else:
# normal automation runs require some shuffling to remove files
# from receiving and place them in the archive
@@ -59,22 +63,24 @@ def handle_failed(path_src, filename, source, logger):
logger.info(event='archiving as failed - ', detail=source, file=filename)
path_dst = os.path.join(archive_failed_dir, source)
compress = False
file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
FileArchiver.archive_file(path_src, path_dst, filename, compress)

# helper to archive a successful file with compression
def handle_successful(path_src, filename, source, logger):
logger.info(event='archiving as successful',file=filename)
path_dst = os.path.join(archive_successful_dir, source)
compress = True
file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
FileArchiver.archive_file(path_src, path_dst, filename, compress)

return handle_successful, handle_failed


def upload_archive(
path_details,
database,
handlers,
logger,
csv_importer_impl=CsvImporter):
path_details: Iterable[Tuple[str, Optional[PathDetails]]],
database: Database,
handlers: Tuple[Callable],
logger: Logger
):
"""Upload CSVs to the database and archive them using the specified handlers.

:path_details: output from CsvImporter.find*_csv_files
@@ -89,20 +95,16 @@ def upload_archive(
total_modified_row_count = 0
# iterate over each file
for path, details in path_details:
logger.info(event='handling',dest=path)
logger.info(event='handling', dest=path)
path_src, filename = os.path.split(path)

# file path or name was invalid, source is unknown
if not details:
# file path or name was invalid, source is unknown
archive_as_failed(path_src, filename, 'unknown',logger)
continue

(source, signal, time_type, geo_type, time_value, issue, lag) = details

csv_rows = csv_importer_impl.load_csv(path, geo_type)

cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
rows_list = list(cc_rows)
csv_rows = CsvImporter.load_csv(path, details)
rows_list = list(csv_rows)
Comment on lines +106 to +107
Contributor

praise: this is an enormous improvement

way more readable and less dependent on catching small errors in long tuple definitions. I have no idea why load_csv wasn't returning a CovidcastRow iterator to begin with.
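
A small, self-contained sketch of the point above: named-attribute access on the PathDetails NamedTuple (defined earlier in this diff) versus positional unpacking. The field values are made up for illustration:

from typing import NamedTuple

PathDetails = NamedTuple('PathDetails', [('issue', int), ('lag', int), ('source', str), ('signal', str), ('time_type', str), ('time_value', int), ('geo_type', str)])

# hypothetical values, for illustration only
details = PathDetails(issue=20230125, lag=2, source='example-source', signal='example-signal', time_type='day', time_value=20230123, geo_type='county')

# positional unpacking (the old tuple style) only works if every call site remembers the exact field order
issue, lag, source, signal, time_type, time_value, geo_type = details

# named access (what load_csv's callers use now) is order-independent and self-documenting
print(details.source, details.signal, details.time_value, details.issue)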

all_rows_valid = rows_list and all(r is not None for r in rows_list)
if all_rows_valid:
try:
@@ -111,12 +113,13 @@ def upload_archive(
logger.info(
"Inserted database rows",
row_count = modified_row_count,
source = source,
signal = signal,
geo_type = geo_type,
time_value = time_value,
issue = issue,
lag = lag)
source = details.source,
signal = details.signal,
geo_type = details.geo_type,
time_value = details.time_value,
issue = details.issue,
lag = details.lag
)
if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted
total_modified_row_count += (modified_row_count if modified_row_count else 0)
database.commit()
@@ -131,40 +134,37 @@ def upload_archive(

# archive the current file based on validation results
if all_rows_valid:
archive_as_successful(path_src, filename, source, logger)
archive_as_successful(path_src, filename, details.source, logger)
else:
archive_as_failed(path_src, filename, source,logger)
archive_as_failed(path_src, filename, details.source, logger)

return total_modified_row_count


def main(
args,
database_impl=Database,
collect_files_impl=collect_files,
upload_archive_impl=upload_archive):
def main(args):
"""Find, parse, and upload covidcast signals."""

logger = get_structured_logger("csv_ingestion", filename=args.log_file)
start_time = time.time()

# shortcut escape without hitting db if nothing to do
path_details = collect_files_impl(args.data_dir, args.specific_issue_date)
path_details = collect_files(args.data_dir, args.specific_issue_date)
if not path_details:
logger.info('nothing to do; exiting...')
return

logger.info("Ingesting CSVs", csv_count = len(path_details))

database = database_impl()
database = Database()
database.connect()

try:
modified_row_count = upload_archive_impl(
modified_row_count = upload_archive(
path_details,
database,
make_handlers(args.data_dir, args.specific_issue_date),
logger)
logger
)
logger.info("Finished inserting/updating database rows", row_count = modified_row_count)
finally:
database.do_analyze()
@@ -175,5 +175,6 @@ def main(
"Ingested CSVs into database",
total_runtime_in_seconds=round(time.time() - start_time, 2))


if __name__ == '__main__':
main(get_argument_parser().parse_args())