
bring loggers in sync and add multiproc capabilities #1254


Merged: 4 commits, Aug 14, 2023
190 changes: 172 additions & 18 deletions src/common/logger.py
@@ -1,24 +1,50 @@
"""Structured logger utility for creating JSON logs."""

# The Delphi group uses two nearly identical versions of this file.
# Try to keep them in sync when editing, for sanity.
# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long
# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py

import contextlib
import logging
import multiprocessing
import os
import sys
import threading
from traceback import format_exception

import structlog


def handle_exceptions(logger):
"""Handle exceptions using the provided logger."""

-    def exception_handler(etype, value, traceback):
-        logger.exception("Top-level exception occurred", exc_info=(etype, value, traceback))
+    def exception_handler(scope, etype, value, traceback):
+        logger.exception("Top-level exception occurred",
+                         scope=scope, exc_info=(etype, value, traceback))
 
-    def multithread_exception_handler(args):
-        exception_handler(args.exc_type, args.exc_value, args.exc_traceback)
+    def sys_exception_handler(etype, value, traceback):
+        exception_handler("sys", etype, value, traceback)
 
-    sys.excepthook = exception_handler
-    threading.excepthook = multithread_exception_handler
+    def threading_exception_handler(args):
+        if args.exc_type == SystemExit and args.exc_value.code == 0:
+            # `sys.exit(0)` is considered "successful termination":
+            #   https://docs.python.org/3/library/sys.html#sys.exit
+            logger.debug("normal thread exit", thread=args.thread,
+                         stack="".join(
+                             format_exception(
+                                 args.exc_type, args.exc_value, args.exc_traceback)))
+        else:
+            exception_handler(f"thread: {args.thread}",
+                              args.exc_type, args.exc_value, args.exc_traceback)
+
+    sys.excepthook = sys_exception_handler
+    threading.excepthook = threading_exception_handler
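
# A minimal sketch of what these hooks do once installed (illustrative only,
# not part of the PR; assumes this module is importable as `logger`). An
# uncaught exception in any thread is logged with a "scope" field, while a
# thread that calls sys.exit(0) is logged at DEBUG as a normal exit:
#
#     import sys
#     import threading
#     from logger import get_structured_logger
#
#     log = get_structured_logger("demo")  # installs the hooks above
#
#     t = threading.Thread(target=lambda: sys.exit(0))
#     t.start(); t.join()  # DEBUG "normal thread exit" (visible with LOG_DEBUG set)
#
#     u = threading.Thread(target=lambda: 1 / 0)
#     u.start(); u.join()  # "Top-level exception occurred", scope="thread: ..."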

-def get_structured_logger(name=__name__, filename=None, log_exceptions=True):
+def get_structured_logger(name=__name__,
+                          filename=None,
+                          log_exceptions=True):
"""Create a new structlog logger.

Use the logger returned from this in indicator code using the standard
@@ -38,22 +64,19 @@ def get_structured_logger(name=__name__, filename=None, log_exceptions=True):
is a good choice.
filename: An (optional) file to write log output.
"""
-    # Configure the underlying logging configuration
-    handlers = [logging.StreamHandler()]
-    if filename:
-        handlers.append(logging.FileHandler(filename))
+    # Set the underlying logging configuration
     if "LOG_DEBUG" in os.environ:
         log_level = logging.DEBUG
     else:
         log_level = logging.INFO
 
-    logging.basicConfig(format="%(message)s", level=log_level, handlers=handlers)
+    logging.basicConfig(
+        format="%(message)s",
+        level=log_level,
+        handlers=[logging.StreamHandler()])

-    def add_pid(logger, method_name, event_dict):
-        """
-        Add current PID to the event dict.
-        """
+    def add_pid(_logger, _method_name, event_dict):
+        """Add current PID to the event dict."""
         event_dict["pid"] = os.getpid()
         return event_dict

@@ -92,9 +115,140 @@ def add_pid(logger, method_name, event_dict):
cache_logger_on_first_use=True,
)

-    logger = structlog.get_logger(name)
+    # Create the underlying python logger and wrap it with structlog
+    system_logger = logging.getLogger(name)
+    if filename and not system_logger.handlers:
+        system_logger.addHandler(logging.FileHandler(filename))
+    system_logger.setLevel(log_level)
+    logger = structlog.wrap_logger(system_logger)

if log_exceptions:
handle_exceptions(logger)

return logger
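
# A minimal usage sketch for the returned logger (illustrative only, not part
# of the PR; assumes this module is importable as `logger`, and the filename
# and event names are made up):
#
#     from logger import get_structured_logger
#
#     log = get_structured_logger(__name__, filename="app.log")
#     log.info("fetch complete", rows=1234, source="covidcast")
#
# Each call emits one structured JSON event (here to stderr via the root
# StreamHandler and to app.log via the named logger's FileHandler), with the
# keyword arguments and the current "pid" included as fields.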


class LoggerThread():
    """
    A construct to use a logger from multiprocessing workers/jobs.

    The bare structlog loggers are thread-safe but not multiprocessing-safe.
    A `LoggerThread` spawns a thread that listens to an mp.Queue and logs
    messages from it with the provided logger, so that other processes can
    send logging messages to it via the logger-like `SubLogger` interface.
    The SubLogger even logs the pid of the caller.

    This is good to use with a set of jobs that are part of an mp.Pool, but it
    isn't recommended for general use because of the overhead from threading
    and multiprocessing, and because it might introduce lag to log messages.

    Somewhat inspired by:
    docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
    """

class SubLogger():
"""MP-safe logger-like interface to convey log messages to a listening LoggerThread."""

def __init__(self, queue):
"""Create SubLogger with a bound queue."""
self.queue = queue

def _log(self, level, *args, **kwargs):
kwargs_plus = {'sub_pid': multiprocessing.current_process().pid}
kwargs_plus.update(kwargs)
self.queue.put([level, args, kwargs_plus])

def debug(self, *args, **kwargs):
"""Log a DEBUG level message."""
self._log(logging.DEBUG, *args, **kwargs)

def info(self, *args, **kwargs):
"""Log an INFO level message."""
self._log(logging.INFO, *args, **kwargs)

def warning(self, *args, **kwargs):
"""Log a WARNING level message."""
self._log(logging.WARNING, *args, **kwargs)

def error(self, *args, **kwargs):
"""Log an ERROR level message."""
self._log(logging.ERROR, *args, **kwargs)

def critical(self, *args, **kwargs):
"""Log a CRITICAL level message."""
self._log(logging.CRITICAL, *args, **kwargs)


def get_sublogger(self):
"""Retrieve SubLogger for this LoggerThread."""
return self.sublogger

def __init__(self, logger, q=None):
"""Create and start LoggerThread with supplied logger, creating a queue if not provided."""
self.logger = logger
if q:
self.msg_queue = q
else:
self.msg_queue = multiprocessing.Queue()

def logger_thread_worker():
logger.info('thread started')
while True:
msg = self.msg_queue.get()
if msg == 'STOP':
logger.debug('received stop signal')
break
level, args, kwargs = msg
if level in [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]:
logger.log(level, *args, **kwargs)
else:
logger.error('received unknown logging level! exiting...',
level=level, args_kwargs=(args, kwargs))
break
logger.debug('stopping thread')

self.thread = threading.Thread(target=logger_thread_worker,
name="LoggerThread__"+logger.name)
logger.debug('starting thread')
self.thread.start()

self.sublogger = LoggerThread.SubLogger(self.msg_queue)
self.running = True

def stop(self):
"""Terminate this LoggerThread."""
if not self.running:
self.logger.warning('thread already stopped')
return
self.logger.debug('sending stop signal')
self.msg_queue.put('STOP')
self.thread.join()
self.running = False
self.logger.info('thread stopped')
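
# A minimal sketch of driving a LoggerThread by hand (illustrative only, not
# part of the PR; assumes this module is importable as `logger`):
#
#     from logger import LoggerThread, get_structured_logger
#
#     log = get_structured_logger("demo")
#     lt = LoggerThread(log)         # spawns the listener thread
#     sub = lt.get_sublogger()       # mp-safe, logger-like proxy
#     sub.info("hello", step=1)      # queued, then logged with the caller's sub_pid
#     lt.stop()                      # drains queued messages, then joins the thread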


@contextlib.contextmanager
def pool_and_threadedlogger(logger, *poolargs):
"""
Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger.

Emulates the multiprocessing.Pool() context manager,
but also provides (via a LoggerThread) a SubLogger proxy to logger
that can be safely used by pool workers.
The SubLogger proxy interface supports these methods: debug, info, warning, error,
and critical.
Also "cleans up" the pool by waiting for workers to complete
as it exits the context.
"""
with multiprocessing.Manager() as manager:
logger_thread = LoggerThread(logger, manager.Queue())
try:
with multiprocessing.Pool(*poolargs) as pool:
yield pool, logger_thread.get_sublogger()
pool.close()
pool.join()
finally:
logger_thread.stop()
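
For reference, a minimal sketch of using pool_and_threadedlogger from a script (illustrative only, not part of the PR; assumes this module is importable as `logger`, and the job function and worker counts are made up). The worker function must live at module level so the pool can pickle it, and the SubLogger rides on a Manager queue, so it can be passed to workers as an ordinary argument:

    from logger import get_structured_logger, pool_and_threadedlogger

    def job(i, sub):
        # Workers log through the SubLogger proxy, never the real logger.
        sub.info("working", task=i)
        return i * i

    if __name__ == "__main__":
        logger = get_structured_logger("demo")
        # 4 worker processes; the sublogger is safe to share with all of them
        with pool_and_threadedlogger(logger, 4) as (pool, sublogger):
            results = pool.starmap(job, [(i, sublogger) for i in range(8)])
        logger.info("all jobs finished", results=results)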