Skip to content
15 changes: 15 additions & 0 deletions google/cloud/logging_v2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ def __init__(
else:
self._use_grpc = _use_grpc

self._handlers = set()

@property
def logging_api(self):
"""Helper for logging-related API calls.
Expand Down Expand Up @@ -411,4 +413,17 @@ def setup_logging(
dict: keyword args passed to handler constructor
"""
handler = self.get_default_handler(**kw)
self._handlers.add(handler)
setup_logging(handler, log_level=log_level, excluded_loggers=excluded_loggers)

def flush_handlers(self):
    """Flush every Python log handler that was created through this Client.

    Delegates to each registered handler's ``flush()``; for handlers using
    a synchronous transport this is a no-op.
    """
    for registered in self._handlers:
        registered.flush()

def close(self):
    """Close this Client together with every handler registered on it.

    The base client is closed first; each handler is then closed so its
    transport can release resources.
    """
    super(Client, self).close()
    for registered in self._handlers:
        registered.close()
24 changes: 24 additions & 0 deletions google/cloud/logging_v2/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ def __init__(
resource = detect_resource(client.project)
self.name = name
self.client = client
client._handlers.add(self)
self.transport = transport(client, name, resource=resource)
self._transport_open = True
self._transport_cls = transport
self.project_id = client.project
self.resource = resource
self.labels = labels
Expand All @@ -213,6 +216,12 @@ def emit(self, record):
labels = {**add_resource_labels(resource, record), **(labels or {})} or None

# send off request
if not self._transport_open:
self.transport = self._transport_cls(
self.client, self.name, resource=self.resource
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this mean the handler has been closed? Why not just raise an error, instead of re-creating the transport?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I based this off of some logging handlers I saw in the Python standard library, which will reopen a closed connection if that connection is closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think this adds extra complexity, so I'd prefer to keep them permanently closed if we can get away with that

But if you think this behavior would help us integrate with the standard library, it makes sense to keep it.

self._transport_open = True

self.transport.send(
record,
message,
Expand All @@ -225,6 +234,21 @@ def emit(self, record):
source_location=record._source_location,
)

def flush(self):
    """Forces the Transport object to submit any pending log records.

    For SyncTransport, this is a no-op.
    """
    super(CloudLoggingHandler, self).flush()
    if not self._transport_open:
        # Transport was closed; nothing buffered to submit.
        return
    self.transport.flush()

def close(self):
    """Closes the log handler and cleans up all Transport objects used.

    After closing, ``_transport_open`` is False; ``emit()`` lazily
    re-creates the transport on the next record (mirroring stdlib
    handlers that reopen a closed connection), so the handler remains
    usable after close.
    """
    self.transport.close()
    # Drop the reference and flip the flag so flush()/emit() know the
    # transport is gone rather than probing `transport is None`.
    self.transport = None
    self._transport_open = False


def _format_and_parse_message(record, formatter_handler):
"""
Expand Down
58 changes: 46 additions & 12 deletions google/cloud/logging_v2/handlers/transports/background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
_WORKER_TERMINATOR = object()
_LOGGER = logging.getLogger(__name__)

# Printed to stderr by the worker's exit handler: once the interpreter is
# shutting down, queued entries can no longer be flushed safely, so the user
# is told how to avoid losing logs (flush manually, use StructuredLogHandler,
# or close the handler/client explicitly before exit).
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG = (
    "CloudLoggingHandler shutting down, cannot send logs entries to Cloud Logging due to "
    "inconsistent threading behavior at shutdown. To avoid this issue, flush the logging handler "
    "manually or switch to StructuredLogHandler. You can also close the CloudLoggingHandler manually "
    "via handler.close or client.close."
)


def _get_many(queue_, *, max_items=None, max_latency=0):
"""Get multiple items from a Queue.
Expand Down Expand Up @@ -140,9 +147,11 @@ def _thread_main(self):
else:
batch.log(**item)

self._safely_commit_batch(batch)
# We cannot commit logs upstream if the main thread is shutting down
if threading.main_thread().is_alive():
self._safely_commit_batch(batch)

for _ in items:
for it in items:
self._queue.task_done()

_LOGGER.debug("Background thread exited gracefully.")
Expand All @@ -162,7 +171,7 @@ def start(self):
)
self._thread.daemon = True
self._thread.start()
atexit.register(self._main_thread_terminated)
atexit.register(self._handle_exit)

def stop(self, *, grace_period=None):
"""Signals the background thread to stop.
Expand Down Expand Up @@ -202,26 +211,26 @@ def stop(self, *, grace_period=None):

return success

def _close(self, close_msg):
    """Callback that attempts to send pending logs before termination if the main thread is alive.

    Args:
        close_msg (str): Message printed to stderr when queued entries
            are still pending at close time.
    """
    if not self.is_alive:
        return

    if not self._queue.empty():
        print(close_msg, file=sys.stderr)

    # Stopping (and therefore committing upstream) is only attempted while
    # the main thread is still alive; at interpreter shutdown the threading
    # behavior is too inconsistent to flush reliably.
    if threading.main_thread().is_alive() and self.stop(
        grace_period=self._grace_period
    ):
        print("Sent all pending logs.", file=sys.stderr)
    elif not self._queue.empty():
        print(
            "Failed to send %d pending logs." % (self._queue.qsize(),),
            file=sys.stderr,
        )

    self._thread = None

def enqueue(self, record, message, **kwargs):
"""Queues a log entry to be written by the background thread.

Expand Down Expand Up @@ -251,6 +260,26 @@ def flush(self):
"""Submit any pending log records."""
self._queue.join()

def close(self):
    """Signals the worker thread to stop, then closes the transport thread.

    This call will attempt to send pending logs before termination, and
    should be followed up by disowning the transport object.
    """
    # Deliberate close: the atexit fallback is no longer needed.
    atexit.unregister(self._handle_exit)
    pending = self._queue.qsize()
    self._close(
        "Background thread shutting down, attempting to send %d queued log "
        "entries to Cloud Logging..." % (pending,)
    )

def _handle_exit(self):
    """Handle system exit.

    Since we cannot send pending logs during system shutdown due to thread errors,
    log an error message to stderr to notify the user.
    """
    # Registered via atexit in start(); the shutdown-specific message tells
    # the user how to avoid dropped entries (flush/close before exit).
    self._close(_CLOSE_THREAD_SHUTDOWN_ERROR_MSG)


class BackgroundThreadTransport(Transport):
"""Asynchronous transport that uses a background thread."""
Expand Down Expand Up @@ -285,6 +314,7 @@ def __init__(
"""
self.client = client
logger = self.client.logger(name, resource=resource)
self.grace_period = grace_period
self.worker = _Worker(
logger,
grace_period=grace_period,
Expand All @@ -307,3 +337,7 @@ def send(self, record, message, **kwargs):
def flush(self):
    """Submit any pending log records by draining the worker's queue."""
    self.worker.flush()

def close(self):
    """Closes the worker thread, flushing pending entries first."""
    self.worker.close()
8 changes: 8 additions & 0 deletions google/cloud/logging_v2/handlers/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ def flush(self):

For blocking/sync transports, this is a no-op.
"""
pass

def close(self):
    """Closes the transport and cleans up resources used by it.

    This call should be followed up by disowning the transport.
    The base implementation does nothing; subclasses override as needed.
    """
7 changes: 7 additions & 0 deletions google/cloud/logging_v2/handlers/transports/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ def send(self, record, message, **kwargs):
labels=labels,
**kwargs,
)

def close(self):
    """Closes the transport and cleans up resources used by it.

    Dropping the logger reference is the only cleanup a synchronous
    transport needs; callers should disown the transport afterwards.
    """
    self.logger = None
67 changes: 67 additions & 0 deletions tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import google.cloud.logging
from google.cloud._helpers import UTC
from google.cloud.logging_v2.handlers import CloudLoggingHandler
from google.cloud.logging_v2.handlers.transports import BackgroundThreadTransport
from google.cloud.logging_v2.handlers.transports import SyncTransport
from google.cloud.logging_v2 import client
from google.cloud.logging_v2.resource import Resource
Expand Down Expand Up @@ -719,6 +720,72 @@ def test_log_handler_otel_integration(self):
self.assertEqual(entries[0].span_id, expected_span_id)
self.assertTrue(entries[0].trace_sampled, expected_tracesampled)

def test_log_handler_close(self):
    """System test: handler.close() before process exit still delivers the log."""
    from multiprocessing import Process

    LOG_MESSAGE = "This is a test of handler.close before exiting."
    LOGGER_NAME = "close-test"
    handler_name = self._logger_name(LOGGER_NAME)

    # only create the logger to delete, hidden otherwise
    logger = Config.CLIENT.logger(handler_name)
    self.to_delete.append(logger)

    # Run a simulation of logging an entry then immediately shutting down.
    # The .close() function before the process exits should prevent the
    # thread shutdown error and let us log the message.
    def subprocess_main():
        # logger.delete and logger.list_entries work by filtering on log name, so we
        # can create new objects with the same name and have the queries on the parent
        # process still work.
        handler = CloudLoggingHandler(
            Config.CLIENT, name=handler_name, transport=BackgroundThreadTransport
        )
        cloud_logger = logging.getLogger(LOGGER_NAME)
        cloud_logger.addHandler(handler)
        cloud_logger.warning(LOG_MESSAGE)
        handler.close()

    # NOTE(review): assumes Process can pickle/fork this closure — relies on
    # a fork-based start method; verify on spawn-only platforms.
    proc = Process(target=subprocess_main)
    proc.start()
    proc.join()
    # Queried from the parent: exactly the one entry flushed by close().
    entries = _list_entries(logger)
    self.assertEqual(len(entries), 1)
    self.assertEqual(entries[0].payload, LOG_MESSAGE)

def test_log_client_flush_handlers(self):
    """System test: client.flush_handlers() before process exit delivers the log."""
    from multiprocessing import Process

    LOG_MESSAGE = "This is a test of client.flush_handlers before exiting."
    # Distinct log name from test_log_handler_close ("close-test"): both
    # tests query by name filter, so sharing a name could let entries from
    # one test bleed into the other's assertions.
    LOGGER_NAME = "flush-test"
    handler_name = self._logger_name(LOGGER_NAME)

    # only create the logger to delete, hidden otherwise
    logger = Config.CLIENT.logger(handler_name)
    self.to_delete.append(logger)

    # Run a simulation of logging an entry then immediately shutting down.
    # Calling client.flush_handlers() before the process exits should drain
    # the background transport and let us log the message.
    def subprocess_main():
        # logger.delete and logger.list_entries work by filtering on log name, so we
        # can create new objects with the same name and have the queries on the parent
        # process still work.
        handler = CloudLoggingHandler(
            Config.CLIENT, name=handler_name, transport=BackgroundThreadTransport
        )
        cloud_logger = logging.getLogger(LOGGER_NAME)
        cloud_logger.addHandler(handler)
        cloud_logger.warning(LOG_MESSAGE)
        Config.CLIENT.flush_handlers()

    proc = Process(target=subprocess_main)
    proc.start()
    proc.join()
    entries = _list_entries(logger)
    self.assertEqual(len(entries), 1)
    self.assertEqual(entries[0].payload, LOG_MESSAGE)

def test_create_metric(self):
METRIC_NAME = "test-create-metric%s" % (_RESOURCE_ID,)
metric = Config.CLIENT.metric(
Expand Down
Loading