Add multiprocess logger to the Quidel indicator #1891

Merged: 4 commits, Aug 15, 2023
quidel_covidtest/delphi_quidel_covidtest/run.py: 63 additions & 77 deletions
@@ -6,7 +6,7 @@
 """
 import atexit
 from datetime import datetime
-from multiprocessing import Manager, Pool, cpu_count, current_process
+from multiprocessing import cpu_count
 import time
 from typing import Dict, Any

@@ -15,6 +15,7 @@
     create_export_csv,
     get_structured_logger
 )
+from delphi_utils.logger import pool_and_threadedlogger
 
 from .constants import (END_FROM_TODAY_MINUS,
                         SMOOTHED_POSITIVE, RAW_POSITIVE,
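`pool_and_threadedlogger` packages up the boilerplate the old code did by hand: it yields a worker pool together with a logger stand-in that can be shipped to child processes, while the parent process drains the queued records into the real structured logger. The actual implementation lives in `delphi_utils.logger`; the sketch below only illustrates the queue-plus-drain-thread design such a helper typically uses, and every name in it apart from the `multiprocessing` primitives is invented for illustration.

```python
from contextlib import contextmanager
from multiprocessing import Manager, Pool
import threading

class _QueueLoggerProxy:
    """Picklable stand-in for a structlog-style logger (hypothetical)."""
    def __init__(self, queue):
        self._queue = queue

    def info(self, msg, **kwargs):
        # Workers only enqueue; they never touch handlers or file handles.
        self._queue.put(("info", msg, kwargs))

@contextmanager
def pool_and_threadedlogger_sketch(logger, *pool_args):
    """Yield (pool, proxy); proxy.info(...) is safe to call from workers."""
    with Manager() as manager:
        queue = manager.Queue()

        def drain():
            # Runs in the parent: forward queued records to the real logger.
            while (item := queue.get()) is not None:
                level, msg, kwargs = item
                getattr(logger, level)(msg, **kwargs)

        thread = threading.Thread(target=drain, daemon=True)
        thread.start()
        try:
            with Pool(*pool_args) as pool:
                yield pool, _QueueLoggerProxy(queue)
        finally:
            queue.put(None)  # sentinel: stop the drain thread
            thread.join()
```

A managed queue (rather than a bare `multiprocessing.Queue`) is what makes the proxy passable as an `apply_async` argument; plain queues can only be shared with pool workers through inheritance.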
@@ -60,15 +61,11 @@ def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                            first_date, last_date, suffix,  # generate args
                                            geo_res, sensor_name, export_dir,
                                            export_start_date, export_end_date,  # export args
-                                           lock, log_filename, log_exceptions):  # logger args
+                                           threaded_logger):  # logger args
     """Generate sensors, create export CSV then return stats."""
-    # logger cannot be passed to child processes, so has to be recreated
-    with lock:
-        logger = get_structured_logger(__name__, log_filename, log_exceptions)
-        logger.info("Generating signal and exporting to CSV",
-                    geo_res=geo_res,
-                    sensor=sensor_name,
-                    pid=current_process().pid)
+    threaded_logger.info("Generating signal and exporting to CSV",
+                         geo_res=geo_res,
+                         sensor=sensor_name)
     res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                                first_date, last_date, suffix)
     dates = create_export_csv(res_df, geo_res=geo_res,
@@ -81,15 +78,11 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de
                                         first_date, last_date, suffix,  # generate args
                                         geo_res, sensor_name, export_dir,
                                         export_start_date, export_end_date,  # export args
-                                        lock, log_filename, log_exceptions):  # logger args
+                                        threaded_logger):  # logger args
     """Generate sensors, create export CSV then return stats."""
-    # logger cannot be passed to child processes, so has to be recreated
-    with lock:
-        logger = get_structured_logger(__name__, log_filename, log_exceptions)
-        logger.info("Generating signal and exporting to CSV",
-                    geo_res=geo_res,
-                    sensor=sensor_name,
-                    pid=current_process().pid)
+    threaded_logger.info("Generating signal and exporting to CSV",
+                         geo_res=geo_res,
+                         sensor=sensor_name)
     res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                             first_date, last_date, suffix)
     dates = create_export_csv(res_df, geo_res=geo_res,
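The comment deleted in both hunks above ("logger cannot be passed to child processes, so has to be recreated") names the underlying constraint: logging handlers hold thread locks and open file handles, neither of which survives pickling, so every task used to rebuild the logger under a shared `Manager` lock. Outside `delphi_utils`, the standard-library equivalent of the new single-writer design is the `QueueHandler`/`QueueListener` pair; this is the generic pattern, not the PR's helper:

```python
import logging
import multiprocessing
from logging.handlers import QueueHandler, QueueListener

def worker(queue, geo_res):
    # Each worker logs through a QueueHandler; only the parent process
    # owns the real stream/file handler.
    log = logging.getLogger("quidel.worker")
    if not log.handlers:  # a pooled process may run several tasks
        log.addHandler(QueueHandler(queue))
        log.setLevel(logging.INFO)
    log.info("Generating signal and exporting to CSV geo_res=%s", geo_res)

if __name__ == "__main__":
    manager = multiprocessing.Manager()  # keep a reference so the proxy stays alive
    queue = manager.Queue()
    listener = QueueListener(queue, logging.StreamHandler())
    listener.start()  # drain thread in the parent
    with multiprocessing.Pool(2) as pool:
        pool.starmap(worker, [(queue, "state"), (queue, "county")])
    listener.stop()
```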
@@ -168,72 +161,65 @@ def run_module(params: Dict[str, Any]):
                           prefix="wip_")
     smoothers = get_smooth_info(sensors, SMOOTHERS)
     n_cpu = min(8, cpu_count())  # for parallelization
-    with Manager() as manager:
-        # for using loggers in multiple threads
-        # disabled due to a Pylint bug, resolved by version bump (#1886)
-        lock = manager.Lock()  # pylint: disable=no-member
+    with pool_and_threadedlogger(logger, n_cpu) as (pool, threaded_logger):
         logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
-        with Pool(n_cpu) as pool:
-            pool_results = []
-            for geo_res in NONPARENT_GEO_RESOLUTIONS:
-                geo_data, res_key = geo_map(geo_res, data)
-                geo_groups = geo_data.groupby(res_key)
-                for agegroup in AGE_GROUPS:
-                    for sensor in sensors:
-                        if agegroup == "total":
-                            sensor_name = sensor
-                        else:
-                            sensor_name = "_".join([sensor, agegroup])
-                        pool_results.append(
-                            pool.apply_async(
-                                generate_and_export_for_nonparent_geo,
-                                args=(
-                                    # generate_sensors_for_parent_geo
-                                    geo_groups, res_key,
-                                    smoothers[sensor][1], smoothers[sensor][0],
-                                    first_date, last_date, agegroup,
-                                    # create_export_csv
-                                    geo_res, sensor_name, export_dir,
-                                    export_start_date, export_end_date,
-                                    # logger params
-                                    lock,
-                                    params["common"].get("log_filename"),
-                                    params["common"].get("log_exceptions", True)
-                                )
-                            )
-                        )
-            assert geo_res == "state"  # Make sure geo_groups is for state level
-            # County/HRR/MSA level
-            for geo_res in PARENT_GEO_RESOLUTIONS:
-                geo_data, res_key = geo_map(geo_res, data)  # using the last geo_groups
-                for agegroup in AGE_GROUPS:
-                    for sensor in sensors:
-                        if agegroup == "total":
-                            sensor_name = sensor
-                        else:
-                            sensor_name = "_".join([sensor, agegroup])
-                        pool_results.append(
-                            pool.apply_async(
-                                generate_and_export_for_parent_geo,
-                                args=(
-                                    # generate_sensors_for_parent_geo
-                                    geo_groups, geo_data, res_key,
-                                    smoothers[sensor][1], smoothers[sensor][0],
-                                    first_date, last_date, agegroup,
-                                    # create_export_csv
-                                    geo_res, sensor_name, export_dir,
-                                    export_start_date, export_end_date,
-                                    # logger params
-                                    lock,
-                                    params["common"].get("log_filename"),
-                                    params["common"].get("log_exceptions", True)
-                                )
-                            )
-                        )
-            pool_results = [proc.get() for proc in pool_results]
-            for dates in pool_results:
-                if len(dates) > 0:
-                    stats.append((max(dates), len(dates)))
+        pool_results = []
+        for geo_res in NONPARENT_GEO_RESOLUTIONS:
+            geo_data, res_key = geo_map(geo_res, data)
+            geo_groups = geo_data.groupby(res_key)
+            for agegroup in AGE_GROUPS:
+                for sensor in sensors:
+                    if agegroup == "total":
+                        sensor_name = sensor
+                    else:
+                        sensor_name = "_".join([sensor, agegroup])
+                    pool_results.append(
+                        pool.apply_async(
+                            generate_and_export_for_nonparent_geo,
+                            args=(
+                                # generate_sensors_for_parent_geo
+                                geo_groups, res_key,
+                                smoothers[sensor][1], smoothers[sensor][0],
+                                first_date, last_date, agegroup,
+                                # create_export_csv
+                                geo_res, sensor_name, export_dir,
+                                export_start_date, export_end_date,
+                                # logger
+                                threaded_logger
+                            )
+                        )
+                    )
+        assert geo_res == "state"  # Make sure geo_groups is for state level
+        # County/HRR/MSA level
+        for geo_res in PARENT_GEO_RESOLUTIONS:
+            geo_data, res_key = geo_map(geo_res, data)  # using the last geo_groups
+            for agegroup in AGE_GROUPS:
+                for sensor in sensors:
+                    if agegroup == "total":
+                        sensor_name = sensor
+                    else:
+                        sensor_name = "_".join([sensor, agegroup])
+                    pool_results.append(
+                        pool.apply_async(
+                            generate_and_export_for_parent_geo,
+                            args=(
+                                # generate_sensors_for_parent_geo
+                                geo_groups, geo_data, res_key,
+                                smoothers[sensor][1], smoothers[sensor][0],
+                                first_date, last_date, agegroup,
+                                # create_export_csv
+                                geo_res, sensor_name, export_dir,
+                                export_start_date, export_end_date,
+                                # logger
+                                threaded_logger
+                            )
+                        )
+                    )
+        pool_results = [proc.get() for proc in pool_results]
+        for dates in pool_results:
+            if len(dates) > 0:
+                stats.append((max(dates), len(dates)))
 
     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
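The collection step at the bottom of the hunk (`pool_results = [proc.get() for proc in pool_results]`) is what ties into the cache comment above: `AsyncResult.get()` re-raises any exception thrown in a worker, so execution only reaches the cache-export code when every sensor task succeeded. A self-contained toy version of the fan-out/collect pattern, with all names hypothetical:

```python
from datetime import date
from multiprocessing import Pool

def fake_export(geo_res, sensor_name):
    """Stand-in for generate_and_export_for_*_geo: returns exported dates."""
    return [date(2023, 8, 14), date(2023, 8, 15)]

if __name__ == "__main__":
    stats = []
    with Pool(2) as pool:
        results = [pool.apply_async(fake_export, args=(geo, sensor))
                   for geo in ("state", "county")
                   for sensor in ("sensor_a", "sensor_b")]
        # .get() blocks per task and re-raises worker exceptions, so the
        # code after the pool runs only if every export succeeded.
        for dates in (r.get() for r in results):
            if len(dates) > 0:
                stats.append((max(dates), len(dates)))
    print(stats)  # [(datetime.date(2023, 8, 15), 2), ...]
```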