Commit feea58d

Add NSSP secondary source (#2074)
* base changes
* lint
* lint
* add test
* fix hhs bug + doc to readme + fix signal grouping
* fix weird nan
* logging + error details
* test data sync
* typo in pull.py
* region to lower
* add log
* fix str bug
* add backup data mechanism
* adjust details.md
* appease linter
* add tests for secondary source backup
* Update signal names to _2023RVR in constants.py
* clarify _2023RVR signals in DETAILS.md
1 parent e7eccaa commit feea58d

File tree

7 files changed, +318 -28 lines changed


nssp/DETAILS.md

Lines changed: 21 additions & 4 deletions

We import the NSSP Emergency Department Visit data, including the percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website. The data is provided at the county, state, and national levels; we aggregate from county data up to the HRR and MSA levels using a population-weighted mean.

We pull nssp data from two sources:

- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary (2023RVR) source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

The primary source yields 8 output signals and the secondary source 4; the secondary source provides no smoothed signals.

Note that the data produced from the secondary source is mostly the same as its primary-source equivalent: a past analysis showed that around 95% of data points differ by less than 0.1, while the remaining 5% differ by 0.1 to 1.2.

## Geographical Levels

Primary source:
* `state`: reported from source using two-letter postal code
* `county`: reported from source using FIPS code
* `national`: just `us` for now, reported from source
* `hhs`, `hrr`, `msa`: not reported from source, so we compute them from county-level data using a weighted mean. Each county is assigned a weight equal to its population in the last census (2020).

Secondary (2023RVR) source:
* `state`: reported from source
* `hhs`: reported from source
* `national`: reported from source

## Metrics
* `percent_visits_covid`, `percent_visits_rsv`, `percent_visits_influenza`: percentage of emergency department patient visits for the specified pathogen.
* `percent_visits_combined`: sum of the three percentages of visits for flu, RSV, and COVID.
* `smoothed_percent_visits_covid`, `smoothed_percent_visits_rsv`, `smoothed_percent_visits_influenza`: 3-week moving average of the percentage of emergency department patient visits for the specified pathogen.
* `smoothed_percent_visits_combined`: 3-week moving average of the sum of the three percentages of visits for flu, RSV, and COVID.
* `percent_visits_covid_2023RVR`, `percent_visits_rsv_2023RVR`, `percent_visits_influenza_2023RVR`: taken from the secondary source; percentage of emergency department patient visits for the specified pathogen.
* `percent_visits_combined_2023RVR`: taken from the secondary source; sum of the three percentages of visits for flu, RSV, and COVID.
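The county-to-HRR/MSA aggregation described above is a population-weighted mean using 2020 census weights. A minimal sketch of that computation, with made-up county populations, values, and MSA assignment (the column names and the `weighted_mean` helper are illustrative, not the indicator's actual code):

```python
import pandas as pd

# Toy county-level data: two counties assigned to one hypothetical MSA
counties = pd.DataFrame({
    "fips": ["06001", "06013"],
    "msa": ["sf-bay", "sf-bay"],               # illustrative crosswalk assignment
    "population": [1_600_000, 1_100_000],      # census weights (made up here)
    "percent_visits_covid": [2.0, 3.0],
})

def weighted_mean(group: pd.DataFrame, value_col: str) -> float:
    """Population-weighted mean of value_col within one geo group."""
    w = group["population"]
    return (group[value_col] * w).sum() / w.sum()

msa_vals = (
    counties.groupby("msa")[["population", "percent_visits_covid"]]
    .apply(weighted_mean, value_col="percent_visits_covid")
)
# The MSA value lies between the county values, pulled toward the larger county.
```

Each county contributes in proportion to its population, so a populous county dominates its MSA's value.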

nssp/README.md

Lines changed: 5 additions & 0 deletions

# NSSP Emergency Department Visit data

We import the NSSP Emergency Department Visit data from the CDC website, aggregate it to several geographic levels, and export the aggregated data.

We pull nssp data from two sources:

- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

For details see the `DETAILS.md` file in this directory.

## Create a MyAppToken

nssp/delphi_nssp/constants.py

Lines changed: 26 additions & 0 deletions

        "fips": str,
    }
)


SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}

SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_2023RVR",
    "Influenza": "pct_ed_visits_influenza_2023RVR",
    "RSV": "pct_ed_visits_rsv_2023RVR",
    "Combined": "pct_ed_visits_combined_2023RVR",
}

SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())
SECONDARY_GEOS = ["state", "nation", "hhs"]

SECONDARY_TYPE_DICT = {
    "timestamp": "datetime64[ns]",
    "geo_value": str,
    "val": float,
    "geo_type": str,
    "signal": str,
}
SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT.keys())
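These two maps drive the secondary pipeline: raw Socrata columns are renamed via SECONDARY_COLS_MAP, then pathogen labels are mapped to `_2023RVR` signal names via SECONDARY_SIGNALS_MAP. A self-contained sketch of that flow on one record shaped like the sample test data (the maps are copied from this file; the record is fabricated for illustration):

```python
import pandas as pd

SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}
SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_2023RVR",
    "Influenza": "pct_ed_visits_influenza_2023RVR",
    "RSV": "pct_ed_visits_rsv_2023RVR",
    "Combined": "pct_ed_visits_combined_2023RVR",
}

records = [{"week_end": "2022-10-01T00:00:00.000", "geography": "National",
            "percent_visits": "1.8", "pathogen": "COVID-19"}]

# Rename columns, then translate the pathogen label into a signal name
df = pd.DataFrame.from_records(records).rename(columns=SECONDARY_COLS_MAP)
df["signal"] = df["signal"].map(SECONDARY_SIGNALS_MAP)
```

After these two steps the frame has `timestamp`, `geo_value`, `val`, and `signal` columns, with `signal` holding `pct_ed_visits_covid_2023RVR`.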

nssp/delphi_nssp/pull.py

Lines changed: 86 additions & 19 deletions

from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import (
    NEWLINE,
    SECONDARY_COLS_MAP,
    SECONDARY_KEEP_COLS,
    SECONDARY_SIGNALS_MAP,
    SECONDARY_TYPE_DICT,
    SIGNALS,
    SIGNALS_MAP,
    TYPE_DICT,
)


def warn_string(df, type_dict):
    # ... body unchanged in this commit ...
    return warn


def pull_with_socrata_api(socrata_token: str, dataset_id: str):
    """Pull data from the Socrata API.

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)
    dataset_id: str
        The dataset id to pull data from

    Returns
    -------
    list of dictionaries, each representing a row in the dataset
    """
    client = Socrata("data.cdc.gov", socrata_token)
    results = []
    offset = 0
    limit = 50000  # maximum limit allowed by SODA 2.0
    while True:
        page = client.get(dataset_id, limit=limit, offset=offset)
        if not page:
            break  # exit the loop if no more results
        results.extend(page)
        offset += limit
    return results


def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
    """Pull the latest NSSP ER visits primary dataset.

    https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)
    backup_dir: str
        Directory to which to save raw backup data
    custom_run: bool
        Flag indicating if the current run is a patch. If so, don't save any data to disk
    logger: Optional[logging.Logger]
        logger object

    Returns
    -------
    pd.DataFrame
        Dataframe with one row per observation and a column for each signal in SIGNALS.
    """
    socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
    df_ervisits = pd.DataFrame.from_records(socrata_results)
    create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
    df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
    df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

    # ... type conversion unchanged in this commit ...

    keep_columns = ["timestamp", "geography", "county", "fips"]
    return df_ervisits[SIGNALS + keep_columns]


def secondary_pull_nssp_data(
    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
):
    """Pull the latest NSSP ER visits secondary dataset.

    https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

    The output dataset has one row per observation.

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)
    backup_dir: str
        Directory to which to save raw backup data
    custom_run: bool
        Flag indicating if the current run is a patch. If so, don't save any data to disk
    logger: Optional[logging.Logger]
        logger object

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
    df_ervisits = pd.DataFrame.from_records(socrata_results)
    create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
    df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

    # geo_type is not provided in the dataset, so we infer it from geo_value,
    # which holds state names, "National", or HHS region labels ("Region N")
    df_ervisits["geo_type"] = "state"
    df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"
    hhs_region_mask = df_ervisits["geo_value"].str.lower().str.startswith("region ")
    df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace(
        "Region ", ""
    )
    df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"

    df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

    df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]

    try:
        df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
    except KeyError as exc:
        raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc

    return df_ervisits
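The geo_type inference in secondary_pull_nssp_data can be exercised on a toy frame. A minimal sketch (the standalone `infer_geo_type` helper name is mine; the logic mirrors the commit's code):

```python
import pandas as pd

def infer_geo_type(df: pd.DataFrame) -> pd.DataFrame:
    """Infer geo_type from geo_value, which holds state names, "National",
    or "Region N" strings; strip the "Region " prefix for HHS rows."""
    df = df.copy()
    df["geo_type"] = "state"                       # default: assume a state name
    df.loc[df["geo_value"] == "National", "geo_type"] = "nation"
    hhs_mask = df["geo_value"].str.lower().str.startswith("region ")
    df.loc[hhs_mask, "geo_value"] = df.loc[hhs_mask, "geo_value"].str.replace(
        "Region ", "", regex=False
    )
    df.loc[hhs_mask, "geo_type"] = "hhs"
    return df

sample = pd.DataFrame({"geo_value": ["California", "National", "Region 4"]})
result = infer_geo_type(sample)
# geo_type becomes ["state", "nation", "hhs"]; "Region 4" becomes "4"
```

Anything that is neither "National" nor a "Region N" label falls through as a state, which is why the state branch in run.py later validates the names.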

nssp/delphi_nssp/run.py

Lines changed: 51 additions & 2 deletions

from delphi_utils.geomap import GeoMapper
from delphi_utils.nancodes import add_default_nancodes

from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
from .pull import pull_nssp_data, secondary_pull_nssp_data


def add_needed_columns(df, col_names=None):
    # ... unchanged in this commit ...


def run_module(params):
    # ... setup unchanged in this commit ...
    socrata_token = params["indicator"]["socrata_token"]

    run_stats = []

    logger.info("Generating primary signals")
    ## build the base version of the signal at the most detailed geo level you can get.
    ## compute stuff here or farm out to another function or file
    df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)

    # ... primary-signal generation and export unchanged in this commit ...

        if len(dates) > 0:
            run_stats.append((max(dates), len(dates)))

    logger.info("Generating secondary signals")
    secondary_df_pull = secondary_pull_nssp_data(socrata_token, backup_dir, custom_run, logger)
    for signal in SECONDARY_SIGNALS:
        secondary_df_pull_signal = secondary_df_pull[secondary_df_pull["signal"] == signal]
        if secondary_df_pull_signal.empty:
            logger.warning("No data found for signal", signal=signal)
            continue
        for geo in SECONDARY_GEOS:
            df = secondary_df_pull_signal.copy()
            logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
            if geo == "state":
                df = df[df["geo_type"] == "state"]
                df["geo_id"] = df["geo_value"].apply(
                    lambda x: (
                        us.states.lookup(x).abbr.lower()
                        if us.states.lookup(x)
                        else ("dc" if x == "District of Columbia" else x)
                    )
                )
                unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
                if unexpected_state_names.shape[0] > 0:
                    logger.error(
                        "Unexpected state names",
                        unexpected_state_names=unexpected_state_names["geo_value"].unique(),
                    )
                    raise RuntimeError("Unexpected state names in secondary dataset")
            elif geo == "nation":
                df = df[df["geo_type"] == "nation"]
                df["geo_id"] = "us"
            elif geo == "hhs":
                df = df[df["geo_type"] == "hhs"]
                df["geo_id"] = df["geo_value"]
            # add se, sample_size, and na codes
            missing_cols = set(CSV_COLS) - set(df.columns)
            df = add_needed_columns(df, col_names=list(missing_cols))
            df_csv = df[CSV_COLS + ["timestamp"]]
            # actual export
            dates = create_export_csv(
                df_csv,
                geo_res=geo,
                export_dir=export_dir,
                sensor=signal,
                weekly_dates=True,
            )
            if len(dates) > 0:
                run_stats.append((max(dates), len(dates)))

    ## log this indicator run
    logging(start_time, run_stats, logger)
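The state branch above maps full state names to lowercase postal abbreviations via the `us` package, special-casing "District of Columbia" (which `us.states.lookup` may not resolve), and passes anything else through so it can be flagged as unexpected. A dependency-free sketch of that fallback chain (the `to_geo_id` helper and the tiny lookup table are mine, standing in for `us.states.lookup`):

```python
def to_geo_id(name: str, lookup: dict) -> str:
    """Map a state name to a lowercase postal abbreviation: try the lookup,
    then the DC special case, else pass the name through unchanged
    (unchanged names are later reported as "Unexpected state names")."""
    abbr = lookup.get(name)
    if abbr:
        return abbr.lower()
    return "dc" if name == "District of Columbia" else name

# Illustrative stand-in for us.states.lookup; not the real library
STATE_LOOKUP = {"California": "CA", "Texas": "TX"}

to_geo_id("Texas", STATE_LOOKUP)                  # lowercase abbreviation
to_geo_id("District of Columbia", STATE_LOOKUP)   # DC special case
to_geo_id("Guam", STATE_LOOKUP)                   # passes through unmapped
```

Because unmapped names come back equal to their input, the `df["geo_id"] == df["geo_value"]` check is a cheap way to detect names the lookup could not handle.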
Lines changed: 73 additions & 0 deletions

[
  {
    "week_end": "2022-10-01T00:00:00.000",
    "pathogen": "COVID-19",
    "geography": "National",
    "percent_visits": "1.8",
    "status": "Reporting",
    "trend_on_date": "Decreasing",
    "recent_trend": "Decreasing"
  },
  {
    "week_end": "2022-10-01T00:00:00.000",
    "pathogen": "Influenza",
    "geography": "National",
    "percent_visits": "0.5",
    "status": "Reporting",
    "trend_on_date": "Increasing",
    "recent_trend": "Increasing"
  },
  {
    "week_end": "2022-10-01T00:00:00.000",
    "pathogen": "RSV",
    "geography": "National",
    "percent_visits": "0.5",
    "status": "Reporting",
    "trend_on_date": "Increasing",
    "recent_trend": "Increasing"
  },
  {
    "week_end": "2022-10-01T00:00:00.000",
    "pathogen": "Combined",
    "geography": "National",
    "percent_visits": "2.8",
    "status": "Reporting",
    "trend_on_date": "Decreasing",
    "recent_trend": "Decreasing"
  },
  {
    "week_end": "2022-10-15T00:00:00.000",
    "pathogen": "COVID-19",
    "geography": "National",
    "percent_visits": "1.6",
    "status": "Reporting",
    "trend_on_date": "Decreasing",
    "recent_trend": "Decreasing"
  },
  {
    "week_end": "2022-10-15T00:00:00.000",
    "pathogen": "Influenza",
    "geography": "National",
    "percent_visits": "0.9",
    "status": "Reporting",
    "trend_on_date": "Increasing",
    "recent_trend": "Increasing"
  },
  {
    "week_end": "2022-10-15T00:00:00.000",
    "pathogen": "RSV",
    "geography": "National",
    "percent_visits": "0.7",
    "status": "Reporting",
    "trend_on_date": "Increasing",
    "recent_trend": "Increasing"
  },
  {
    "week_end": "2022-10-15T00:00:00.000",
    "pathogen": "Combined",
    "geography": "National",
    "percent_visits": "3.2",
    "status": "Reporting",
    "trend_on_date": "Increasing",
    "recent_trend": "Decreasing"
  }
]
