
Release covidcast-indicators 0.3.11 #1586


Merged: 22 commits, Apr 19, 2022

Commits
1d9f353
Add basic interpolation function and a test stub
dshemetov Mar 10, 2022
a60baa5
Linear interpolation by default, add more tests
dshemetov Mar 23, 2022
4cf6268
Enforce a float type on val and sample_size
dshemetov Mar 25, 2022
ed90cd8
Interpolate standard error
dshemetov Apr 1, 2022
cd5b7bd
Update test
dshemetov Apr 1, 2022
8f9d464
Add gitignore and remove indent in test file
dshemetov Apr 13, 2022
c38db2e
Exclude naat_pct_positive from interpolation
dshemetov Apr 13, 2022
4b03630
Extend files to be processed for interpolation
krivard Apr 14, 2022
d00f42a
appease linter
krivard Apr 14, 2022
1b574bc
Minor comment update
dshemetov Apr 14, 2022
917baef
Merge pull request #1578 from cmu-delphi/krivard/dsew-interp-extend-l…
dshemetov Apr 14, 2022
177e048
Merge branch 'ds/dsew-interpolation' of https://github.com/cmu-delphi…
dshemetov Apr 14, 2022
6dba43d
Merge pull request #1580 from cmu-delphi/bot/sync-prod-main
krivard Apr 14, 2022
edae08a
Make the linter happy
dshemetov Apr 15, 2022
dec92af
Merge branch 'main' into ds/dsew-interpolation
dshemetov Apr 15, 2022
a38fcf8
Merge pull request #1555 from cmu-delphi/ds/dsew-interpolation
krivard Apr 18, 2022
0486d7d
Fix UnboundLocalError
krivard Apr 18, 2022
db20c40
whitespace
krivard Apr 18, 2022
1e5e5cf
Merge pull request #1583 from cmu-delphi/krivard/dsew-keep
krivard Apr 18, 2022
06ae8a8
Add archive params to prod dsew-cpr configuration
krivard Apr 19, 2022
ff3f36e
Merge pull request #1584 from cmu-delphi/krivard/dsew-prod-interp-params
krivard Apr 19, 2022
842a169
chore: bump covidcast-indicators to 0.3.11
Apr 19, 2022
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.3.10
+current_version = 0.3.11
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
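
The `message` template above uses Python str.format-style substitution (as bump2version does); with this bump it yields exactly the message on commit 842a169. An illustration only:

# Illustration only: reproduce bump2version's commit-message substitution.
message = "chore: bump covidcast-indicators to {new_version}"
print(message.format(new_version="0.3.11"))
# -> chore: bump covidcast-indicators to 0.3.11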
9 changes: 9 additions & 0 deletions ansible/templates/dsew_community_profile-params-prod.json.j2
@@ -39,5 +39,14 @@
        "booster_doses_admin_7dav"
      ]
    }
  },
  "archive": {
    "aws_credentials": {
      "aws_access_key_id": "{{ delphi_aws_access_key_id }}",
      "aws_secret_access_key": "{{ delphi_aws_secret_access_key }}"
    },
    "bucket_name": "delphi-covidcast-indicator-output",
    "cache_dir": "./cache",
    "indicator_prefix": "delphi_dsew_community_profile"
  }
}
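
For context, an `archive` block like this is what delphi_utils' S3ArchiveDiffer consumes to diff each run's CSV output against the previous run before uploading. A minimal sketch of the wiring, assuming the constructor signature used by other indicators in this repo (the export_dir value here is hypothetical):

from delphi_utils.archive import S3ArchiveDiffer

def make_archiver(params):
    arch_diff = S3ArchiveDiffer(
        params["archive"]["cache_dir"],
        "./receiving",  # hypothetical export_dir; the real one comes from the indicator params
        params["archive"]["bucket_name"],
        params["archive"]["indicator_prefix"],
        params["archive"]["aws_credentials"],
    )
    arch_diff.update_cache()  # fetch the previous run's files from S3 into cache_dir
    return arch_diff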
1 change: 1 addition & 0 deletions dsew_community_profile/.gitignore
@@ -0,0 +1 @@
input_cache/*
dsew_community_profile/delphi_dsew_community_profile/constants.py
@@ -6,6 +6,8 @@
DOWNLOAD_ATTACHMENT = URL_PREFIX + "/files/{assetId}?download=true&filename={filename}"
DOWNLOAD_LISTING = URL_PREFIX + ".json"

INTERP_LENGTH = 5

@dataclass
class Transform:
"""Transformation filters for interpreting a particular sheet in the workbook."""
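INTERP_LENGTH sets the publish-date lookback used by extend_listing_for_interp in pull.py (below): each report kept for processing pulls in its own publish date plus the four preceding days, so interpolation has enough history. A quick illustration of the window (the date is arbitrary):

import datetime

INTERP_LENGTH = 5
publish_date = datetime.date(2022, 4, 19)
window = [publish_date - datetime.timedelta(days=i) for i in range(INTERP_LENGTH)]
# range() starts at 0, so the report's own publish date is included:
# [2022-04-19, 2022-04-18, 2022-04-17, 2022-04-16, 2022-04-15]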
96 changes: 89 additions & 7 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
@@ -4,6 +4,7 @@
import datetime
import os
import re
from typing import Dict, Tuple
from urllib.parse import quote_plus as quote_as_url

import pandas as pd
@@ -12,9 +13,14 @@

from delphi_utils.geomap import GeoMapper

-from .constants import (TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
+from .constants import (
+    TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
     IS_PROP, NOT_PROP,
-    DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING)
+    DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING,
+    INTERP_LENGTH
+)

DataDict = Dict[Tuple[str, str, bool], pd.DataFrame]

# YYYYMMDD
# example: "Community Profile Report 20211104.xlsx"
@@ -412,27 +418,54 @@ def fetch_listing(params):
        )
        for el in listing if el['filename'].endswith("xlsx")
    ]
    keep = []
    if params['indicator']['reports'] == 'new':
        # drop files we already have in the input cache
-       listing = [el for el in listing if not os.path.exists(el['cached_filename'])]
+       keep = [el for el in listing if not os.path.exists(el['cached_filename'])]
    elif params['indicator']['reports'].find("--") > 0:
        # drop files outside the specified publish-date range
        start_str, _, end_str = params['indicator']['reports'].partition("--")
        start_date = datetime.datetime.strptime(start_str, "%Y-%m-%d").date()
        end_date = datetime.datetime.strptime(end_str, "%Y-%m-%d").date()
-       listing = [
+       keep = [
            el for el in listing
            if start_date <= el['publish_date'] <= end_date
        ]

    # reference date is guaranteed to be on or before publish date, so we can trim
    # reports that are too early
    if 'export_start_date' in params['indicator']:
-       listing = [
-           el for el in listing
+       keep = [
+           el for el in keep
            if params['indicator']['export_start_date'] <= el['publish_date']
        ]
    # can't do the same for export_end_date
-   return listing

    # if we're only running on a subset, make sure we have enough data for interp
    if keep:
        keep = extend_listing_for_interp(keep, listing)
    return keep if keep else listing
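
The `reports` setting drives the branches above. A hypothetical params sketch (the key layout mirrors the code; values are illustrative):

# Only files not already present in the input cache:
params = {"indicator": {"reports": "new"}}
# Only files published within an inclusive date range:
params = {"indicator": {"reports": "2022-03-01--2022-04-19"}}
# Any other value leaves keep empty, so the final
# `return keep if keep else listing` falls back to the full listing.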

def extend_listing_for_interp(keep, listing):
    """Grab additional files from the full listing for interpolation if needed.

    Selects files based purely on publish_date, so may include duplicates where
    multiple reports for a single publish_date are available.

    Parameters:
    - keep: list of reports desired in the final output
    - listing: complete list of reports available from healthdata.gov

    Returns: list of reports including keep and additional files needed for
    interpolation.
    """
    publish_date_keeplist = set()
    for el in keep:
        # starts at 0 so includes keep publish_dates
        for i in range(INTERP_LENGTH):
            publish_date_keeplist.add(el['publish_date'] - datetime.timedelta(days=i))
    keep = [el for el in listing if el['publish_date'] in publish_date_keeplist]
    return keep
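
A usage sketch of the function above with the production INTERP_LENGTH of 5 (dates are illustrative):

from datetime import date, timedelta

listing = [{"publish_date": date(2022, 4, 19) - timedelta(days=i)} for i in range(30)]
keep = [{"publish_date": date(2022, 4, 19)}]
extended = extend_listing_for_interp(keep, listing)
# extended holds the five reports published 2022-04-15 through 2022-04-19:
# the kept report plus the four preceding publish dates needed for interpolation.
assert len(extended) == 5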

def download_and_parse(listing, logger):
"""Convert a list of report files into Dataset instances."""
@@ -572,8 +605,57 @@ def fetch_new_reports(params, logger=None):
if SIGNALS[sig]["make_prop"]:
ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper)

ret = interpolate_missing_values(ret)

return ret

def interpolate_missing_values(dfs: DataDict) -> DataDict:
    """Interpolates each signal in the dictionary of dfs."""
    interpolate_df = dict()
    for key, df in dfs.items():
        # Here we exclude the 'positivity' signal from interpolation. This is a temporary fix.
        # https://github.com/cmu-delphi/covidcast-indicators/issues/1576
        _, sig, _ = key
        if sig == "positivity":
            interpolate_df[key] = df  # pass the excluded signal through unchanged
            continue

        geo_dfs = []
        for geo, group_df in df.groupby("geo_id"):
            reindexed_group_df = group_df.set_index("timestamp").reindex(
                pd.date_range(group_df.timestamp.min(), group_df.timestamp.max())
            )
            reindexed_group_df["geo_id"] = geo
            if "val" in reindexed_group_df.columns and not reindexed_group_df["val"].isna().all():
                reindexed_group_df["val"] = (
                    reindexed_group_df["val"]
                    .interpolate(method="linear", limit_area="inside")
                    .astype(float)
                )
            if "se" in reindexed_group_df.columns:
                reindexed_group_df["se"] = (
                    reindexed_group_df["se"]
                    .interpolate(method="linear", limit_area="inside")
                    .astype(float)
                )
            if (
                "sample_size" in reindexed_group_df.columns
                and not reindexed_group_df["sample_size"].isna().all()
            ):
                reindexed_group_df["sample_size"] = (
                    reindexed_group_df["sample_size"]
                    .interpolate(method="linear", limit_area="inside")
                    .astype(float)
                )
            if "publish_date" in reindexed_group_df.columns:
                reindexed_group_df["publish_date"] = reindexed_group_df["publish_date"].fillna(
                    method="bfill"
                )
            geo_dfs.append(reindexed_group_df)
        interpolate_df[key] = (
            pd.concat(geo_dfs).reset_index().rename(columns={"index": "timestamp"})
        )
    return interpolate_df
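
A small worked example of the function above (the signal key and values are illustrative): a single-geo frame with a two-day gap comes back with the gap filled linearly and publish_date backfilled:

import pandas as pd

df = pd.DataFrame({
    "geo_id": "ca",  # hypothetical geo
    "timestamp": pd.to_datetime(["2022-01-01", "2022-01-04"]),
    "val": [1.0, 4.0],
    "se": [0.1, 0.4],
    "sample_size": [10.0, 40.0],
    "publish_date": pd.to_datetime("2022-01-04"),
})
out = interpolate_missing_values({("state", "total_admissions", False): df})
# The reindex inserts 2022-01-02 and 2022-01-03; linear interpolation then fills
# val -> [1, 2, 3, 4], se -> [0.1, 0.2, 0.3, 0.4], sample_size -> [10, 20, 30, 40],
# and publish_date on the new rows is backfilled to 2022-01-04.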

def generate_prop_signal(df, geo, geo_mapper):
"""Transform base df into a proportion (per 100k population)."""
if geo == "state":
106 changes: 102 additions & 4 deletions dsew_community_profile/tests/test_pull.py
@@ -1,20 +1,44 @@
from collections import namedtuple
-from datetime import date, datetime
from dataclasses import dataclass
+from datetime import date, datetime, timedelta
from itertools import chain
from typing import Any, Dict, List, Union
import pandas as pd
from pandas.util.testing import assert_frame_equal
import numpy as np
import pytest
from unittest.mock import patch, Mock

from delphi_utils.geomap import GeoMapper

-from delphi_dsew_community_profile.pull import (DatasetTimes, Dataset,
+from delphi_dsew_community_profile.pull import (
+    DatasetTimes, Dataset,
     fetch_listing, nation_from_state, generate_prop_signal,
-    std_err, add_max_ts_col, unify_testing_sigs)
+    std_err, add_max_ts_col, unify_testing_sigs, interpolate_missing_values,
+    extend_listing_for_interp
+)


example = namedtuple("example", "given expected")


def _assert_frame_equal(df1, df2, index_cols: List[str] = None):
    # Ensure same columns present.
    assert set(df1.columns) == set(df2.columns)
    # Ensure same column order.
    df1 = df1[df1.columns]
    df2 = df2[df1.columns]
    # Ensure same row order by using a common index and sorting.
    df1 = df1.set_index(index_cols).sort_index()
    df2 = df2.set_index(index_cols).sort_index()
    return assert_frame_equal(df1, df2)

def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
    df = df.copy()
    for k, v in dtypes.items():
        if k in df.columns:
            df[k] = df[k].astype(v)
    return df

class TestPull:
    def test_DatasetTimes(self):
        examples = [
@@ -453,3 +477,77 @@ def test_std_err(self):
"sample_size": [2, 2, 5, 10, 20, 0]
})
)

    def test_interpolation(self):
        DTYPES = {"geo_id": str, "timestamp": "datetime64[ns]", "val": float,
                  "se": float, "sample_size": float, "publish_date": "datetime64[ns]"}
        line = lambda x: 3 * x + 5

        sig1 = _set_df_dtypes(pd.DataFrame({
            "geo_id": "1",
            "timestamp": pd.date_range("2022-01-01", "2022-01-10"),
            "val": [line(i) for i in range(2, 12)],
            "se": [line(i) for i in range(1, 11)],
            "sample_size": [line(i) for i in range(0, 10)],
            "publish_date": pd.to_datetime("2022-01-10")
        }), dtypes=DTYPES)
        # A linear signal missing two days, which should be filled exactly by linear interpolation.
        missing_sig1 = sig1[(sig1.timestamp <= "2022-01-05") | (sig1.timestamp >= "2022-01-08")]

        sig2 = sig1.copy()
        sig2["geo_id"] = "2"
        # A linear signal missing everything but the end points, which should be filled exactly
        # by linear interpolation.
        missing_sig2 = sig2[(sig2.timestamp == "2022-01-01") | (sig2.timestamp == "2022-01-10")]

        sig3 = _set_df_dtypes(pd.DataFrame({
            "geo_id": "3",
            "timestamp": pd.date_range("2022-01-01", "2022-01-10"),
            "val": None,
            "se": [line(i) for i in range(1, 11)],
            "sample_size": [line(i) for i in range(0, 10)],
            "publish_date": pd.to_datetime("2022-01-10")
        }), dtypes=DTYPES)
        # A signal missing all values, which should be left alone.
        missing_sig3 = sig3[(sig3.timestamp <= "2022-01-05") | (sig3.timestamp >= "2022-01-08")]

        sig4 = _set_df_dtypes(pd.DataFrame({
            "geo_id": "4",
            "timestamp": pd.date_range("2022-01-01", "2022-01-10"),
            "val": [None] * 9 + [10.0],
            "se": [line(i) for i in range(1, 11)],
            "sample_size": [line(i) for i in range(0, 10)],
            "publish_date": pd.to_datetime("2022-01-10")
        }), dtypes=DTYPES)
        # A signal missing all values except one, which should be left alone.
        missing_sig4 = sig4[(sig4.timestamp <= "2022-01-05") | (sig4.timestamp >= "2022-01-08")]

        missing_dfs = [missing_sig1, missing_sig2, missing_sig3, missing_sig4]
        interpolated_dfs1 = interpolate_missing_values({("src", "sig", False): pd.concat(missing_dfs)})
        expected_dfs = pd.concat([sig1, sig2, sig3, sig4])
        _assert_frame_equal(interpolated_dfs1[("src", "sig", False)], expected_dfs,
                            index_cols=["geo_id", "timestamp"])

@patch("delphi_dsew_community_profile.pull.INTERP_LENGTH", 2)
def test_extend_listing(self):
listing = [
{"publish_date": date(2020, 1, 20) - timedelta(days=i)}
for i in range(20)
]
examples = [
# single range
example(
[{"publish_date": date(2020, 1, 20)}],
[{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)}]
),
# disjoint ranges
example(
[{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 10)}],
[{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)},
{"publish_date": date(2020, 1, 10)}, {"publish_date": date(2020, 1, 9)}]
),
# conjoined ranges
example(
[{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)}],
[{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)}, {"publish_date": date(2020, 1, 18)}]
),
]
for ex in examples:
assert extend_listing_for_interp(ex.given, listing) == ex.expected, ex.given