
Commit 7ac28d4

Merge pull request #2086 from cmu-delphi/release/indicators_v0.3.57_utils_v0.3.26
Release covidcast-indicators 0.3.57
2 parents: ada4165 + 98d12ae

File tree

93 files changed: +14,207 additions, −497 deletions


.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.56
+current_version = 0.3.57
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False

.github/workflows/create-release.yml

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ jobs:
           bump2version --list ${{ github.event.inputs.versionName }} | grep ^new_version | sed -r s,"^.*=",,
       - name: Copy version to indicator directory
         run: |
-          indicator_list=("changehc" "claims_hosp" "doctor_visits" "google_symptoms" "hhs_hosp" "nchs_mortality" "quidel_covidtest" "sir_complainsalot")
+          indicator_list=("changehc" "claims_hosp" "doctor_visits" "google_symptoms" "hhs_hosp" "nchs_mortality" "nssp" "quidel_covidtest" "sir_complainsalot")
           for path in ${indicator_list[@]}; do
            echo "current_version = ${{ steps.indicators.outputs.version }}" > $path/version.cfg
           done

.github/workflows/python-ci.yml

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ jobs:
           dir: "delphi_quidel_covidtest"
         - package: "sir_complainsalot"
           dir: "delphi_sir_complainsalot"
+        - package: "nhsn"
+          dir: "delphi_nhsn"
     defaults:
       run:
         working-directory: ${{ matrix.package }}

Jenkinsfile

Lines changed: 5 additions & 1 deletion
@@ -10,14 +10,18 @@
    - TODO: #527 Get this list automatically from python-ci.yml at runtime.
 */

-def indicator_list = ['backfill_corrections', 'changehc', 'claims_hosp', 'google_symptoms', 'hhs_hosp', 'nchs_mortality', 'quidel_covidtest', 'sir_complainsalot', 'doctor_visits', 'nwss_wastewater', 'nssp']
+def indicator_list = ['backfill_corrections', 'changehc', 'claims_hosp', 'google_symptoms', 'hhs_hosp', 'nchs_mortality', 'quidel_covidtest', 'sir_complainsalot', 'doctor_visits', 'nwss_wastewater', 'nssp', 'nhsn']
 def build_package_main = [:]
 def build_package_prod = [:]
 def deploy_staging = [:]
 def deploy_production = [:]

 pipeline {
     agent any
+    environment {
+        // Set the PATH variable to include the pyenv shims directory.
+        PATH = "/var/lib/jenkins/.pyenv/shims:${env.PATH}"
+    }
     stages {
         stage('Build dev/feature branch') {
             when {

_delphi_utils_python/.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.25
+current_version = 0.3.26
 commit = True
 message = chore: bump delphi_utils to {new_version}
 tag = False

_delphi_utils_python/README.md

Lines changed: 10 additions & 1 deletion
@@ -22,13 +22,22 @@ Source code can be found here:

 ## Logger Usage

+To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument to the logger call (for use in filtering, thresholding, grouping, visualization, etc).
+
+### Commonly used argument names:
+- data_source
+- geo_type
+- signal
+- issue_date
+- filename
+
 Single-thread usage.

 ```py
 from delphi_utils.logger import get_structured_logger

 logger = get_structured_logger('my_logger')
-logger.info('Hello, world!')
+logger.info('Hello', name='World')
 ```

 Multi-thread usage.
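
The convention in the README hunk above can be sketched without delphi_utils installed. `SketchLogger` below is a hypothetical stand-in for the structlog-based logger that `get_structured_logger` returns; the point it illustrates is the static event string plus individually named dynamic arguments.

```python
import json


class SketchLogger:
    """Hypothetical stand-in for the structlog-based delphi_utils logger."""

    def info(self, event, **kwargs):
        # Emit the static event string plus named dynamic values as JSON,
        # the shape that Elastic filtering/grouping relies on.
        print(json.dumps({"event": event, **kwargs}, sort_keys=True))


logger = SketchLogger()
# Good: static event, dynamic values as named args.
logger.info("Hello", name="World")
logger.info("Backup file created", data_source="nchs", geo_type="state",
            filename="20240101_state.csv.gz")
```

Note the anti-pattern this avoids: interpolating values into the event string (e.g. `f"Hello, {name}!"`) makes every log line a distinct event and defeats filtering.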

_delphi_utils_python/delphi_utils/__init__.py

Lines changed: 7 additions & 8 deletions
@@ -4,15 +4,14 @@
 from __future__ import absolute_import

 from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
-from .export import create_export_csv
-from .utils import read_params
-
-from .slack_notifier import SlackNotifier
-from .logger import get_structured_logger
+from .export import create_backup_csv, create_export_csv
 from .geomap import GeoMapper
-from .smooth import Smoother
-from .signal import add_prefix
+from .logger import get_structured_logger
 from .nancodes import Nans
+from .signal import add_prefix
+from .slack_notifier import SlackNotifier
+from .smooth import Smoother
+from .utils import read_params
 from .weekday import Weekday

-__version__ = "0.3.25"
+__version__ = "0.3.26"

_delphi_utils_python/delphi_utils/export.py

Lines changed: 81 additions & 5 deletions
@@ -1,16 +1,17 @@
 """Export data in the format expected by the Delphi API."""
 # -*- coding: utf-8 -*-
+import logging
 from datetime import datetime
-from os.path import join
+from os.path import getsize, join
 from typing import Optional
-import logging

-from epiweeks import Week
 import numpy as np
 import pandas as pd
+from epiweeks import Week

 from .nancodes import Nans

+
 def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
     """Find values with contradictory missingness codes, filter them, and log."""
     columns = ["val", "se", "sample_size"]
@@ -22,8 +23,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
     for mask in masks:
         if not logger is None and df.loc[mask].size > 0:
             logger.info(
-                "Filtering contradictory missing code in " +
-                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
+                "Filtering contradictory missing code",
+                sensor=sensor,
+                metric=metric,
+                date=date.strftime(format="%Y-%m-%d"),
             )
             df = df.loc[~mask]
         elif logger is None and df.loc[mask].size > 0:
@@ -130,3 +133,76 @@ def create_export_csv(
         export_df = export_df.sort_values(by="geo_id")
         export_df.to_csv(export_file, index=False, na_rep="NA")
     return dates
+
+
+def create_backup_csv(
+    df: pd.DataFrame,
+    backup_dir: str,
+    custom_run: bool,
+    issue: Optional[str] = None,
+    geo_res: Optional[str] = None,
+    sensor: Optional[str] = None,
+    metric: Optional[str] = None,
+    logger: Optional[logging.Logger] = None,
+):
+    """Save data for use as a backup.
+
+    This function is meant to save raw data fetched from data sources.
+    Therefore, it avoids manipulating the data as much as possible to
+    preserve the input.
+
+    When only required arguments are passed, data will be saved to a file of
+    the format `<export_dir>/<today's date as YYYYMMDD>.csv`. Optional arguments
+    should be passed if the source data is fetched from different tables or
+    in batches by signal, geo, etc.
+
+    Parameters
+    ----------
+    df: pd.DataFrame
+        Columns: geo_id, timestamp, val, se, sample_size
+    backup_dir: str
+        Backup directory
+    custom_run: bool
+        Flag indicating if the current run is a patch, or other run where
+        backups aren't needed. If so, don't save any data to disk
+    issue: Optional[str]
+        The date the data was fetched, in YYYYMMDD format. Defaults to "today"
+        if not provided
+    geo_res: Optional[str]
+        Geographic resolution of the data
+    sensor: Optional[str]
+        Sensor that has been calculated (cumulative_counts vs new_counts)
+    metric: Optional[str]
+        Metric we are considering, if any.
+    logger: Optional[logging.Logger]
+        Pass a logger object here to log information about name and size of the backup file.
+
+    Returns
+    ---------
+    dates: pd.Series[datetime]
+        Series of dates for which CSV files were exported.
+    """
+    if not custom_run:
+        # Label the file with today's date (the date the data was fetched).
+        if not issue:
+            issue = datetime.today().strftime("%Y%m%d")
+
+        backup_filename = [issue, geo_res, metric, sensor]
+        backup_filename = "_".join(filter(None, backup_filename))
+        backup_file = join(backup_dir, backup_filename)
+        try:
+            # de facto data format is csv, but parquet preserves data types (keeping both as intermediary measures)
+            df.to_csv(
+                f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip"
+            )
+            df.to_parquet(f"{backup_file}.parquet", index=False)
+
+            if logger:
+                logger.info(
+                    "Backup file created",
+                    backup_file=backup_file,
+                    backup_size=getsize(f"{backup_file}.csv.gz"),
+                )
+        # pylint: disable=W0703
+        except Exception as e:
+            logger.info("Backup file creation failed", msg=e)
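
The backup filename logic added in `create_backup_csv` joins the optional parts with underscores and drops any that are `None`. A minimal sketch of just that naming step; the helper name `backup_path` is ours, not part of delphi_utils.

```python
from datetime import datetime
from os.path import join


def backup_path(backup_dir, issue=None, geo_res=None, metric=None, sensor=None):
    """Sketch of create_backup_csv's filename construction (helper name is ours)."""
    if not issue:
        # Default the issue date to today, matching the docstring.
        issue = datetime.today().strftime("%Y%m%d")
    # filter(None, ...) drops unset parts, so "_" never doubles up.
    parts = [issue, geo_res, metric, sensor]
    return join(backup_dir, "_".join(filter(None, parts)))


print(backup_path("raw_data_backups", issue="20240101",
                  geo_res="county", metric="test", sensor="deaths"))
```

With all parts set this yields `20240101_county_test_deaths` inside `raw_data_backups`; with only `issue` set, the filename is just the date, which is the default behavior the docstring describes.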

_delphi_utils_python/delphi_utils/flash_eval/eval_day.py

Lines changed: 1 addition & 2 deletions
@@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger):
             p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
         else:
             break
-    name = f"Signal: {signal} Lag: {lag}"
-    logger.info(name, payload=p_text)
+    logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text)


 def evd_ranking_fn(ts_streams, EVD_max, EVD_min):

_delphi_utils_python/delphi_utils/logger.py

Lines changed: 5 additions & 5 deletions
@@ -1,11 +1,11 @@
 """Structured logger utility for creating JSON logs.

-See the delphi_utils README.md for usage examples.
+To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic,
+the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier),
+and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument
+to the logger call (for use in filtering, thresholding, grouping, visualization, etc)

-The Delphi group uses two ~identical versions of this file.
-Try to keep them in sync with edits, for sanity.
-https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
-https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
+See the delphi_utils README.md for usage examples.
 """

 import contextlib

_delphi_utils_python/delphi_utils/runner.py

Lines changed: 14 additions & 5 deletions
@@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],

     #Get version and indicator name for startup
     ind_name = indicator_fn.__module__.replace(".run", "")
+
     #Check for version.cfg in indicator directory
     if os.path.exists("version.cfg"):
         with open("version.cfg") as ver_file:
@@ -59,9 +60,15 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
                 if "current_version" in line:
                     current_version = str.strip(line)
                     current_version = current_version.replace("current_version = ", "")
-        #Logging - Starting Indicator
-        logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}")
-    else: logger.info(f"Started {ind_name} without version.cfg")
+        logger.info(
+            "Started a covidcast-indicator",
+            indicator_name=ind_name,
+            current_version=current_version,
+        )
+    else:
+        logger.info(
+            "Started a covidcast-indicator without version.cfg", indicator_name=ind_name
+        )

     indicator_fn(params)
     validator = validator_fn(params)
@@ -77,8 +84,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
             break
         time.sleep(1)
     else:
-        logger.error(f"Flash step timed out ({timer} s), terminating",
-                     elapsed_time_in_seconds = round(time.time() - start, 2))
+        logger.error(
+            "Flash step timed out, terminating",
+            elapsed_time_in_seconds=round(time.time() - start, 2),
+        )
     t1.terminate()
     t1.join()
     if validator:
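
The version.cfg handling in this hunk boils down to finding the `current_version` line and stripping the key prefix. A self-contained sketch of that parsing step; the helper name `read_current_version` is ours, not part of delphi_utils.

```python
def read_current_version(cfg_lines):
    """Sketch of run_indicator_pipeline's version.cfg parsing (helper name is ours)."""
    for line in cfg_lines:
        if "current_version" in line:
            # Strip whitespace, then drop the "current_version = " prefix,
            # mirroring the str.strip/replace sequence in runner.py.
            return line.strip().replace("current_version = ", "")
    return None


cfg = ["[bumpversion]\n", "current_version = 0.3.57\n", "commit = True\n"]
print(read_current_version(cfg))  # prints 0.3.57
```

If no `current_version` line exists, the sketch returns `None`, which corresponds to the `else` branch above that logs the start without a version.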

_delphi_utils_python/pyproject.toml

Lines changed: 3 additions & 2 deletions
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "delphi-utils"
-version = "0.3.25"
+version = "0.3.26"
 description = "Shared Utility Functions for Indicators"
 readme = "README.md"
 requires-python = "== 3.8.*"
@@ -13,7 +13,7 @@ classifiers = [
     "Development Status :: 5 - Production/Stable",
     "Intended Audience :: Developers",
     "Programming Language :: Python :: 3.8",
-    "License :: MIT",
+    "License :: OSI Approved :: MIT License",
 ]
 dependencies = [
     "boto3",
@@ -23,6 +23,7 @@ dependencies = [
     "gitpython",
     "importlib_resources>=1.3",
     "numpy",
+    "pyarrow",
     "pandas>=1.1.0",
     "requests",
     "slackclient",

_delphi_utils_python/tests/test_export.py

Lines changed: 28 additions & 6 deletions
@@ -1,15 +1,16 @@
 """Tests for exporting CSV files."""
 from datetime import datetime
+import logging
 from os import listdir
 from os.path import join
-from typing import Any, Dict, List
+from typing import Any, Dict

 import mock
 import numpy as np
 import pandas as pd
 from pandas.testing import assert_frame_equal

-from delphi_utils import create_export_csv, Nans
+from delphi_utils import create_export_csv, Nans, create_backup_csv, get_structured_logger


 def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
@@ -323,12 +324,13 @@ def test_export_df_with_missingness(self, tmp_path):

     @mock.patch("delphi_utils.logger")
     def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
-
+        sensor = "test"
+        geo_res = "state"
         create_export_csv(
             df=self.DF3.copy(),
             export_dir=tmp_path,
-            geo_res="state",
-            sensor="test",
+            sensor=sensor,
+            geo_res=geo_res,
             logger=mock_logger
         )
         assert set(listdir(tmp_path)) == set(
@@ -339,8 +341,9 @@ def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
             ]
         )
         assert pd.read_csv(join(tmp_path, "20200315_state_test.csv")).size > 0
+        date_str = datetime.strftime(self.TIMES[0], "%Y-%m-%d")
         mock_logger.info.assert_called_once_with(
-            "Filtering contradictory missing code in test_None_2020-02-15."
+            "Filtering contradictory missing code", sensor=sensor, metric=None, date=date_str
         )

     def test_export_sort(self, tmp_path):
@@ -384,3 +387,22 @@ def test_export_sort(self, tmp_path):
         })
         sorted_csv = _set_df_dtypes(pd.read_csv(join(tmp_path, "20200215_county_test.csv")), dtypes={"geo_id": str})
         assert_frame_equal(sorted_csv,expected_df)
+
+    def test_create_backup_regular(self, caplog, tmp_path):
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+        today = datetime.strftime(datetime.today(), "%Y%m%d")
+        dtypes = self.DF.dtypes.to_dict()
+        del dtypes["timestamp"]
+        geo_res = "county"
+        metric = "test"
+        sensor = "deaths"
+        create_backup_csv(df=self.DF, backup_dir=tmp_path, custom_run=False, issue=None, geo_res=geo_res, metric=metric, sensor=sensor, logger=logger)
+        assert "Backup file created" in caplog.text
+
+        actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"])
+        assert self.DF.equals(actual)
+
+        actual_parquet = pd.read_parquet(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.parquet"))
+        assert actual_parquet.equals(actual)
_template_python/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ classifiers = [
     "Development Status :: 5 - Production/Stable",
     "Intended Audience :: Developers",
     "Programming Language :: Python :: 3.8",
-    "License :: MIT",
+    "License :: OSI Approved :: MIT License",
 ]
 dependencies = [
     "delphi-utils",

ansible/templates/nchs_mortality-params-prod.json.j2

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 {
   "common": {
     "daily_export_dir": "./daily_receiving",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "/var/log/indicators/nchs_mortality.log",
     "weekly_export_dir": "/common/covidcast/receiving/nchs-mortality"
   },
