-
Notifications
You must be signed in to change notification settings - Fork 16
bring loggers in sync and add multiproc capabilities #1885
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
06afcfe
48e7734
5922b62
aab01ff
432c47a
19f863b
f97058f
6be101f
0117135
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,45 @@ | ||
"""Structured logger utility for creating JSON logs in Delphi pipelines.""" | ||
"""Structured logger utility for creating JSON logs.""" | ||
|
||
# the Delphi group uses two ~identical versions of this file. | ||
# try to keep them in sync with edits, for sanity. | ||
# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long | ||
# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py | ||
|
||
import contextlib | ||
import logging | ||
import multiprocessing | ||
import os | ||
import sys | ||
import threading | ||
from traceback import format_exception | ||
|
||
import structlog | ||
|
||
|
||
def handle_exceptions(logger):
    """Route uncaught exceptions to `logger` by installing global hooks.

    Replaces both `sys.excepthook` (exceptions escaping the main thread) and
    `threading.excepthook` (exceptions escaping a `threading.Thread`).
    Uncaught exceptions are logged with `logger.exception` under the event
    "Top-level exception occurred", with a `scope` field that is either
    "sys" or "thread: <thread>".  A thread that ends through a *successful*
    `SystemExit` is only logged at DEBUG level.

    Args:
        logger: Object exposing `exception(event, **kw)` and
            `debug(event, **kw)`; keyword arguments are passed as
            structured fields.
    """

    def exception_handler(scope, etype, value, traceback):
        logger.exception("Top-level exception occurred",
                         scope=scope, exc_info=(etype, value, traceback))

    def sys_exception_handler(etype, value, traceback):
        exception_handler("sys", etype, value, traceback)

    def is_successful_exit(etype, value):
        # `sys.exit()` / `sys.exit(None)` and `sys.exit(0)` all mean
        # "successful termination":
        # https://docs.python.org/3/library/sys.html#sys.exit
        # `value` may be None (the threading hook documents this), so read
        # `.code` defensively instead of assuming an exception instance.
        if not issubclass(etype, SystemExit):
            return False
        return getattr(value, "code", None) in (None, 0)

    def threading_exception_handler(args):
        if is_successful_exit(args.exc_type, args.exc_value):
            logger.debug("normal thread exit", thread=args.thread,
                         stack="".join(
                             format_exception(
                                 args.exc_type, args.exc_value, args.exc_traceback)))
        else:
            exception_handler(f"thread: {args.thread}",
                              args.exc_type, args.exc_value, args.exc_traceback)

    sys.excepthook = sys_exception_handler
    threading.excepthook = threading_exception_handler
|
||
|
||
def get_structured_logger(name=__name__, | ||
|
@@ -40,12 +64,21 @@ def get_structured_logger(name=__name__, | |
is a good choice. | ||
filename: An (optional) file to write log output. | ||
""" | ||
# Configure the basic underlying logging configuration | ||
# Set the underlying logging configuration | ||
if "LOG_DEBUG" in os.environ: | ||
log_level = logging.DEBUG | ||
else: | ||
log_level = logging.INFO | ||
|
||
logging.basicConfig( | ||
format="%(message)s", | ||
level=logging.INFO, | ||
handlers=[logging.StreamHandler()] | ||
) | ||
level=log_level, | ||
handlers=[logging.StreamHandler()]) | ||
|
||
def add_pid(_logger, _method_name, event_dict): | ||
"""Add current PID to the event dict.""" | ||
event_dict["pid"] = os.getpid() | ||
return event_dict | ||
|
||
# Configure structlog. This uses many of the standard suggestions from | ||
# the structlog documentation. | ||
|
@@ -57,6 +90,8 @@ def get_structured_logger(name=__name__, | |
structlog.stdlib.add_logger_name, | ||
# Include log level in output. | ||
structlog.stdlib.add_log_level, | ||
# Include PID in output. | ||
add_pid, | ||
# Allow formatting into arguments e.g., logger.info("Hello, %s", | ||
# name) | ||
structlog.stdlib.PositionalArgumentsFormatter(), | ||
|
@@ -68,7 +103,7 @@ def get_structured_logger(name=__name__, | |
# Decode unicode characters | ||
structlog.processors.UnicodeDecoder(), | ||
# Render as JSON | ||
structlog.processors.JSONRenderer() | ||
structlog.processors.JSONRenderer(), | ||
], | ||
# Use a dict class for keeping track of data. | ||
context_class=dict, | ||
|
@@ -84,10 +119,136 @@ def get_structured_logger(name=__name__, | |
system_logger = logging.getLogger(name) | ||
if filename and not system_logger.handlers: | ||
system_logger.addHandler(logging.FileHandler(filename)) | ||
system_logger.setLevel(logging.INFO) | ||
system_logger.setLevel(log_level) | ||
logger = structlog.wrap_logger(system_logger) | ||
|
||
if log_exceptions: | ||
handle_exceptions(logger) | ||
|
||
return logger | ||
|
||
melange396 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
class LoggerThread():
    """
    A construct to use a logger from multiprocessing workers/jobs.

    The bare structlog loggers are thread-safe but not multiprocessing-safe.
    A `LoggerThread` spawns a background thread that listens to a queue
    and logs every message it receives with the provided logger.
    Other threads (or other processes) never touch that logger directly:
    they call the logger-like `SubLogger`, which only *enqueues* messages.
    The thread therefore does not call into the SubLogger; the SubLogger
    acts indirectly upon the thread through the shared queue.
    The SubLogger also records the pid of the caller as `sub_pid`.

    This is good to use with a set of jobs that are part of a mp.Pool,
    but isn't recommended for general use
    because of overhead from threading and multiprocessing,
    and because it might introduce lag to log messages.

    Somewhat inspired by:
    docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
    """

    # Levels the listener thread is willing to forward to the logger.
    _KNOWN_LEVELS = frozenset((logging.DEBUG, logging.INFO, logging.WARNING,
                               logging.ERROR, logging.CRITICAL))

    class SubLogger():
        """MP-safe logger-like interface to convey log messages to a listening LoggerThread."""

        def __init__(self, queue):
            """Create SubLogger with a bound queue."""
            self.queue = queue

        def _log(self, level, *args, **kwargs):
            # Caller-supplied kwargs are applied last, so an explicit
            # `sub_pid=` from the caller overrides the detected pid.
            kwargs_plus = {'sub_pid': multiprocessing.current_process().pid}
            kwargs_plus.update(kwargs)
            self.queue.put([level, args, kwargs_plus])

        def debug(self, *args, **kwargs):
            """Log a DEBUG level message."""
            self._log(logging.DEBUG, *args, **kwargs)

        def info(self, *args, **kwargs):
            """Log an INFO level message."""
            self._log(logging.INFO, *args, **kwargs)

        def warning(self, *args, **kwargs):
            """Log a WARNING level message."""
            self._log(logging.WARNING, *args, **kwargs)

        def error(self, *args, **kwargs):
            """Log an ERROR level message."""
            self._log(logging.ERROR, *args, **kwargs)

        def critical(self, *args, **kwargs):
            """Log a CRITICAL level message."""
            self._log(logging.CRITICAL, *args, **kwargs)

    def get_sublogger(self):
        """Retrieve SubLogger for this LoggerThread."""
        return self.sublogger

    def __init__(self, logger, q=None):
        """Create and start LoggerThread with supplied logger, creating a queue if not provided.

        Args:
            logger: Logger used by the listener thread; must expose `name`,
                `log(level, ...)`, `debug`, `info`, `warning` and `error`.
            q: Optional queue shared with the producers.  When omitted, a
                new `multiprocessing.Queue` is created.
        """
        self.logger = logger
        # Compare against None rather than testing truthiness: a supplied
        # queue that is empty and defines `__len__` must still be used.
        if q is not None:
            self.msg_queue = q
        else:
            self.msg_queue = multiprocessing.Queue()

        # Publish all state before the thread starts running.
        self.sublogger = LoggerThread.SubLogger(self.msg_queue)
        self.running = True

        def logger_thread_worker():
            logger.info('thread started')
            while True:
                msg = self.msg_queue.get()
                # `stop()` enqueues this sentinel to end the loop.
                if msg == 'STOP':
                    logger.debug('received stop signal')
                    break
                level, args, kwargs = msg
                if level in LoggerThread._KNOWN_LEVELS:
                    logger.log(level, *args, **kwargs)
                else:
                    logger.error('received unknown logging level! exiting...',
                                 level=level, args_kwargs=(args, kwargs))
                    break
            logger.debug('stopping thread')

        self.thread = threading.Thread(target=logger_thread_worker,
                                       name="LoggerThread__" + logger.name)
        logger.debug('starting thread')
        self.thread.start()

    def stop(self):
        """Terminate this LoggerThread.

        Enqueues the stop sentinel and waits for the listener thread to
        finish, so messages queued before the call are still logged.
        Calling it again only logs a warning.
        """
        if not self.running:
            self.logger.warning('thread already stopped')
            return
        self.logger.debug('sending stop signal')
        self.msg_queue.put('STOP')
        self.thread.join()
        self.running = False
        self.logger.info('thread stopped')
|
||
|
||
@contextlib.contextmanager
def pool_and_threadedlogger(logger, *poolargs, **poolkwargs):
    """
    Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger.

    Emulates the multiprocessing.Pool() context manager,
    but also provides (via a LoggerThread) a SubLogger proxy to logger
    that can be safely used by pool workers.

    The SubLogger proxy interface supports these methods: debug, info, warning, error,
    and critical.
    Also "cleans up" the pool by waiting for workers to complete
    as it exits the context.

    Args:
        logger: Logger that ultimately emits the workers' messages.
        *poolargs: Positional arguments forwarded to `multiprocessing.Pool`.
        **poolkwargs: Keyword arguments forwarded to `multiprocessing.Pool`
            (for example `processes=4`).

    Yields:
        A `(pool, sublogger)` tuple.
    """
    # A Manager queue is used here instead of letting LoggerThread create
    # its own multiprocessing.Queue.
    with multiprocessing.Manager() as manager:
        logger_thread = LoggerThread(logger, manager.Queue())
        try:
            with multiprocessing.Pool(*poolargs, **poolkwargs) as pool:
                yield pool, logger_thread.get_sublogger()
                # Reached only when the context body finished normally:
                # let outstanding jobs complete before the pool exits.
                pool.close()
                pool.join()
        finally:
            # Always stop the listener thread, even if the body raised.
            logger_thread.stop()
Uh oh!
There was an error while loading. Please reload this page.