diff --git a/nssp/DETAILS.md b/nssp/DETAILS.md
index 539697baa..692d85559 100644
--- a/nssp/DETAILS.md
+++ b/nssp/DETAILS.md
@@ -2,12 +2,29 @@
 
 We import the NSSP Emergency Department Visit data, including percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website. The data is provided at the county level, state level and national level; we do a population-weighted mean to aggregate from county data up to the HRR and MSA levels.
 
+We pull data from two sources for nssp:
+- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
+- Secondary (2023RVR) source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
+The primary source yields 8 signals and the secondary source 4; the secondary source has no smoothed signals.
+
+Note that the data produced from the secondary source is mostly the same as its primary-source equivalent: past analysis showed around 95% of datapoints differing by less than 0.1, with the remaining 5% differing by 0.1 to 1.2.
+
 ## Geographical Levels
-* `state`: reported using two-letter postal code
-* `county`: reported using fips code
-* `national`: just `us` for now
+Primary source:
+* `state`: reported from source using two-letter postal code
+* `county`: reported from source using FIPS code
+* `national`: reported from source; just `us` for now
+* `hhs`, `hrr`, `msa`: not reported from source, so we compute them from county-level data using a weighted mean, where each county is weighted by its population in the 2020 census.
+
+Secondary (2023RVR) source:
+* `state`: reported from source
+* `hhs`: reported from source
+* `national`: reported from source
+
 ## Metrics
 * `percent_visits_covid`, `percent_visits_rsv`, `percent_visits_influenza`: percentage of emergency department patient visits for specified pathogen.
 * `percent_visits_combined`: sum of the three percentages of visits for flu, rsv and covid.
 * `smoothed_percent_visits_covid`, `smoothed_percent_visits_rsv`, `smoothed_percent_visits_influenza`: 3 week moving average of the percentage of emergency department patient visits for specified pathogen.
-* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
\ No newline at end of file
+* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
+* `percent_visits_covid_2023RVR`, `percent_visits_rsv_2023RVR`, `percent_visits_influenza_2023RVR`: percentage of emergency department patient visits for the specified pathogen, taken from the secondary source.
+* `percent_visits_combined_2023RVR`: sum of the three percentages of visits for flu, rsv and covid, taken from the secondary source.
diff --git a/nssp/README.md b/nssp/README.md
index 4bba6f626..c3f57b94b 100644
--- a/nssp/README.md
+++ b/nssp/README.md
@@ -1,6 +1,11 @@
 # NSSP Emergency Department Visit data
 
 We import the NSSP Emergency Department Visit data, currently only the smoothed concentration, from the CDC website, aggregate to the state and national level from the wastewater sample site level, and export the aggregated data.
+
+We pull data from two sources for nssp:
+- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
+- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
+
 For details see the `DETAILS.md` file in this directory.
 
 ## Create a MyAppToken
diff --git a/nssp/delphi_nssp/constants.py b/nssp/delphi_nssp/constants.py
index 9b98d2012..76d51b927 100644
--- a/nssp/delphi_nssp/constants.py
+++ b/nssp/delphi_nssp/constants.py
@@ -41,3 +41,29 @@
         "fips": str,
     }
 )
+
+SECONDARY_COLS_MAP = {
+    "week_end": "timestamp",
+    "geography": "geo_value",
+    "percent_visits": "val",
+    "pathogen": "signal",
+}
+
+SECONDARY_SIGNALS_MAP = {
+    "COVID-19": "pct_ed_visits_covid_2023RVR",
+    "Influenza": "pct_ed_visits_influenza_2023RVR",
+    "RSV": "pct_ed_visits_rsv_2023RVR",
+    "Combined": "pct_ed_visits_combined_2023RVR",
+}
+
+SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())
+SECONDARY_GEOS = ["state", "nation", "hhs"]
+
+SECONDARY_TYPE_DICT = {
+    "timestamp": "datetime64[ns]",
+    "geo_value": str,
+    "val": float,
+    "geo_type": str,
+    "signal": str,
+}
+SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT.keys())
diff --git a/nssp/delphi_nssp/pull.py b/nssp/delphi_nssp/pull.py
index de6934bc8..94058dea8 100644
--- a/nssp/delphi_nssp/pull.py
+++ b/nssp/delphi_nssp/pull.py
@@ -8,7 +8,16 @@
 from delphi_utils import create_backup_csv
 from sodapy import Socrata
 
-from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
+from .constants import (
+    NEWLINE,
+    SECONDARY_COLS_MAP,
+    SECONDARY_KEEP_COLS,
+    SECONDARY_SIGNALS_MAP,
+    SECONDARY_TYPE_DICT,
+    SIGNALS,
+    SIGNALS_MAP,
+    TYPE_DICT,
+)
 
 
 def warn_string(df, type_dict):
@@ -29,42 +38,50 @@
     return warn
 
 
-def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
-    """Pull the latest NSSP ER visits data, and conforms it into a dataset.
-
-    The output dataset has:
-
-    - Each row corresponds to a single observation
-    - Each row additionally has columns for the signals in SIGNALS
+def pull_with_socrata_api(socrata_token: str, dataset_id: str):
+    """Pull data from the Socrata API.
 
     Parameters
     ----------
     socrata_token: str
         My App Token for pulling the NSSP data (could be the same as the nchs data)
-    backup_dir: str
-        Directory to which to save raw backup data
-    custom_run: bool
-        Flag indicating if the current run is a patch. If so, don't save any data to disk
-    logger: Optional[logging.Logger]
-        logger object
+    dataset_id: str
+        The id of the dataset to pull data from
 
     Returns
     -------
-    pd.DataFrame
-        Dataframe as described above.
+    list of dictionaries, each representing a row in the dataset
     """
-    # Pull data from Socrata API
     client = Socrata("data.cdc.gov", socrata_token)
     results = []
     offset = 0
     limit = 50000  # maximum limit allowed by SODA 2.0
     while True:
-        page = client.get("rdmq-nq56", limit=limit, offset=offset)
+        page = client.get(dataset_id, limit=limit, offset=offset)
         if not page:
             break  # exit the loop if no more results
         results.extend(page)
         offset += limit
 
-    df_ervisits = pd.DataFrame.from_records(results)
+    return results
+
+
+def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
+    """Pull the latest NSSP ER visits primary dataset.
+
+    https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
+
+    Parameters
+    ----------
+    socrata_token: str
+        My App Token for pulling the NSSP data (could be the same as the nchs data)
+
+    Returns
+    -------
+    pd.DataFrame
+        DataFrame of the latest primary-source ER visits data.
+    """
+    socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
+    df_ervisits = pd.DataFrame.from_records(socrata_results)
     create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
@@ -79,3 +96,53 @@
 
     keep_columns = ["timestamp", "geography", "county", "fips"]
     return df_ervisits[SIGNALS + keep_columns]
+
+
+def secondary_pull_nssp_data(
+    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
+):
+    """Pull the latest NSSP ER visits secondary dataset.
+
+    https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
+
+    The output dataset has:
+
+    - Each row corresponds to a single observation
+
+    Parameters
+    ----------
+    socrata_token: str
+        My App Token for pulling the NSSP data (could be the same as the nchs data)
+
+    Returns
+    -------
+    pd.DataFrame
+        Dataframe as described above.
+    """
+    socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
+    df_ervisits = pd.DataFrame.from_records(socrata_results)
+    create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
+    df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)
+
+    # geo_type is not provided in the dataset, so we infer it from geo_value,
+    # which holds state names, "National", or HHS region labels like "Region 1"
+    df_ervisits["geo_type"] = "state"
+
+    df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"
+
+    hhs_region_mask = df_ervisits["geo_value"].str.lower().str.startswith("region ")
+    df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace(
+        "Region ", ""
+    )
+    df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"
+
+    df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)
+
+    df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]
+
+    try:
+        df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
+    except (KeyError, ValueError) as exc:
+        raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc
+
+    return df_ervisits
diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py
index b512e8aba..417c49ab2 100644
--- a/nssp/delphi_nssp/run.py
+++ b/nssp/delphi_nssp/run.py
@@ -31,8 +31,8 @@
 from delphi_utils.geomap import GeoMapper
 from delphi_utils.nancodes import add_default_nancodes
 
-from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
-from .pull import pull_nssp_data
+from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
+from .pull import pull_nssp_data, secondary_pull_nssp_data
 
 
 def add_needed_columns(df, col_names=None):
@@ -83,6 +83,8 @@
     socrata_token = params["indicator"]["socrata_token"]
 
     run_stats = []
+
+    logger.info("Generating primary signals")
     ## build the base version of the signal at the most detailed geo level you can get.
     ## compute stuff here or farm out to another function or file
     df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
@@ -139,5 +141,52 @@
     if len(dates) > 0:
         run_stats.append((max(dates), len(dates)))
 
+    logger.info("Generating secondary signals")
+    secondary_df_pull = secondary_pull_nssp_data(socrata_token, backup_dir, custom_run, logger)
+    for signal in SECONDARY_SIGNALS:
+        secondary_df_pull_signal = secondary_df_pull[secondary_df_pull["signal"] == signal]
+        if secondary_df_pull_signal.empty:
+            logger.warning("No data found for signal", signal=signal)
+            continue
+        for geo in SECONDARY_GEOS:
+            df = secondary_df_pull_signal.copy()
+            logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
+            if geo == "state":
+                df = df[(df["geo_type"] == "state")]
+                df["geo_id"] = df["geo_value"].apply(
+                    lambda x: (
+                        us.states.lookup(x).abbr.lower()
+                        if us.states.lookup(x)
+                        else ("dc" if x == "District of Columbia" else x)
+                    )
+                )
+                unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
+                if unexpected_state_names.shape[0] > 0:
+                    logger.error(
+                        "Unexpected state names",
+                        unexpected_state_names=unexpected_state_names["geo_value"].unique(),
+                    )
+                    raise RuntimeError("Unexpected state names in secondary dataset")
+            elif geo == "nation":
+                df = df[(df["geo_type"] == "nation")]
+                df["geo_id"] = "us"
+            elif geo == "hhs":
+                df = df[(df["geo_type"] == "hhs")]
+                df["geo_id"] = df["geo_value"]
+            # add se, sample_size, and na codes
+            missing_cols = set(CSV_COLS) - set(df.columns)
+            df = add_needed_columns(df, col_names=list(missing_cols))
+            df_csv = df[CSV_COLS + ["timestamp"]]
+            # actual export
+            dates = create_export_csv(
+                df_csv,
+                geo_res=geo,
+                export_dir=export_dir,
+                sensor=signal,
+                weekly_dates=True,
+            )
+            if len(dates) > 0:
+                run_stats.append((max(dates), len(dates)))
+
     ## log this indicator run
     logging(start_time, run_stats, logger)
diff --git a/nssp/tests/test_data/secondary_page.txt b/nssp/tests/test_data/secondary_page.txt
new file mode 100644
index 000000000..4b4aaaca7
--- /dev/null
+++ b/nssp/tests/test_data/secondary_page.txt
@@ -0,0 +1,74 @@
+[
+  {
+    "week_end": "2022-10-01T00:00:00.000",
+    "pathogen": "COVID-19",
+    "geography": "National",
+    "percent_visits": "1.8",
+    "status": "Reporting",
+    "trend_on_date": "Decreasing",
+    "recent_trend": "Decreasing"
+  },
+  {
+    "week_end": "2022-10-01T00:00:00.000",
+    "pathogen": "Influenza",
+    "geography": "National",
+    "percent_visits": "0.5",
+    "status": "Reporting",
+    "trend_on_date": "Increasing",
+    "recent_trend": "Increasing"
+  },
+  {
+    "week_end": "2022-10-01T00:00:00.000",
+    "pathogen": "RSV",
+    "geography": "National",
+    "percent_visits": "0.5",
+    "status": "Reporting",
+    "trend_on_date": "Increasing",
+    "recent_trend": "Increasing"
+  },
+  {
+    "week_end": "2022-10-01T00:00:00.000",
+    "pathogen": "Combined",
+    "geography": "National",
+    "percent_visits": "2.8",
+    "status": "Reporting",
+    "trend_on_date": "Decreasing",
+    "recent_trend": "Decreasing"
+  },
+  {
+    "week_end": "2022-10-15T00:00:00.000",
+    "pathogen": "COVID-19",
+    "geography": "National",
+    "percent_visits": "1.6",
+    "status": "Reporting",
+    "trend_on_date": "Decreasing",
+    "recent_trend": "Decreasing"
+  },
+  {
+    "week_end": "2022-10-15T00:00:00.000",
+    "pathogen": "Influenza",
+    "geography": "National",
+    "percent_visits": "0.9",
+    "status": "Reporting",
+    "trend_on_date": "Increasing",
+    "recent_trend": "Increasing"
+  },
+  {
+    "week_end": "2022-10-15T00:00:00.000",
+    "pathogen": "RSV",
+    "geography": "National",
+    "percent_visits": "0.7",
"status": "Reporting", + "trend_on_date": "Increasing", + "recent_trend": "Increasing" + }, + { + "week_end": "2022-10-15T00:00:00.000", + "pathogen": "Combined", + "geography": "National", + "percent_visits": "3.2", + "status": "Reporting", + "trend_on_date": "Increasing", + "recent_trend": "Decreasing" + } +] \ No newline at end of file diff --git a/nssp/tests/test_pull.py b/nssp/tests/test_pull.py index d0ebbc550..30debd6cd 100644 --- a/nssp/tests/test_pull.py +++ b/nssp/tests/test_pull.py @@ -7,8 +7,20 @@ from delphi_nssp.pull import ( pull_nssp_data, + secondary_pull_nssp_data, + pull_with_socrata_api, +) + +from delphi_nssp.constants import ( + NEWLINE, + SECONDARY_COLS_MAP, + SECONDARY_KEEP_COLS, + SECONDARY_SIGNALS_MAP, + SECONDARY_TYPE_DICT, + SIGNALS, + SIGNALS_MAP, + TYPE_DICT, ) -from delphi_nssp.constants import SIGNALS from delphi_utils import get_structured_logger @@ -66,6 +78,47 @@ def test_pull_nssp_data(self, mock_socrata, caplog): for signal in SIGNALS: assert result[signal].notnull().all(), f"{signal} has rogue NaN" - # clean up for file in backup_files: - os.remove(file) \ No newline at end of file + os.remove(file) + + @patch("delphi_nssp.pull.Socrata") + def test_secondary_pull_nssp_data(self, mock_socrata): + today = pd.Timestamp.today().strftime("%Y%m%d") + backup_dir = 'test_raw_data_backups' + + # Load test data + with open("test_data/secondary_page.txt", "r") as f: + test_data = json.load(f) + + # Mock Socrata client and its get method + mock_client = MagicMock() + mock_client.get.side_effect = [test_data, []] # Return test data on first call, empty list on second call + mock_socrata.return_value = mock_client + + custom_run = False + logger = get_structured_logger() + # Call function with test token + test_token = "test_token" + result = secondary_pull_nssp_data(test_token, backup_dir, custom_run, logger) + # print(result) + + # Check that Socrata client was initialized with correct arguments + mock_socrata.assert_called_once_with("data.cdc.gov", test_token) + + # Check that get method was called with correct arguments + mock_client.get.assert_any_call("7mra-9cq9", limit=50000, offset=0) + + for col in SECONDARY_KEEP_COLS: + assert result[col].notnull().all(), f"{col} has rogue NaN" + + assert result[result['geo_value'].str.startswith('Region') ].empty, "'Region ' need to be removed from geo_value for geo_type 'hhs'" + assert (result[result['geo_type'] == 'nation']['geo_value'] == 'National').all(), "All rows with geo_type 'nation' must have geo_value 'National'" + + # Check that backup file was created + backup_files = glob.glob(f"{backup_dir}/{today}*") + assert len(backup_files) == 2, "Backup file was not created" + for file in backup_files: + os.remove(file) + +if __name__ == "__main__": + unittest.main()