caplog fixture: capture log records from another process #3037


Closed · hoefling opened this issue Dec 14, 2017 · 9 comments
Labels: plugin: logging (related to the logging builtin plugin) · type: question (general question, might be closed after 2 weeks of inactivity)

Comments

@hoefling (Member)

caplog captures log records from spawned threads, but not from processes. This is probably a feature request rather than a bug; I'm also unsure whether a general and complete implementation of capturing logs from spawned processes is possible at all...

A simple test:

import concurrent.futures as futures
import logging
import time
import pytest

logging.basicConfig(level=logging.INFO)


def worker(max_count):
    count = 0
    while count < max_count:
        logging.getLogger().info('sleeping ...')
        time.sleep(1)
        count += 1


@pytest.mark.parametrize('Executor_', (futures.ThreadPoolExecutor, futures.ProcessPoolExecutor, ))
def test_spam(caplog, Executor_):
    num_records = 5

    with Executor_() as pool:
        fut = pool.submit(worker, num_records)

        all_records = []
        new_records = []
        running = True
        while running:
            time.sleep(2)
            new_records = [rec for rec in caplog.records if rec not in all_records]
            all_records = caplog.records[:]
            if not new_records:
                running = False

        futures.wait((fut, ), timeout=0)

    assert len(all_records) == num_records

Running the test yields:

$ pytest -v test_spam.py 
========================================================================================================= test session starts =========================================================================================================
platform darwin -- Python 3.6.3, pytest-3.3.1, py-1.5.2, pluggy-0.6.0 -- /Users/hoefling/.virtualenvs/pytest-caplog-test/bin/python3.6
cachedir: .cache
rootdir: /private/tmp, inifile:
collected 2 items                                                                                                                                                                                                                     

test_spam.py::test_spam[ThreadPoolExecutor] PASSED                                                                                                                                                                              [ 50%]
test_spam.py::test_spam[ProcessPoolExecutor] FAILED                                                                                                                                                                             [100%]

============================================================================================================== FAILURES ===============================================================================================================
___________________________________________________________________________________________________ test_spam[ProcessPoolExecutor] ____________________________________________________________________________________________________

caplog = <_pytest.logging.LogCaptureFixture object at 0x1053639e8>, Executor_ = <class 'concurrent.futures.process.ProcessPoolExecutor'>

    @pytest.mark.parametrize('Executor_', (futures.ThreadPoolExecutor, futures.ProcessPoolExecutor, ))
    def test_spam(caplog, Executor_):
    
        num_records = 5
    
        with Executor_() as pool:
            fut = pool.submit(worker, num_records)
    
            all_records = []
            new_records = []
            running = True
            while running:
                time.sleep(2)
                new_records = [rec for rec in caplog.records if rec not in all_records]
                all_records = caplog.records[:]
                if not new_records:
                    running = False
            futures.wait((fut, ), timeout=0)
    
>       assert len(all_records) == num_records
E       assert 0 == 5
E        +  where 0 = len([])

test_spam.py:36: AssertionError
-------------------------------------------------------------------------------------------------------- Captured stderr call ---------------------------------------------------------------------------------------------------------
INFO:root:sleeping ...
INFO:root:sleeping ...
INFO:root:sleeping ...
INFO:root:sleeping ...
INFO:root:sleeping ...
================================================================================================= 1 failed, 1 passed in 13.09 seconds =================================================================================================
@hoefling hoefling changed the title caplog: capture log records from another process caplog fixture: capture log records from another process Dec 14, 2017
@pytestbot (Contributor)

GitMate.io thinks the contributor most likely able to help you is @RonnyPfannschmidt.

@nicoddemus (Member)

@hoefling thanks, but I don't know if that's even possible, because it all depends on how processes are spawned (and by whom) and which files are actually opened or not (stdout and stderr).

@hoefling (Member, Author)

Yeah, I thought that myself, too. The reason for opening this issue is that I have to write tests for a tool that spawns instances of multiprocessing.Process like hell, and one test requires collecting and validating the emitted logs. I guess I'll try to monkeypatch some handler classes from stdlib and see if that works out.
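
One hypothetical shape of such a monkeypatch (illustrative only — install_emit_tap and record_q are made-up names here, not something this thread settled on): wrap logging.Handler.emit so every emitted record is also pushed onto a process-shared queue the test can drain. Note that the patch has to run inside the child process itself (for example from a worker initializer), since a patch applied in the parent does not propagate to spawned children:

import logging

def install_emit_tap(record_q):
    # Hypothetical sketch: tap every Handler.emit and forward a copy of the
    # record to a queue owned by the test process. Must run in the child.
    original_emit = logging.Handler.emit

    def patched_emit(self, record):
        record_q.put(record)          # forward the record to the test process
        original_emit(self, record)   # keep the handler's normal behavior

    logging.Handler.emit = patched_emit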

@nicoddemus (Member)

Thanks! Please report back here so it can help others, and also so we know whether we can close this issue, or whether you found a way to incorporate this into pytest's logging. 👍

@nicoddemus nicoddemus added plugin: logging related to the logging builtin plugin type: question general question, might be closed after 2 weeks of inactivity labels Dec 14, 2017
@nicoddemus (Member)

I'm closing this for now, but please do come back with your findings!

@FanchenBao

I am able to capture all logs from a child process by using logging.handlers.QueueHandler(). The OP's code, modified as below, passes both tests:

import concurrent.futures as futures
import logging
import time
import pytest

from logging import handlers
from multiprocessing import Queue


# Need to set up a queue and a sentinel
msg_q = Queue()
sentinel = 'foo'

logger = logging.getLogger()
logger.addHandler(handlers.QueueHandler(msg_q))
logger.setLevel(logging.INFO)


def worker(max_count):
    count = 0
    while count < max_count:
        logger.info('sleeping ...')
        time.sleep(1)
        count += 1
    logger.info(sentinel)


@pytest.mark.parametrize('Executor_', (futures.ThreadPoolExecutor, futures.ProcessPoolExecutor, ))
def test_spam(caplog, Executor_):
    num_records = 5

    with Executor_() as pool:
        fut = pool.submit(worker, num_records)

        all_records = []
        running = True
        while running:
            time.sleep(2)
            while not msg_q.empty():
                all_records.append(msg_q.get().getMessage())
            if sentinel in all_records:  # check for sentinel
                all_records.remove(sentinel)
                running = False

        futures.wait((fut, ), timeout=0)

    assert len(all_records) == num_records
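
For reference, the stdlib also ships logging.handlers.QueueListener, which can replace the manual polling loop above by draining such a queue on a background thread and replaying each record into ordinary handlers. A minimal self-contained sketch (my own wiring, not from this thread):

import logging
from logging import handlers
from multiprocessing import Queue

record_q = Queue()
root = logging.getLogger()

# Workers (and the test process) publish records into the queue...
root.addHandler(handlers.QueueHandler(record_q))

# ...and a background thread replays them into a normal handler. Take care
# not to hand the QueueListener the QueueHandler itself, or records loop.
stream_handler = logging.StreamHandler()
listener = handlers.QueueListener(record_q, stream_handler)
listener.start()
# ... run the workers, then:
listener.stop()  # processes remaining queued records before returning

Tying this into caplog would mean handing the listener the handler caplog installs, which isn't directly exposed — hence the workarounds in the following comments.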

@zhukovgreen commented Jul 23, 2020

A slightly handier version that extends the approach proposed by @FanchenBao. This one allows using caplog itself to capture the messages:

import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue

import pytest

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


logger = logging.getLogger(__name__)


class ActorContext:
    def __init__(self):
        self.log = logger

    def run(self):
        self.log.debug("Some msg")


current_actor_context = ActorContext()


@pytest.fixture()
def caplog_workaround():
    @contextmanager
    def ctx():
        logger_queue = Queue()
        logger = logging.getLogger()
        logger.addHandler(handlers.QueueHandler(logger_queue))
        yield
        while not logger_queue.empty():
            log_record: logging.LogRecord = logger_queue.get()
            logger._log(
                level=log_record.levelno,
                msg=log_record.message,
                args=log_record.args,
                exc_info=log_record.exc_info,
            )

    return ctx


def test_caplog_already_not_fails(caplog, caplog_workaround):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        with caplog_workaround():
            p = Process(target=current_actor_context.run)
            p.start()
            p.join()
    assert "Some msg" in caplog.text


def test_caplog_passes(caplog, capsys):
    with caplog.at_level(logging.DEBUG, logger="leapp.actors.quagga_report"):
        current_actor_context.run()
    assert "Some msg" in caplog.text

@zhukovgreen

@nicoddemus I think this issue might be reopened, since no straightforward solution has been found for using caplog with logs from different processes.

@pepoluan commented Dec 15, 2020

Just to add my 2 cents, because I was bitten by the same problem.

My program is "very" multiprocessing: a process to receive messages and put them in a Queue, some processes to consume the messages from the Queue, another process ("log-process") to gather all the logs (passed through yet another Queue) and log them to the console plus a rotating file, and the main process to monitor & control all of the above.

So I patched the logging configuration in the log-process like so:

    tap_handler = logging.handlers.DatagramHandler("127.0.0.1", 5140)
    tap_handler.setLevel(logging.DEBUG)
    logger.addHandler(tap_handler)
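
(Aside on the wire format, and why the fixture below slices off four bytes: DatagramHandler inherits SocketHandler.makePickle, which emits each record as a pickled dict of the record's attributes prefixed with a 4-byte big-endian length. A sketch of decoding one datagram back into a real LogRecord, should the raw dict not suffice:)

import logging
import pickle
import struct

def decode_datagram(data: bytes) -> logging.LogRecord:
    # Strip the 4-byte big-endian length prefix, unpickle the attribute
    # dict, and rebuild a LogRecord from it.
    (length,) = struct.unpack(">L", data[:4])
    return logging.makeLogRecord(pickle.loads(data[4:4 + length]))

The fixture below keeps the unpickled dict as-is and matches fields by key, which works just as well for asserting.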

then I create a fixture like so:

import pickle
import socket
import threading
from collections import deque

import pytest


class LogRecordKeeper:
    def __init__(self):
        self._records = deque()

    def appendleft(self, x):
        self._records.appendleft(x)

    def has_log_within(self, n: int, **expected_logrec_fields):
        # Scan at most n records, newest first; don't overrun the deque
        for i in range(min(n, len(self._records))):
            rec = self._records[i]
            if all(rec[k] == v for k, v in expected_logrec_fields.items()):
                return True
        return False


@pytest.fixture
def logreceiver() -> LogRecordKeeper:
    records = LogRecordKeeper()

    def listener():
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.bind(("127.0.0.1", 5140))
        while True:
            data = s.recv(4096)
            if data == b"die":
                break
            # Don't forget to skip over the prepended 32-bit length
            logrec = pickle.loads(data[4:])
            records.appendleft(logrec)

    receiver_thread = threading.Thread(target=listener)
    receiver_thread.start()
    #
    yield records
    #
    t = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    t.sendto(b"die", ("127.0.0.1", 5140))
    receiver_thread.join()

Finally I use the fixture like so:

import pytest

def test_something(logreceiver):
    # ... do things that should generate logs ...
    # Need to scan back because last log may come from a different process
    # (e.g., "status" update or "I am alive" heartbeat)
    assert logreceiver.has_log_within(
        3,
        msg="expected message",
        levelname="INFO",
    )
    # ... further asserts if desired ...

Yeah, I cannot use caplog, but I can now 'assert' for expected log entries.

The benefit of using UDP instead of a Queue is that one does not have to drain the Queue afterwards, nor inject the Queue into the monitored processes before they get yeeted into separate processes.
