nssp patching code #2000
Merged
Commits (56)

c78ae21  nssp patching code (minhkhul)
7694c0a  lint (minhkhul)
a3ed4c2  add test (minhkhul)
1628d34  add test (minhkhul)
2536b94  Add patching how-to to readme (minhkhul)
e4d45e5  adjust current_issue_dir name for weekly data instead of daily. (minhkhul)
db906fc  lint (minhkhul)
8f0bb32  adjust test for more cases (minhkhul)
7f151f5  add custom_run flag (minhkhul)
c093349  handle custom flag on but bad config (minhkhul)
a967416  make patch config check readable (minhkhul)
c020da6  make good_patch_config check comprehensive (minhkhul)
9a6130b  rewrite good_patch_config for clarity (minhkhul)
b8a2177  add unit tests for good_patch_config check (minhkhul)
a7d9443  add test_pull unit test for patching case + cleanup format (minhkhul)
e29e07e  split test cases + move to pytest (minhkhul)
0a4bfb6  add test for multi-week patching (minhkhul)
6c0abad  rename tests for clarity + restructure test_patch tests to clarify pu… (minhkhul)
7078dd0  make expected issue dates explicit in test_patch_confirm_dir_structur… (minhkhul)
d435bf0  add log to distinguish grabbing records from socrata vs locally store… (minhkhul)
8734daa  Update nssp/README.md (minhkhul)
5a6f8b6  Update nssp/README.md (minhkhul)
4356494  Add auto-download source backup data + update docs + test (minhkhul)
ca427a4  adjust custom_run flag to leave room for non-patch custom runs (minhkhul)
f58b068  move pull logic from run.py into pull.py (minhkhul)
5e93175  logger to static (minhkhul)
2d8670d  adjust unit tests (minhkhul)
f0335f6  more unit test adjustment (minhkhul)
7e06f94  move get_source_data to pull.py + make get_source_data run when sourc… (minhkhul)
e678ce6  auto-remove source_dir content after finish patch run (minhkhul)
bc1d7a7  lint happy (minhkhul)
84cba84  Update pull.py (minhkhul)
742737b  Update pull.py - remove stat debug (minhkhul)
e13d3db  add progress log for source file download (minhkhul)
9cec6ff  lint (minhkhul)
5450d8b  lint (minhkhul)
0a8da1e  Merge remote-tracking branch 'origin/main' into nssp_patching (minhkhul)
ab4b542  rewrite get_source_data to get source data from prod backup_dir + add… (minhkhul)
14b4e6f  various bug fixes (minhkhul)
d9e1f74  remove source data from local after patch run (minhkhul)
ca5294b  fix tests in test_pull (minhkhul)
49c4f67  Add new + fix current patch tests + fix duplicated data (minhkhul)
fb3fbf2  use set comprehension in get_patch_dates (minhkhul)
1f5a4dc  lint (minhkhul)
32f550e  make user param optional when no source data download needed (minhkhul)
2214f87  linter (minhkhul)
9ce6cab  Merge branch 'main' into nssp_patching (minhkhul)
633399b  lint (minhkhul)
83e5a8c  adjust readme (minhkhul)
552d036  add test for pull source data + remove misc secondary trace (minhkhul)
c4dd3b2  darker linting (nmdefries)
a4c4aec  add tests for remote source download vs local (minhkhul)
7c77be5  Merge branch 'nssp_patching' of https://github.com/cmu-delphi/covidca… (minhkhul)
5f383a6  change source host to params + add if logger before use in create_bac… (minhkhul)
d70d6e1  lint (minhkhul)
2a67c27  fix test (minhkhul)
patch.py (new file)

@@ -0,0 +1,189 @@
""" | ||
This module is used for patching data in the delphi_nssp package. | ||
|
||
The code assume user can use key-based auth to access prod server | ||
where historical source data is stored. | ||
|
||
To use this module, configure params.json like so: | ||
|
||
{ | ||
"common": { | ||
"custom_run": true, | ||
... | ||
}, | ||
"validation": { | ||
... | ||
}, | ||
"patch": { | ||
minhkhul marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"source_dir": "delphi/covidcast-indicators/nssp/source_data", | ||
"user": "username", | ||
"patch_dir": "delphi/covidcast-indicators/nssp/AprilPatch", | ||
"start_issue": "2024-04-20", | ||
"end_issue": "2024-04-21", | ||
} | ||
} | ||
|
||
In this params.json, we | ||
- Turn on the "custom_run" flag under "common" | ||
- Add "patch" section, which contains: | ||
+ "source_dir": the local directory where source data is downloaded to | ||
+ "user": the username to log in to the remote server where source data is backed up | ||
+ "patch_dir": the local directory where to write all patch issues output | ||
+ "start_date": str, YYYY-MM-DD format, first issue date | ||
+ "end_date": str, YYYY-MM-DD format, last issue date | ||
|
||
if "source_dir" doesn't exist locally or has no files in it, we download source data to source_dir | ||
else, we assume all needed source files are already in source_dir. | ||
|
||
This module will generate data for that range of issue dates, and store them in batch issue format in the patch_dir: | ||
[patch_dir]/issue_[issue-date]/nssp/actual_data_file.csv | ||
""" | ||
|
||
import sys
from datetime import datetime
from os import listdir, makedirs, path
from shutil import rmtree

import pandas as pd
from delphi_utils import get_structured_logger, read_params
from epiweeks import Week

from .pull import get_source_data
from .run import run_module


def good_patch_config(params, logger):
    """
    Check if the params.json file is correctly configured for patching.

    params: Dict[str, Any]
        Nested dictionary of parameters, typically loaded from params.json file.
    logger: Logger object
        Logger object to log messages.
    """
    valid_config = True
    custom_run = params["common"].get("custom_run", False)
    if not custom_run:
        logger.error("Calling patch.py without custom_run flag set true.")
        valid_config = False

    patch_config = params.get("patch", {})
    if patch_config == {}:
        logger.error("Custom flag is on, but patch section is missing.")
        valid_config = False
    else:
        required_patch_keys = ["start_issue", "end_issue", "patch_dir", "source_dir"]

        # "user" is only required when source data must be downloaded from the remote server.
        # Using .get() here avoids a KeyError when "source_dir" itself is missing;
        # that omission is then reported via missing_keys below.
        source_dir = patch_config.get("source_dir", "")
        if not path.isdir(source_dir) or not listdir(source_dir):
            required_patch_keys.append("user")

        missing_keys = [key for key in required_patch_keys if key not in patch_config]
        if missing_keys:
            logger.error("Patch section is missing required key(s)", missing_keys=missing_keys)
            valid_config = False
        else:
            try:  # issue dates validity check
                start_issue = datetime.strptime(patch_config["start_issue"], "%Y-%m-%d")
                end_issue = datetime.strptime(patch_config["end_issue"], "%Y-%m-%d")
                if start_issue > end_issue:
                    logger.error("Start issue date is after end issue date.")
                    valid_config = False
            except ValueError:
                logger.error("Issue dates must be in YYYY-MM-DD format.")
                valid_config = False

    if valid_config:
        logger.info("Good patch configuration.")
        return True
    logger.info("Bad patch configuration.")
    return False

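A minimal usage sketch (paths and values illustrative): a config that passes this check
when ./source_data already exists and holds files, so "user" is not required.

    from delphi_utils import get_structured_logger

    example_params = {
        "common": {"custom_run": True, "log_filename": "patch.log"},
        "patch": {
            "source_dir": "./source_data",
            "patch_dir": "./patch_output",
            "start_issue": "2024-04-20",
            "end_issue": "2024-04-21",
        },
    }
    example_logger = get_structured_logger("delphi_nssp.patch", filename="patch.log")
    assert good_patch_config(example_params, example_logger)  # logs "Good patch configuration."
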
def get_patch_dates(start_issue, end_issue, source_dir):
    """
    Get the dates to run patch on, given a range of issue dates.

    Due to the weekly cadence of nssp data, dates to run patch on are not necessarily the same as issue dates.
    We use the latest date with source data per epiweek as the reporting date for patching that week's data.

    Note that primary source files are available for all dates where
    secondary source files are available, but not vice versa.

    start_issue: datetime object
    end_issue: datetime object
    source_dir: str, local directory holding the source data files
    """
    patch_dates = []
    date_range = pd.date_range(start=start_issue, end=end_issue)
    dates_with_source_data = {
        date
        for date in date_range
        if path.isfile(f"""{source_dir}/{date.strftime("%Y%m%d")}.csv.gz""")
        or path.isfile(f"""{source_dir}/{date.strftime("%Y%m%d")}_secondary.csv.gz""")
    }
    epiweek_start_dates = {Week.fromdate(date).startdate() for date in date_range}
    for epiweek_start_date in epiweek_start_dates:
        epiweek = Week.fromdate(epiweek_start_date)
        dates_with_data_in_epiweek = [date for date in dates_with_source_data if date.date() in epiweek.iterdates()]
        if dates_with_data_in_epiweek == []:
            continue
        latest_date_with_data = max(dates_with_data_in_epiweek)
        patch_dates.append(latest_date_with_data)
    patch_dates.sort()
    return patch_dates

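To make the epiweek collapsing concrete, a small sketch with illustrative dates:
2024-04-17 through 2024-04-20 all fall in the epiweek that starts Sunday 2024-04-14,
so only the latest date with source data survives as the patch date for that week.

    from datetime import datetime
    from epiweeks import Week

    d1, d2 = datetime(2024, 4, 17), datetime(2024, 4, 20)
    assert Week.fromdate(d1) == Week.fromdate(d2)  # same epiweek, starting 2024-04-14
    # With source files present for both dates, get_patch_dates(d1, d2, source_dir)
    # returns only the 2024-04-20 entry: one patch date for the whole epiweek.
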
def patch():
    """Run the nssp indicator for a range of issue dates."""
    params = read_params()
    logger = get_structured_logger("delphi_nssp.patch", filename=params["common"]["log_filename"])
    if not good_patch_config(params, logger):
        sys.exit(1)

    source_dir = params["patch"]["source_dir"]
    download_source = False
    if not path.isdir(source_dir) or not listdir(source_dir):  # no source dir or empty source dir
        download_source = True
        get_source_data(params, logger)
    else:
        logger.info("Source data already exists locally.")

    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

    logger.info(start_issue=start_issue.strftime("%Y-%m-%d"))
    logger.info(end_issue=end_issue.strftime("%Y-%m-%d"))
    logger.info(source_dir=source_dir)
    logger.info(patch_dir=params["patch"]["patch_dir"])
    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    patch_dates = get_patch_dates(start_issue, end_issue, source_dir)

    for current_issue in patch_dates:
        logger.info("patching issue", issue_date=current_issue.strftime("%Y%m%d"))

        current_issue_source_csv = f"""{source_dir}/{current_issue.strftime("%Y%m%d")}.csv.gz"""
        if not path.isfile(current_issue_source_csv):
            logger.info("No source data at this path", current_issue_source_csv=current_issue_source_csv)
            continue

        params["patch"]["current_issue"] = current_issue.strftime("%Y%m%d")

        # current_issue_date can be different from params["patch"]["current_issue"]
        # due to the weekly cadence of nssp data. For weekly sources, issue dates in our
        # db match the first date of the epiweek that the reporting date falls in,
        # rather than the reporting date itself.
        current_issue_date = Week.fromdate(current_issue).startdate()

        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_date.strftime("%Y%m%d")}/nssp"""
        makedirs(f"{current_issue_dir}", exist_ok=True)
        params["common"]["export_dir"] = f"""{current_issue_dir}"""

        run_module(params, logger)

    if download_source:
        rmtree(source_dir)


if __name__ == "__main__":
    patch()
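
As a worked example of the directory naming above (dates illustrative): a reporting
date of 2024-04-20, a Saturday, falls in the epiweek starting 2024-04-14, so its
output lands under issue_20240414/nssp inside the patch_dir.

    from datetime import datetime
    from epiweeks import Week

    reporting_date = datetime(2024, 4, 20)
    issue_date = Week.fromdate(reporting_date).startdate()
    print(f"issue_{issue_date.strftime('%Y%m%d')}/nssp")  # issue_20240414/nssp

Once params.json is configured as described in the module docstring, the patch run
itself can be started with python -m delphi_nssp.patch or by calling patch() directly.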
pull.py

@@ -1,10 +1,15 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""

import functools
import logging
import sys
import textwrap
from os import makedirs, path
from typing import Optional

import pandas as pd
import paramiko
from delphi_utils import create_backup_csv
from sodapy import Socrata

@@ -20,6 +25,71 @@
 )

def print_callback(remote_file_name, logger, bytes_so_far, bytes_total, progress_chunks):
    """Print the callback information."""
    rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
    if rough_percent_transferred in progress_chunks:
        logger.info("Transfer in progress", remote_file_name=remote_file_name, percent=rough_percent_transferred)
        # Remove progress chunk, so it is not logged again
        progress_chunks.remove(rough_percent_transferred)

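For context: paramiko's SFTPClient.get invokes its callback as callback(bytes_so_far,
bytes_total), so the remaining arguments are pre-bound with functools.partial before
each transfer, as done in get_source_data below. A quick sketch with illustrative values:

    import functools

    from delphi_utils import get_structured_logger

    demo_logger = get_structured_logger("demo")
    cb = functools.partial(print_callback, "20240420.csv.gz", demo_logger, progress_chunks=[0, 50])
    cb(0, 2048)     # logs progress at 0% and removes 0 from progress_chunks
    cb(1024, 2048)  # logs at 50%; any later call landing on 50% stays silent
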
def get_source_data(params, logger):
    """
    Download historical source data from a backup server.

    This function is typically used in patching only. Normal runs grab the latest data from the SODA API.

    This function uses the "user" configuration under the "patch" section in params.json to specify
    a username with local key-based access to connect to the server where backup nssp source data is stored.
    It uses the "backup_dir" config under the "common" section to locate backup files on the remote server.
    It then searches for CSV files that match the inclusive range of issue dates
    specified by the "start_issue" and "end_issue" configs.

    These CSV files are then downloaded and stored in the local "source_dir" directory.
    """
    makedirs(params["patch"]["source_dir"], exist_ok=True)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    host = "delphi-master-prod-01.delphi.cmu.edu"
    user = params["patch"]["user"]
    ssh.connect(host, username=user)

    # Generate file names of source files to download
    dates = pd.date_range(start=params["patch"]["start_issue"], end=params["patch"]["end_issue"])
    primary_source_files = [f"{date.strftime('%Y%m%d')}.csv.gz" for date in dates]
    secondary_source_files = [f"{date.strftime('%Y%m%d')}_secondary.csv.gz" for date in dates]
    remote_source_files = primary_source_files + secondary_source_files

    # Download source files
    sftp = ssh.open_sftp()
    try:
        sftp.stat(params["common"]["backup_dir"])
    except IOError:
        logger.error("Source backup directory does not exist on the remote server.")
        sys.exit(1)  # bail out: nothing can be downloaded without the backup dir

    sftp.chdir(params["common"]["backup_dir"])

    num_files_transferred = 0
    for remote_file_name in remote_source_files:
        callback_for_filename = functools.partial(print_callback, remote_file_name, logger, progress_chunks=[0, 50])
        local_file_path = path.join(params["patch"]["source_dir"], remote_file_name)
        try:
            sftp.stat(remote_file_name)
        except IOError:
            logger.warning(
                "Source backup for this date does not exist on the remote server.", missing_filename=remote_file_name
            )
            continue
        sftp.get(remote_file_name, local_file_path, callback=callback_for_filename)
        logger.info("Transfer finished", remote_file_name=remote_file_name, local_file_path=local_file_path)
        num_files_transferred += 1
    ssh.close()

    if num_files_transferred == 0:
        logger.error("No source data was transferred. Check the source backup server for potential issues.")
        sys.exit(1)

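A sketch of the minimal params this function reads; the remote backup path below is a
placeholder, not the real server location:

    example_params = {
        "common": {"backup_dir": "/path/to/remote/source_backup"},  # placeholder remote path
        "patch": {
            "user": "username",
            "source_dir": "./source_data",
            "start_issue": "2024-04-20",
            "end_issue": "2024-04-21",
        },
    }
    # get_source_data(example_params, logger) would try to fetch 20240420.csv.gz,
    # 20240421.csv.gz, and their *_secondary counterparts into ./source_data,
    # warning about (and skipping) any file missing on the server.
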
def warn_string(df, type_dict):
    """Format the warning string."""
    warn = textwrap.dedent(

@@ -48,6 +118,7 @@ def pull_with_socrata_api(socrata_token: str, dataset_id: str):
    dataset_id: str
        The dataset id to pull data from

    Returns
    -------
    list of dictionaries, each representing a row in the dataset

@@ -65,8 +136,14 @@ def pull_with_socrata_api(socrata_token: str, dataset_id: str):
     return results


-def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
-    """Pull the latest NSSP ER visits primary dataset.
+def pull_nssp_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str] = None,
+    logger: Optional[logging.Logger] = None,
+):
+    """Pull the NSSP ER visits primary dataset.

     https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

@@ -80,9 +157,25 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
     pd.DataFrame
         Dataframe as described above.
     """
-    socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
-    df_ervisits = pd.DataFrame.from_records(socrata_results)
-    create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
+    if not custom_run:
+        socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
+        df_ervisits = pd.DataFrame.from_records(socrata_results)
+        create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
+        logger.info("Number of records grabbed", num_records=len(df_ervisits), source="Socrata API")
+    elif custom_run and logger.name == "delphi_nssp.patch":
+        if issue_date is None:
+            raise ValueError("Issue date is required for patching")
+        source_filename = f"{backup_dir}/{issue_date}.csv.gz"
+        if not path.isfile(source_filename):
+            logger.warning("No primary source data found", source=source_filename, issue_date=issue_date)
+            return None
+        df_ervisits = pd.read_csv(source_filename)
+        logger.info(
+            "Number of records grabbed",
+            num_records=len(df_ervisits),
+            source=source_filename,
+        )

     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

@@ -99,9 +192,13 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger


 def secondary_pull_nssp_data(
-    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str] = None,
+    logger: Optional[logging.Logger] = None,
 ):
-    """Pull the latest NSSP ER visits secondary dataset.
+    """Pull the NSSP ER visits secondary dataset.

     https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

@@ -119,9 +216,26 @@ def secondary_pull_nssp_data(
     pd.DataFrame
         Dataframe as described above.
     """
-    socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
-    df_ervisits = pd.DataFrame.from_records(socrata_results)
-    create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
+    if not custom_run:
+        socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
+        df_ervisits = pd.DataFrame.from_records(socrata_results)
+        create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
+        logger.info("Number of records grabbed", num_records=len(df_ervisits), source="secondary Socrata API")
+    elif custom_run and logger.name == "delphi_nssp.patch":
+        if issue_date is None:
+            raise ValueError("Issue date is required for patching")
+        source_filename = f"{backup_dir}/{issue_date}_secondary.csv.gz"
+        if not path.isfile(source_filename):
+            logger.warning("No secondary source data found", source=source_filename, issue_date=issue_date)
+            return None
+        df_ervisits = pd.read_csv(source_filename)
+        logger.info(
+            "Number of records grabbed",
+            num_records=len(df_ervisits),
+            source=source_filename,
+        )

     df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

     # geo_type is not provided in the dataset, so we infer it from the geo_value
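
To illustrate the patch branch above, a hedged sketch of a direct call; in practice
run_module drives this pull, and the argument values here are hypothetical:

    from delphi_nssp.pull import pull_nssp_data
    from delphi_utils import get_structured_logger

    patch_logger = get_structured_logger("delphi_nssp.patch", filename="patch.log")
    df = pull_nssp_data(
        socrata_token="",            # not used on the patch path
        backup_dir="./source_data",  # local dir assumed to hold 20240420.csv.gz
        custom_run=True,
        issue_date="20240420",
        logger=patch_logger,
    )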