Skip to content

DSEW-CPR: Extend files to be processed for interpolation #1578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
DOWNLOAD_ATTACHMENT = URL_PREFIX + "/files/{assetId}?download=true&filename={filename}"
DOWNLOAD_LISTING = URL_PREFIX + ".json"

INTERP_LENGTH = 5

@dataclass
class Transform:
"""Transformation filters for interpreting a particular sheet in the workbook."""
Expand Down
39 changes: 32 additions & 7 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

from delphi_utils.geomap import GeoMapper

from .constants import (TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
from .constants import (
TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
IS_PROP, NOT_PROP,
DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING)
DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING,
INTERP_LENGTH
)

DataDict = Dict[Tuple[str, str, bool], pd.DataFrame]

Expand Down Expand Up @@ -416,25 +419,47 @@ def fetch_listing(params):
]
if params['indicator']['reports'] == 'new':
# drop files we already have in the input cache
listing = [el for el in listing if not os.path.exists(el['cached_filename'])]
keep = [el for el in listing if not os.path.exists(el['cached_filename'])]
elif params['indicator']['reports'].find("--") > 0:
# drop files outside the specified publish-date range
start_str, _, end_str = params['indicator']['reports'].partition("--")
start_date = datetime.datetime.strptime(start_str, "%Y-%m-%d").date()
end_date = datetime.datetime.strptime(end_str, "%Y-%m-%d").date()
listing = [
keep = [
el for el in listing
if start_date <= el['publish_date'] <= end_date
]

# reference date is guaranteed to be on or before publish date, so we can trim
# reports that are too early
if 'export_start_date' in params['indicator']:
listing = [
el for el in listing
keep = [
el for el in keep
if params['indicator']['export_start_date'] <= el['publish_date']
]
# can't do the same for export_end_date
return listing
return extend_listing_for_interp(keep, listing)

def extend_listing_for_interp(keep, listing, interp_length=None):
    """Grab additional files from the full listing for interpolation if needed.

    Selects files based purely on publish_date, so may include duplicates where
    multiple reports for a single publish_date are available.

    Parameters:
    - keep: list of reports desired in the final output
    - listing: complete list of reports available from healthdata.gov
    - interp_length: number of consecutive publish dates to retain per kept
      report (including the kept report's own date). Defaults to the module
      constant INTERP_LENGTH; resolved at call time so tests patching the
      module attribute still take effect.

    Returns: list of reports including keep and additional files needed for
    interpolation.
    """
    if interp_length is None:
        interp_length = INTERP_LENGTH
    # Every publish_date up to interp_length - 1 days before a kept report's
    # publish_date; range starts at 0 so the kept dates themselves are included.
    publish_date_keeplist = {
        el['publish_date'] - datetime.timedelta(days=offset)
        for el in keep
        for offset in range(interp_length)
    }
    return [el for el in listing if el['publish_date'] in publish_date_keeplist]

def download_and_parse(listing, logger):
"""Convert a list of report files into Dataset instances."""
Expand Down
36 changes: 33 additions & 3 deletions dsew_community_profile/tests/test_pull.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import namedtuple
from dataclasses import dataclass
from datetime import date, datetime
from datetime import date, datetime, timedelta
from itertools import chain
from typing import Any, Dict, List, Union
import pandas as pd
Expand All @@ -11,9 +11,12 @@

from delphi_utils.geomap import GeoMapper

from delphi_dsew_community_profile.pull import (DatasetTimes, Dataset,
from delphi_dsew_community_profile.pull import (
DatasetTimes, Dataset,
fetch_listing, nation_from_state, generate_prop_signal,
std_err, add_max_ts_col, unify_testing_sigs, interpolate_missing_values)
std_err, add_max_ts_col, unify_testing_sigs, interpolate_missing_values,
extend_listing_for_interp
)


example = namedtuple("example", "given expected")
Expand Down Expand Up @@ -489,3 +492,30 @@ def test_interpolation(self):
interpolated_dfs1 = interpolate_missing_values({("src", "sig", False): pd.concat(missing_dfs)})
expected_dfs = pd.concat([sig1, sig2, sig3, sig4])
_assert_frame_equal(interpolated_dfs1[("src", "sig", False)], expected_dfs, index_cols=["geo_id", "timestamp"])

@patch("delphi_dsew_community_profile.pull.INTERP_LENGTH", 2)
def test_extend_listing(self):
    """Check that listings are extended backward by INTERP_LENGTH - 1 days."""
    # 20 consecutive daily reports ending on 2020-01-20.
    full_listing = [
        {"publish_date": date(2020, 1, 20) - timedelta(days=offset)}
        for offset in range(20)
    ]
    cases = [
        # single range
        example(
            [{"publish_date": date(2020, 1, 20)}],
            [{"publish_date": date(2020, 1, 20)},
             {"publish_date": date(2020, 1, 19)}]
        ),
        # disjoint ranges
        example(
            [{"publish_date": date(2020, 1, 20)},
             {"publish_date": date(2020, 1, 10)}],
            [{"publish_date": date(2020, 1, 20)},
             {"publish_date": date(2020, 1, 19)},
             {"publish_date": date(2020, 1, 10)},
             {"publish_date": date(2020, 1, 9)}]
        ),
        # conjoined ranges
        example(
            [{"publish_date": date(2020, 1, 20)},
             {"publish_date": date(2020, 1, 19)}],
            [{"publish_date": date(2020, 1, 20)},
             {"publish_date": date(2020, 1, 19)},
             {"publish_date": date(2020, 1, 18)}]
        ),
    ]
    for case in cases:
        result = extend_listing_for_interp(case.given, full_listing)
        assert result == case.expected, case.given