9
9
SIGNALS ,
10
10
PROVIDER_NORMS ,
11
11
METRIC_SIGNALS ,
12
- METRIC_DATES ,
13
- SAMPLE_SITE_NAMES ,
14
12
SIG_DIGITS ,
15
- NEWLINE ,
13
+ TYPE_DICT ,
14
+ TYPE_DICT_METRIC ,
16
15
)
17
16
18
17
@@ -35,34 +34,29 @@ def sig_digit_round(value, n_digits):
35
34
return result
36
35
37
36
38
- def construct_typedicts ():
39
- """Create the type conversion dictionary for both dataframes."""
40
- # basic type conversion
41
- type_dict = {key : float for key in SIGNALS }
42
- type_dict ["timestamp" ] = "datetime64[ns]"
43
- # metric type conversion
44
- signals_dict_metric = {key : float for key in METRIC_SIGNALS }
45
- metric_dates_dict = {key : "datetime64[ns]" for key in METRIC_DATES }
46
- type_dict_metric = {** metric_dates_dict , ** signals_dict_metric , ** SAMPLE_SITE_NAMES }
47
- return type_dict , type_dict_metric
48
-
49
-
50
- def warn_string (df , type_dict ):
51
- """Format the warning string."""
52
- return f"""
37
def convert_df_type(df, type_dict, logger):
    """Convert df's column dtypes per type_dict, logging unexpected columns.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe whose columns should be converted.
    type_dict: dict
        Mapping from expected column name to target dtype.
    logger
        Structured logger; used to report columns present in ``df`` but
        absent from ``type_dict`` (assumes structlog-style keyword args —
        TODO confirm against caller).

    Returns
    -------
    pd.DataFrame
        ``df`` with the requested dtype conversions applied.

    Raises
    ------
    KeyError
        If a column named in ``type_dict`` is missing from ``df``, which
        suggests the upstream dataset schema has changed.
    """
    try:
        df = df.astype(type_dict)
    except KeyError as exc:
        # astype(dict) raises KeyError when an expected column is absent;
        # re-raise with a readable listing of expected vs. received columns.
        # (backslashes aren't allowed inside f-string expressions pre-3.12,
        # hence the local variable)
        newline = "\n"
        raise KeyError(
            f"""
Expected column(s) missing; the dataset schema may
have changed. Please investigate and amend the code.

expected={newline.join(sorted(type_dict))}

received={newline.join(sorted(df.columns))}
"""
        ) from exc
    # Extra columns are not an error: convert the ones we know about and
    # surface the rest for a human to triage.
    if new_columns := set(df.columns) - set(type_dict):
        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
    return df
62
56
63
57
64
58
def reformat (df , df_metric ):
65
- """Add columns from df_metric to df, and rename some columns.
59
+ """Add columns from df_metric to df, and rename some columns.
66
60
67
61
Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp.
68
62
"""
@@ -80,27 +74,16 @@ def reformat(df, df_metric):
80
74
return df
81
75
82
76
83
- def drop_unnormalized (df ):
84
- """Drop unnormalized.
85
-
86
- mutate `df` to no longer have rows where the normalization scheme isn't actually identified,
87
- as we can't classify the kind of signal
88
- """
89
- return df [~ df ["normalization" ].isna ()]
90
-
91
-
92
77
def add_identifier_columns(df):
    """Add identifier columns.

    Add columns to get more detail than key_plot_id gives;
    specifically, state, and `provider_normalization`, which gives the signal identifier
    """
    # state: a pair of alphanumerics surrounded by underscores
    state_pattern = r"_(\w\w)_"
    # provider: everything preceding the two-letter state code
    provider_pattern = r"(.*)_[a-z]{2}_"
    df["state"] = df.key_plot_id.str.extract(state_pattern)
    df["provider"] = df.key_plot_id.str.extract(provider_pattern)
    df["signal_name"] = df.provider + "_" + df.normalization
105
88
106
89
@@ -120,7 +103,7 @@ def check_endpoints(df):
120
103
)
121
104
122
105
123
- def pull_nwss_data (token : str ):
106
+ def pull_nwss_data (token : str , logger ):
124
107
"""Pull the latest NWSS Wastewater data, and conforms it into a dataset.
125
108
126
109
The output dataset has:
@@ -141,11 +124,6 @@ def pull_nwss_data(token: str):
141
124
pd.DataFrame
142
125
Dataframe as described above.
143
126
"""
144
- # Constants
145
- keep_columns = [* SIGNALS , * METRIC_SIGNALS ]
146
- # concentration key types
147
- type_dict , type_dict_metric = construct_typedicts ()
148
-
149
127
# Pull data from Socrata API
150
128
client = Socrata ("data.cdc.gov" , token )
151
129
results_concentration = client .get ("g653-rqe2" , limit = 10 ** 10 )
@@ -154,19 +132,14 @@ def pull_nwss_data(token: str):
154
132
df_concentration = pd .DataFrame .from_records (results_concentration )
155
133
df_concentration = df_concentration .rename (columns = {"date" : "timestamp" })
156
134
157
- try :
158
- df_concentration = df_concentration .astype (type_dict )
159
- except KeyError as exc :
160
- raise ValueError (warn_string (df_concentration , type_dict )) from exc
135
+ # Schema checks.
136
+ df_concentration = convert_df_type (df_concentration , TYPE_DICT , logger )
137
+ df_metric = convert_df_type (df_metric , TYPE_DICT_METRIC , logger )
161
138
162
- try :
163
- df_metric = df_metric .astype (type_dict_metric )
164
- except KeyError as exc :
165
- raise ValueError (warn_string (df_metric , type_dict_metric )) from exc
139
+ # Drop sites without a normalization scheme.
140
+ df = df_concentration [~ df_concentration ["normalization" ].isna ()]
166
141
167
- # if the normalization scheme isn't recorded, why is it even included as a sample site?
168
- df = drop_unnormalized (df_concentration )
169
- # pull 2 letter state labels out of the key_plot_id labels
142
+ # Pull 2 letter state labels out of the key_plot_id labels.
170
143
add_identifier_columns (df )
171
144
172
145
# move population and metric signals over to df
@@ -180,13 +153,14 @@ def pull_nwss_data(token: str):
180
153
# otherwise, best to assume some value rather than break the data)
181
154
df .population_served = df .population_served .ffill ()
182
155
check_endpoints (df )
183
- keep_columns .extend (
184
- [
185
- "timestamp" ,
186
- "state" ,
187
- "population_served" ,
188
- "normalization" ,
189
- "provider" ,
190
- ]
191
- )
156
+
157
+ keep_columns = [
158
+ * SIGNALS ,
159
+ * METRIC_SIGNALS ,
160
+ "timestamp" ,
161
+ "state" ,
162
+ "population_served" ,
163
+ "normalization" ,
164
+ "provider" ,
165
+ ]
192
166
return df [keep_columns ]
0 commit comments