
Revert "Release covidcast-indicators 0.3.11" #1589

Merged · 1 commit · Apr 20, 2022
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.11
+current_version = 0.3.10
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False
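Reviewer note: this file configures the bump2version tool; with commit = True it commits the bump using the message template above. A minimal sketch of how that template expands, with the version value supplied for illustration only:

# How the bumpversion message template expands (version value hypothetical).
message_template = "chore: bump covidcast-indicators to {new_version}"
print(message_template.format(new_version="0.3.10"))
# -> chore: bump covidcast-indicators to 0.3.10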
9 changes: 0 additions & 9 deletions ansible/templates/dsew_community_profile-params-prod.json.j2
@@ -39,14 +39,5 @@
                 "booster_doses_admin_7dav"
             ]
         }
-    },
-    "archive": {
-        "aws_credentials": {
-            "aws_access_key_id": "{{ delphi_aws_access_key_id }}",
-            "aws_secret_access_key": "{{ delphi_aws_secret_access_key }}"
-        },
-        "bucket_name": "delphi-covidcast-indicator-output",
-        "cache_dir": "./cache",
-        "indicator_prefix": "delphi_dsew_community_profile"
     }
 }
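Reviewer note: the deleted "archive" block is what pointed the indicator at its S3 archive cache. The sketch below shows, in plain boto3 rather than the delphi_utils archive differ the pipeline actually uses, roughly what those settings feed; the local file name and object key are hypothetical.

import boto3

archive_params = {
    "aws_credentials": {
        "aws_access_key_id": "...",       # filled in by Jinja2 in the template
        "aws_secret_access_key": "...",
    },
    "bucket_name": "delphi-covidcast-indicator-output",
    "cache_dir": "./cache",
    "indicator_prefix": "delphi_dsew_community_profile",
}

# Upload one exported CSV under the indicator's prefix (illustrative only).
s3 = boto3.client("s3", **archive_params["aws_credentials"])
s3.upload_file(
    "./receiving/example.csv",
    archive_params["bucket_name"],
    archive_params["indicator_prefix"] + "/example.csv",
)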
1 change: 0 additions & 1 deletion dsew_community_profile/.gitignore

This file was deleted.

2 changes: 0 additions & 2 deletions dsew_community_profile/delphi_dsew_community_profile/constants.py
@@ -6,8 +6,6 @@
 DOWNLOAD_ATTACHMENT = URL_PREFIX + "/files/{assetId}?download=true&filename={filename}"
 DOWNLOAD_LISTING = URL_PREFIX + ".json"

-INTERP_LENGTH = 5
-
 @dataclass
 class Transform:
     """Transformation filters for interpreting a particular sheet in the workbook."""
96 changes: 7 additions & 89 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
@@ -4,7 +4,6 @@
 import datetime
 import os
 import re
-from typing import Dict, Tuple
 from urllib.parse import quote_plus as quote_as_url

 import pandas as pd
@@ -13,14 +12,9 @@

 from delphi_utils.geomap import GeoMapper

-from .constants import (
-    TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
+from .constants import (TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
     IS_PROP, NOT_PROP,
-    DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING,
-    INTERP_LENGTH
-)
-
-DataDict = Dict[Tuple[str, str, bool], pd.DataFrame]
+    DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING)

 # YYYYMMDD
 # example: "Community Profile Report 20211104.xlsx"
@@ -418,54 +412,27 @@ def fetch_listing(params):
         )
         for el in listing if el['filename'].endswith("xlsx")
     ]
-    keep = []
     if params['indicator']['reports'] == 'new':
         # drop files we already have in the input cache
-        keep = [el for el in listing if not os.path.exists(el['cached_filename'])]
+        listing = [el for el in listing if not os.path.exists(el['cached_filename'])]
     elif params['indicator']['reports'].find("--") > 0:
         # drop files outside the specified publish-date range
         start_str, _, end_str = params['indicator']['reports'].partition("--")
         start_date = datetime.datetime.strptime(start_str, "%Y-%m-%d").date()
         end_date = datetime.datetime.strptime(end_str, "%Y-%m-%d").date()
-        keep = [
+        listing = [
             el for el in listing
             if start_date <= el['publish_date'] <= end_date
         ]

     # reference date is guaranteed to be on or before publish date, so we can trim
     # reports that are too early
     if 'export_start_date' in params['indicator']:
-        keep = [
-            el for el in keep
+        listing = [
+            el for el in listing
             if params['indicator']['export_start_date'] <= el['publish_date']
         ]
     # can't do the same for export_end_date

-    # if we're only running on a subset, make sure we have enough data for interp
-    if keep:
-        keep = extend_listing_for_interp(keep, listing)
-    return keep if keep else listing
-
-def extend_listing_for_interp(keep, listing):
-    """Grab additional files from the full listing for interpolation if needed.
-
-    Selects files based purely on publish_date, so may include duplicates where
-    multiple reports for a single publish_date are available.
-
-    Parameters:
-    - keep: list of reports desired in the final output
-    - listing: complete list of reports available from healthdata.gov
-
-    Returns: list of reports including keep and additional files needed for
-    interpolation.
-    """
-    publish_date_keeplist = set()
-    for el in keep:
-        # starts at 0 so includes keep publish_dates
-        for i in range(INTERP_LENGTH):
-            publish_date_keeplist.add(el['publish_date'] - datetime.timedelta(days=i))
-    keep = [el for el in listing if el['publish_date'] in publish_date_keeplist]
-    return keep
+    return listing

 def download_and_parse(listing, logger):
     """Convert a list of report files into Dataset instances."""
@@ -605,57 +572,8 @@ def fetch_new_reports(params, logger=None):
             if SIGNALS[sig]["make_prop"]:
                 ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper)

-    ret = interpolate_missing_values(ret)
-
     return ret

-def interpolate_missing_values(dfs: DataDict) -> DataDict:
-    """Interpolates each signal in the dictionary of dfs."""
-    interpolate_df = dict()
-    for key, df in dfs.items():
-        # Here we exclude the 'positivity' signal from interpolation. This is a temporary fix.
-        # https://github.com/cmu-delphi/covidcast-indicators/issues/1576
-        _, sig, _ = key
-        if sig == "positivity":
-            continue
-
-        geo_dfs = []
-        for geo, group_df in df.groupby("geo_id"):
-            reindexed_group_df = group_df.set_index("timestamp").reindex(
-                pd.date_range(group_df.timestamp.min(), group_df.timestamp.max())
-            )
-            reindexed_group_df["geo_id"] = geo
-            if "val" in reindexed_group_df.columns and not reindexed_group_df["val"].isna().all():
-                reindexed_group_df["val"] = (
-                    reindexed_group_df["val"]
-                    .interpolate(method="linear", limit_area="inside")
-                    .astype(float)
-                )
-            if "se" in reindexed_group_df.columns:
-                reindexed_group_df["se"] = (
-                    reindexed_group_df["se"]
-                    .interpolate(method="linear", limit_area="inside")
-                    .astype(float)
-                )
-            if (
-                "sample_size" in reindexed_group_df.columns
-                and not reindexed_group_df["sample_size"].isna().all()
-            ):
-                reindexed_group_df["sample_size"] = (
-                    reindexed_group_df["sample_size"]
-                    .interpolate(method="linear", limit_area="inside")
-                    .astype(float)
-                )
-            if "publish_date" in reindexed_group_df.columns:
-                reindexed_group_df["publish_date"] = reindexed_group_df["publish_date"].fillna(
-                    method="bfill"
-                )
-            geo_dfs.append(reindexed_group_df)
-        interpolate_df[key] = (
-            pd.concat(geo_dfs).reset_index().rename(columns={"index": "timestamp"})
-        )
-    return interpolate_df
-
 def generate_prop_signal(df, geo, geo_mapper):
     """Transform base df into a proportion (per 100k population)."""
     if geo == "state":
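Reviewer note on the removed interpolation: Series.interpolate(method="linear", limit_area="inside") fills only NaNs bounded by valid values on both sides, so leading and trailing gaps stay missing; that is why the all-NaN and single-point test signals in the deleted tests below were expected to pass through unchanged. A minimal illustration:

import numpy as np
import pandas as pd

s = pd.Series([np.nan, 1.0, np.nan, 3.0, np.nan])
print(s.interpolate(method="linear", limit_area="inside").tolist())
# [nan, 1.0, 2.0, 3.0, nan]  # only the interior gap is filled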
106 changes: 4 additions & 102 deletions dsew_community_profile/tests/test_pull.py
@@ -1,44 +1,20 @@
 from collections import namedtuple
 from dataclasses import dataclass
-from datetime import date, datetime, timedelta
+from datetime import date, datetime
 from itertools import chain
-from typing import Any, Dict, List, Union
 import pandas as pd
-from pandas.util.testing import assert_frame_equal
 import numpy as np
 import pytest
 from unittest.mock import patch, Mock

 from delphi_utils.geomap import GeoMapper

-from delphi_dsew_community_profile.pull import (
-    DatasetTimes, Dataset,
+from delphi_dsew_community_profile.pull import (DatasetTimes, Dataset,
     fetch_listing, nation_from_state, generate_prop_signal,
-    std_err, add_max_ts_col, unify_testing_sigs, interpolate_missing_values,
-    extend_listing_for_interp
-)
+    std_err, add_max_ts_col, unify_testing_sigs)

 example = namedtuple("example", "given expected")

-def _assert_frame_equal(df1, df2, index_cols: List[str] = None):
-    # Ensure same columns present.
-    assert set(df1.columns) == set(df2.columns)
-    # Ensure same column order.
-    df1 = df1[df1.columns]
-    df2 = df2[df1.columns]
-    # Ensure same row order by using a common index and sorting.
-    df1 = df1.set_index(index_cols).sort_index()
-    df2 = df2.set_index(index_cols).sort_index()
-    return assert_frame_equal(df1, df2)
-
-def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
-    df = df.copy()
-    for k, v in dtypes.items():
-        if k in df.columns:
-            df[k] = df[k].astype(v)
-    return df
-
-
 class TestPull:
     def test_DatasetTimes(self):
         examples = [
@@ -477,77 +453,3 @@ def test_std_err(self):
                 "sample_size": [2, 2, 5, 10, 20, 0]
             })
         )
-
-    def test_interpolation(self):
-        DTYPES = {"geo_id": str, "timestamp": "datetime64[ns]", "val": float, "se": float, "sample_size": float, "publish_date": "datetime64[ns]"}
-        line = lambda x: 3 * x + 5
-
-        sig1 = _set_df_dtypes(pd.DataFrame({
-            "geo_id": "1",
-            "timestamp": pd.date_range("2022-01-01", "2022-01-10"),
-            "val": [line(i) for i in range(2, 12)],
-            "se": [line(i) for i in range(1, 11)],
-            "sample_size": [line(i) for i in range(0, 10)],
-            "publish_date": pd.to_datetime("2022-01-10")
-        }), dtypes=DTYPES)
-        # A linear signal missing two days which should be filled exactly by the linear interpolation.
-        missing_sig1 = sig1[(sig1.timestamp <= "2022-01-05") | (sig1.timestamp >= "2022-01-08")]
-
-        sig2 = sig1.copy()
-        sig2["geo_id"] = "2"
-        # A linear signal missing everything but the end points, should be filled exactly by linear interpolation.
-        missing_sig2 = sig2[(sig2.timestamp == "2022-01-01") | (sig2.timestamp == "2022-01-10")]
-
-        sig3 = _set_df_dtypes(pd.DataFrame({
-            "geo_id": "3",
-            "timestamp": pd.date_range("2022-01-01", "2022-01-10"),
-            "val": None,
-            "se": [line(i) for i in range(1, 11)],
-            "sample_size": [line(i) for i in range(0, 10)],
-            "publish_date": pd.to_datetime("2022-01-10")
-        }), dtypes=DTYPES)
-        # A signal missing everything, should be left alone.
-        missing_sig3 = sig3[(sig3.timestamp <= "2022-01-05") | (sig3.timestamp >= "2022-01-08")]
-
-        sig4 = _set_df_dtypes(pd.DataFrame({
-            "geo_id": "4",
-            "timestamp": pd.date_range("2022-01-01", "2022-01-10"),
-            "val": [None] * 9 + [10.0],
-            "se": [line(i) for i in range(1, 11)],
-            "sample_size": [line(i) for i in range(0, 10)],
-            "publish_date": pd.to_datetime("2022-01-10")
-        }), dtypes=DTYPES)
-        # A signal missing everything except for one point, should be left alone.
-        missing_sig4 = sig4[(sig4.timestamp <= "2022-01-05") | (sig4.timestamp >= "2022-01-08")]
-
-        missing_dfs = [missing_sig1, missing_sig2, missing_sig3, missing_sig4]
-        interpolated_dfs1 = interpolate_missing_values({("src", "sig", False): pd.concat(missing_dfs)})
-        expected_dfs = pd.concat([sig1, sig2, sig3, sig4])
-        _assert_frame_equal(interpolated_dfs1[("src", "sig", False)], expected_dfs, index_cols=["geo_id", "timestamp"])
-
-    @patch("delphi_dsew_community_profile.pull.INTERP_LENGTH", 2)
-    def test_extend_listing(self):
-        listing = [
-            {"publish_date": date(2020, 1, 20) - timedelta(days=i)}
-            for i in range(20)
-        ]
-        examples = [
-            # single range
-            example(
-                [{"publish_date": date(2020, 1, 20)}],
-                [{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)}]
-            ),
-            # disjoint ranges
-            example(
-                [{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 10)}],
-                [{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)},
-                 {"publish_date": date(2020, 1, 10)}, {"publish_date": date(2020, 1, 9)}]
-            ),
-            # conjoined ranges
-            example(
-                [{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)}],
-                [{"publish_date": date(2020, 1, 20)}, {"publish_date": date(2020, 1, 19)}, {"publish_date": date(2020, 1, 18)}]
-            ),
-        ]
-        for ex in examples:
-            assert extend_listing_for_interp(ex.given, listing) == ex.expected, ex.given
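Reviewer note: the deleted test_extend_listing shrank the module-level INTERP_LENGTH to 2 via unittest.mock.patch; because an explicit replacement value is supplied, no mock object is injected into the test's arguments. A generic sketch of the pattern against a hypothetical module:

from unittest.mock import patch

import mypackage.settings as settings  # hypothetical module defining WINDOW = 5

@patch("mypackage.settings.WINDOW", 2)
def test_uses_small_window():
    # The constant reads as 2 inside the test and reverts to 5 afterwards.
    assert settings.WINDOW == 2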