
Commit 4c03f4e

Merge pull request #1562 from cmu-delphi/ndefries/cpr-lenient-check-ts-per-publishdate
Fix CPR bugs associated with non-unique timestamps by publish date
2 parents: 8806744 + 9e94e66

File tree: 2 files changed, +189 −43 lines
dsew_community_profile/delphi_dsew_community_profile/pull.py

Lines changed: 151 additions & 37 deletions
@@ -362,7 +362,7 @@ def select_fn(h):
             sig_select = [s for s in select if s[-1].find(sig) >= 0]
             # The name of the cumulative vaccination was changed after 03/09/2021
             # when J&J vaccines were added.
-            if (sig == "fully vaccinated") and (len(sig_select)==0):
+            if (sig == "fully vaccinated") and (len(sig_select) == 0):
                 sig_select = [s for s in select if s[-1].find("people with full course") >= 0]
             # Since "doses administered" is a substring of another desired header,
             # "booster doses administered", we need to more strictly check if "doses administered"
@@ -383,6 +383,7 @@ def select_fn(h):
                 })
                 for si in sig_select
             ])
+
         for sig in COUNTS_7D_SIGNALS:
             assert (sheet.level, sig, NOT_PROP) in self.dfs.keys()
             self.dfs[(sheet.level, sig, NOT_PROP)]["val"] /= 7 # 7-day total -> 7-day average
@@ -446,6 +447,8 @@ def download_and_parse(listing, logger):

 def nation_from_state(df, sig, geomapper):
     """Compute nation level from state df."""
+    if df.empty:
+        return df
     if SIGNALS[sig]["is_rate"]: # true if sig is a rate
         df = geomapper.add_population_column(df, "state_id") \
             .rename(columns={"population":"weight"})
@@ -458,12 +461,12 @@ def nation_from_state(df, sig, geomapper):
     ).drop(
         "norm_denom", axis=1
     )
-    # The filter in `fetch_new_reports` to keep most recent publish date gurantees
-    # that we'll only see one unique publish date per timestamp here.
+    # The filter in `fetch_new_reports` to keep most recent publish date
+    # guarantees that we'll only see one unique publish date per timestamp
+    # here, so just keep the first obs of each group.
     publish_date_by_ts = df.groupby(
         ["timestamp"]
-    )["publish_date"].apply(
-        lambda x: np.unique(x)[0]
+    )["publish_date"].first(
     ).reset_index(
     )
     df = geomapper.replace_geocode(
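
Aside (illustration, not part of the diff): the rewritten comment leans on fetch_new_reports having already filtered to the most recent publish date, so taking .first() per timestamp group is safe and cheaper than np.unique. A minimal pandas sketch with made-up dates shows what publish_date_by_ts looks like:

import pandas as pd

# Toy data; dates are hypothetical.
df = pd.DataFrame({
    "timestamp": ["2021-03-01", "2021-03-01", "2021-03-08"],
    "state_id": ["ca", "fl", "ca"],
    "publish_date": ["2021-03-10", "2021-03-10", "2021-03-17"],
})
# Each timestamp is assumed to carry a single publish_date, so .first() suffices.
publish_date_by_ts = df.groupby(["timestamp"])["publish_date"].first().reset_index()
print(publish_date_by_ts)
#     timestamp publish_date
# 0  2021-03-01   2021-03-10
# 1  2021-03-08   2021-03-17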
@@ -474,10 +477,32 @@ def nation_from_state(df, sig, geomapper):
     )
     df["se"] = None
     df["sample_size"] = None
+    # Recreate publish_date column
     df = pd.merge(df, publish_date_by_ts, on="timestamp", how="left")

     return df

+def keep_latest_report(df, sig):
+    """Keep data associated with most recent report for each timestamp."""
+    df = df.groupby(
+        "timestamp"
+    ).apply(
+        lambda x: x[x["publish_date"] == x["publish_date"].max()]
+    ).drop_duplicates(
+    )
+
+    if not df.empty:
+        df = df.reset_index(drop=True)
+        assert all(df.groupby(
+            ["timestamp", "geo_id"]
+        ).size(
+        ).reset_index(
+            drop=True
+        ) == 1), f"Duplicate rows in {sig} indicate that one or" \
+            + " more reports were published multiple times and the copies differ"
+
+    return df
+
 def fetch_new_reports(params, logger=None):
     """Retrieve, compute, and collate all data we haven't seen yet."""
     listing = fetch_listing(params)
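
Aside (illustration, not part of the diff): keep_latest_report factors out the per-timestamp filtering that fetch_new_reports used to do inline. A small sketch with hypothetical values shows the effect: within each timestamp group, only rows carrying that group's most recent publish_date survive.

import pandas as pd

df = pd.DataFrame({
    "timestamp": ["2021-03-01", "2021-03-01", "2021-03-01"],
    "geo_id": ["ca", "ca", "fl"],
    "publish_date": ["2021-03-10", "2021-03-17", "2021-03-17"],
    "val": [1.0, 1.5, 2.0],
})
# Keep only rows from the latest publish_date within each timestamp group.
latest = df.groupby("timestamp").apply(
    lambda x: x[x["publish_date"] == x["publish_date"].max()]
).drop_duplicates().reset_index(drop=True)
print(latest)
# The "ca" row published 2021-03-10 is dropped; the 2021-03-17 revisions remain.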
@@ -486,39 +511,43 @@ def fetch_new_reports(params, logger=None):
     datasets = download_and_parse(listing, logger)
     # collect like signals together, keeping most recent publish date
     ret = {}
-    for sig, lst in datasets.items():
-        latest_sig_df = pd.concat(
-            lst
-        ).groupby(
-            "timestamp"
-        ).apply(
-            lambda x: x[x["publish_date"] == x["publish_date"].max()]
-        ).drop_duplicates(
-        )

-        if len(latest_sig_df.index) > 0:
-            latest_sig_df = latest_sig_df.reset_index(drop=True)
-            assert all(latest_sig_df.groupby(
-                ["timestamp", "geo_id"]
-            ).size(
-            ).reset_index(
-                drop=True
-            ) == 1), f"Duplicate rows in {sig} indicate that one or" \
-                + " more reports were published multiple times and the copies differ"
+    for key, lst in datasets.items():
+        (_, sig, _) = key
+        latest_key_df = pd.concat(lst)
+        if sig in ("total", "positivity"):
+            latest_key_df = pd.concat(apply_thres_change_date(
+                keep_latest_report,
+                latest_key_df,
+                [sig] * 2
+            ))
+        else:
+            latest_key_df = keep_latest_report(latest_key_df, sig)

-        ret[sig] = latest_sig_df
+        if not latest_key_df.empty:
+            ret[key] = latest_key_df

     # add nation from state
     geomapper = GeoMapper()
     for sig in SIGNALS:
         state_key = ("state", sig, NOT_PROP)
         if state_key not in ret:
             continue
-        ret[("nation", sig, NOT_PROP)] = nation_from_state(
-            ret[state_key].rename(columns={"geo_id": "state_id"}),
-            sig,
-            geomapper
-        )
+
+        if sig in ("total", "positivity"):
+            nation_df = pd.concat(apply_thres_change_date(
+                nation_from_state,
+                ret[state_key].rename(columns={"geo_id": "state_id"}),
+                [sig] * 2,
+                [geomapper] * 2
+            ))
+        else:
+            nation_df = nation_from_state(
+                ret[state_key].rename(columns={"geo_id": "state_id"}),
+                sig,
+                geomapper
+            )
+        ret[("nation", sig, NOT_PROP)] = nation_df

     for key, df in ret.copy().items():
         (geo, sig, prop) = key
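
Aside (illustration, not part of the diff): apply_thres_change_date, defined at the bottom of this file, returns a map object, and pd.concat accepts any iterable of DataFrames, so the pre- and post-change halves are recombined directly. A toy sketch of that concat-over-map pattern, with a stand-in for keep_latest_report / nation_from_state:

import pandas as pd

def label_half(part, label):
    # Stand-in per-half function for this sketch only.
    return part.assign(period=label)

halves = [pd.DataFrame({"val": [1, 2]}), pd.DataFrame({"val": [3]})]
# map() pairs each half with its own extra argument, mirroring the [sig] * 2 calls above.
combined = pd.concat(map(label_half, halves, ["pre-change", "post-change"]))
print(combined)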
@@ -597,9 +626,39 @@ def unify_testing_sigs(positivity_df, volume_df):
     https://docs.google.com/document/d/1MoIimdM_8OwG4SygoeQ9QEVZzIuDl339_a0xoYa6vuA/edit#

     """
-    # Combine test positivity and test volume, maintaining "this week" and "previous week" status.
+    # Check that we have positivity *and* volume for each publish date + geo,
+    # and that they have the same number of timestamps.
+    pos_count_ts = positivity_df.groupby(
+        ["publish_date", "geo_id"]
+    ).agg(
+        num_obs=("timestamp", "count"),
+        num_unique_obs=("timestamp", "nunique")
+    )
+    vol_count_ts = volume_df.groupby(
+        ["publish_date", "geo_id"]
+    ).agg(
+        num_obs=("timestamp", "count"),
+        num_unique_obs=("timestamp", "nunique")
+    )
+    merged = pos_count_ts.merge(
+        vol_count_ts,
+        on=["geo_id", "publish_date"],
+        how="outer",
+        indicator=True
+    )
+    assert all(
+        merged["_merge"] == "both"
+    ) and all(
+        merged.num_obs_x == merged.num_obs_y
+    ) and all(
+        merged.num_unique_obs_x == merged.num_unique_obs_y
+    ), \
+        "Each publish date-geo value combination should be available for both " + \
+        "test positivity and test volume, and have the same number of timestamps available."
     assert len(positivity_df.index) == len(volume_df.index), \
         "Test positivity and volume data have different numbers of observations."
+    expected_rows = len(positivity_df.index)
+
     volume_df = add_max_ts_col(volume_df)[
         ["geo_id", "publish_date", "val", "is_max_group_ts"]
     ].rename(
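
Aside (illustration, not part of the diff): the new check compares per publish date-geo timestamp counts between the two inputs before any joining happens. Below is a self-contained sketch of the same idea with hypothetical data; reset_index() is used here so the merge keys are ordinary columns, a small simplification relative to the code above.

import pandas as pd

def count_ts(df):
    # Total and unique timestamps per publish date-geo combination.
    return df.groupby(["publish_date", "geo_id"]).agg(
        num_obs=("timestamp", "count"),
        num_unique_obs=("timestamp", "nunique"),
    ).reset_index()

pos = pd.DataFrame({
    "publish_date": ["2021-03-17"] * 3,
    "geo_id": ["ca", "ca", "fl"],
    "timestamp": ["2021-03-10", "2021-03-14", "2021-03-14"],
})
vol = pos.copy()  # identical coverage, so every check below passes

merged = count_ts(pos).merge(count_ts(vol), on=["geo_id", "publish_date"],
                             how="outer", indicator=True)
# Overlapping count columns get _x/_y suffixes; _merge flags one-sided rows.
assert all(merged["_merge"] == "both")
assert all(merged.num_obs_x == merged.num_obs_y)
assert all(merged.num_unique_obs_x == merged.num_unique_obs_y)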
@@ -608,28 +667,41 @@ def unify_testing_sigs(positivity_df, volume_df):
     col_order = list(positivity_df.columns)
     positivity_df = add_max_ts_col(positivity_df).drop(["sample_size"], axis=1)

+    # Combine test positivity and test volume, maintaining "this week" and
+    # "previous week" status. Perform outer join here so that we can later
+    # check if any observations did not have a match.
     df = pd.merge(
         positivity_df, volume_df,
         on=["publish_date", "geo_id", "is_max_group_ts"],
-        how="left"
+        how="outer",
+        indicator=True
     ).drop(
         ["is_max_group_ts"], axis=1
     )

+    # Check that every volume observation was matched with a positivity observation.
+    assert (len(df.index) == expected_rows) and all(df["_merge"] == "both"), \
+        "Some observations in the test positivity data were not matched with test volume data."
+
     # Drop everything with 5 or fewer total tests.
     df = df.loc[df.sample_size > 5]

     # Generate stderr.
-    df = df.assign(se=std_err(df))
+    df = df.assign(
+        se=std_err(df)
+    ).drop(
+        ["_merge"],
+        axis=1
+    )

     return df[col_order]

 def add_max_ts_col(df):
     """
     Add column to differentiate timestamps for a given publish date-geo combo.

-    Each publish date is associated with two timestamps for test volume and
-    test positivity. The older timestamp corresponds to data from the
+    Each publish date is associated with up to two timestamps for test volume
+    and test positivity. The older timestamp corresponds to data from the
     "previous week"; the newer timestamp corresponds to the "last week".

     Since test volume and test positivity timestamps don't match exactly, we
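
Aside (illustration, not part of the diff): switching the positivity-volume join from a left join to an outer join with indicator=True makes unmatched rows detectable, and the _merge bookkeeping column is dropped once the check passes. A minimal sketch of that pattern with toy columns and hypothetical values:

import pandas as pd

positivity = pd.DataFrame({"geo_id": ["ca", "fl"], "val": [0.05, 0.08]})
volume = pd.DataFrame({"geo_id": ["ca", "fl"], "sample_size": [1000, 800]})

df = pd.merge(positivity, volume, on="geo_id", how="outer", indicator=True)
# indicator=True adds a "_merge" column: "both", "left_only", or "right_only".
assert (len(df.index) == len(positivity.index)) and all(df["_merge"] == "both"), \
    "some rows were not matched across the two inputs"
df = df.drop(["_merge"], axis=1)  # discard the bookkeeping column before returning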
@@ -638,11 +710,18 @@ def add_max_ts_col(df):
     the join. This new column, which is analagous to the "last/previous week"
     classification, is used to merge on.
     """
+    assert_df = df.groupby(
+        ["publish_date", "geo_id"]
+    ).agg(
+        num_obs=("timestamp", "count"),
+        num_unique_obs=("timestamp", "nunique")
+    )
     assert all(
-        df.groupby(["publish_date", "geo_id"])["timestamp"].count() == 2
+        assert_df.num_obs <= 2
     ) and all(
-        df.groupby(["publish_date", "geo_id"])["timestamp"].nunique() == 2
-    ), "Testing signals should have two unique timestamps per publish date-region combination."
+        assert_df.num_obs == assert_df.num_unique_obs
+    ), "Testing signals should have up to two timestamps per publish date-geo level " + \
+        "combination. Each timestamp should be unique."

     max_ts_by_group = df.groupby(
         ["publish_date", "geo_id"], as_index=False
@@ -682,3 +761,38 @@ def std_err(df):
     p = df.val
     n = df.sample_size
     return np.sqrt(p * (1 - p) / n)
+
+def apply_thres_change_date(apply_fn, df, *apply_fn_args):
+    """
+    Apply a function separately to data before and after the test volume change date.
+
+    The test volume change date is when test volume and test positivity
+    started being reported for different reference dates within the same
+    report. This first occurred on 2021-03-17.
+
+    Parameters
+    ----------
+    apply_fn: function
+        function to apply to data before and after the test volume change date
+    df: pd.DataFrame
+        Columns: val, sample_size, ...
+    apply_fn_args: tuple of lists
+        variable number of additional arguments to pass to the `apply_fn`.
+        Each additional argument should be a list of length 2. The first
+        element of each list will be passed to the `apply_fn` when processing
+        pre-change date data; the second element will be used for the
+        post-change date data.
+
+    Returns
+    -------
+    map object
+        Iterator with two entries, one for the "before" data and one for the "after" data.
+    """
+    change_date = datetime.date(2021, 3, 17)
+    list_of_dfs = [df[df.publish_date < change_date], df[df.publish_date >= change_date]]
+
+    for arg_field in apply_fn_args:
+        assert len(arg_field) == 2, "Extra arguments must be iterables with " + \
+            "length 2, the same as the number of dfs to process"
+
+    return map(apply_fn, list_of_dfs, *apply_fn_args)
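
Aside (illustration, not part of the diff): a quick usage sketch for the new helper, assuming the package is installed so it can be imported from delphi_dsew_community_profile.pull. Each extra argument is a two-element list; the first element is passed along with the pre-2021-03-17 half and the second with the post-change half. summarize is a toy stand-in for keep_latest_report or nation_from_state.

import datetime
import pandas as pd

from delphi_dsew_community_profile.pull import apply_thres_change_date

def summarize(part, label):
    # Toy per-half function for this sketch only.
    return part.assign(period=label)

df = pd.DataFrame({
    "publish_date": [datetime.date(2021, 3, 10), datetime.date(2021, 3, 20)],
    "val": [1.0, 2.0],
    "sample_size": [10, 20],
})
# One two-element list per extra parameter of `summarize`.
result = pd.concat(apply_thres_change_date(summarize, df, ["pre", "post"]))
print(result)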

dsew_community_profile/tests/test_pull.py

Lines changed: 38 additions & 6 deletions
@@ -359,7 +359,7 @@ def test_unify_testing_sigs(self):

         with pytest.raises(AssertionError):
             # Inputs have different numbers of rows.
-            unify_testing_sigs(positivity_df, positivity_df.iloc[0])
+            unify_testing_sigs(positivity_df, positivity_df.head(n=1))

     def test_add_max_ts_col(self):
         input_df = pd.DataFrame({
@@ -381,25 +381,57 @@ def test_add_max_ts_col(self):
         add_max_ts_col(
             pd.DataFrame({
                 'geo_id': ["ca", "ca", "fl", "fl"],
-                'timestamp': [datetime(2021, 10, 27)]*4,
+                'timestamp': [datetime(2021, 10, 27)] * 4,
                 'val': [1, 2, 3, 4],
                 'se': [None] * 4,
                 'sample_size': [None] * 4,
-                'publish_date': [datetime(2021, 10, 30)]*4,
+                'publish_date': [datetime(2021, 10, 30)] * 4,
             })
         )
         with pytest.raises(AssertionError):
-            # Input df has fewer than 2 timestamps per geo id-publish date combination.
+            # Input df has more than 2 timestamps per geo id-publish date combination.
+            add_max_ts_col(
+                pd.DataFrame({
+                    'geo_id': ["ca", "ca", "ca", "fl", "fl", "fl"],
+                    'timestamp': [datetime(2021, 10, 27)] * 6,
+                    'val': [1, 2, 3, 4, 5, 6],
+                    'se': [None] * 6,
+                    'sample_size': [None] * 6,
+                    'publish_date': [datetime(2021, 10, 30)] * 6,
+                })
+            )
+
+        try:
+            # Input df has fewer than 2 timestamps per geo id-publish date
+            # combination. This should not raise an exception.
             add_max_ts_col(
                 pd.DataFrame({
                     'geo_id': ["ca", "fl"],
-                    'timestamp': [datetime(2021, 10, 27)]*2,
+                    'timestamp': [datetime(2021, 10, 27)] * 2,
                     'val': [1, 2],
                     'se': [None] * 2,
                     'sample_size': [None] * 2,
-                    'publish_date': [datetime(2021, 10, 30)]*2,
+                    'publish_date': [datetime(2021, 10, 30)] * 2,
+                })
+            )
+        except AssertionError as e:
+            assert False, f"'add_max_ts_col' raised exception: {e}"
+
+        try:
+            # Input df has 2 unique timestamps per geo id-publish date
+            # combination. This should not raise an exception.
+            add_max_ts_col(
+                pd.DataFrame({
+                    'geo_id': ["ca", "ca", "fl", "fl"],
+                    'timestamp': [datetime(2021, 10, 27), datetime(2021, 10, 20)] * 2,
+                    'val': [1, 2, 3, 4],
+                    'se': [None] * 4,
+                    'sample_size': [None] * 4,
+                    'publish_date': [datetime(2021, 10, 30)] * 4,
                 })
             )
+        except AssertionError as e:
+            assert False, f"'add_max_ts_col' raised exception: {e}"

     def test_std_err(self):
         df = pd.DataFrame({
