Skip to content

Prefix producer logs w/ client id and transactional id #2591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ class KafkaProducer(object):
}

def __init__(self, **configs):
log.debug("Starting the Kafka producer") # trace
self.config = copy.copy(self.DEFAULT_CONFIG)
user_provided_configs = set(configs.keys())
for key in self.config:
Expand Down Expand Up @@ -409,8 +408,10 @@ def __init__(self, **configs):
self.config['api_version'] = None
else:
self.config['api_version'] = tuple(map(int, deprecated.split('.')))
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
str(self.config['api_version']), deprecated)
log.warning('%s: use api_version=%s [tuple] -- "%s" as str is deprecated',
self, str(self.config['api_version']), deprecated)

log.debug("%s: Starting Kafka producer", self)

# Configure metrics
if self.config['metrics_enabled']:
Expand Down Expand Up @@ -466,26 +467,26 @@ def __init__(self, **configs):
metadata=self._metadata,
)
if self._transaction_manager.is_transactional():
log.info("Instantiated a transactional producer.")
log.info("%s: Instantiated a transactional producer.", self)
else:
log.info("Instantiated an idempotent producer.")
log.info("%s: Instantiated an idempotent producer.", self)

if 'retries' not in user_provided_configs:
log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", self)
self.config['retries'] = 3
elif self.config['retries'] == 0:
raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")

if 'max_in_flight_requests_per_connection' not in user_provided_configs:
log.info("Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.")
log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempotence is enabled.", self)
self.config['max_in_flight_requests_per_connection'] = 1
elif self.config['max_in_flight_requests_per_connection'] != 1:
raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
" to use the idempotent producer."
" Otherwise we cannot guarantee idempotence.")

if 'acks' not in user_provided_configs:
log.info("Overriding the default 'acks' config to 'all' since idempotence is enabled")
log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", self)
self.config['acks'] = -1
elif self.config['acks'] != -1:
raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent"
Expand All @@ -509,7 +510,7 @@ def __init__(self, **configs):

self._cleanup = self._cleanup_factory()
atexit.register(self._cleanup)
log.debug("Kafka producer started")
log.debug("%s: Kafka producer started", self)

def bootstrap_connected(self):
"""Return True if the bootstrap is connected."""
Expand Down Expand Up @@ -564,7 +565,7 @@ def __getattr__(self, name):
self._unregister_cleanup()

if not hasattr(self, '_closed') or self._closed:
log.info('Kafka producer closed')
log.info('%s: Kafka producer closed', self)
return
if timeout is None:
# threading.TIMEOUT_MAX is available in Python3.3+
Expand All @@ -574,26 +575,26 @@ def __getattr__(self, name):
else:
assert timeout >= 0

log.info("Closing the Kafka producer with %s secs timeout.", timeout)
log.info("%s: Closing the Kafka producer with %s secs timeout.", self, timeout)
self.flush(timeout)
invoked_from_callback = bool(threading.current_thread() is self._sender)
if timeout > 0:
if invoked_from_callback:
log.warning("Overriding close timeout %s secs to 0 in order to"
log.warning("%s: Overriding close timeout %s secs to 0 in order to"
" prevent useless blocking due to self-join. This"
" means you have incorrectly invoked close with a"
" non-zero timeout from the producer call-back.",
timeout)
self, timeout)
else:
# Try to close gracefully.
if self._sender is not None:
self._sender.initiate_close()
self._sender.join(timeout)

if self._sender is not None and self._sender.is_alive():
log.info("Proceeding to force close the producer since pending"
log.info("%s: Proceeding to force close the producer since pending"
" requests could not be completed within timeout %s.",
timeout)
self, timeout)
self._sender.force_close()

if self._metrics:
Expand All @@ -607,7 +608,7 @@ def __getattr__(self, name):
except AttributeError:
pass
self._closed = True
log.debug("The Kafka producer has closed.")
log.debug("%s: The Kafka producer has closed.", self)

def partitions_for(self, topic):
"""Returns set of all known partitions for the topic."""
Expand Down Expand Up @@ -816,7 +817,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
self._ensure_valid_record_size(message_size)

tp = TopicPartition(topic, partition)
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", self, key, value, headers, tp)

if self._transaction_manager and self._transaction_manager.is_transactional():
self._transaction_manager.maybe_add_partition_to_transaction(tp)
Expand All @@ -825,16 +826,16 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
key_bytes, value_bytes, headers)
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or"
" getting a new batch", tp)
log.debug("%s: Waking up the sender since %s is either full or"
" getting a new batch", self, tp)
self._sender.wakeup()

return future
# handling exceptions and record the errors;
# for API exceptions return them in the future,
# for other exceptions raise directly
except Errors.BrokerResponseError as e:
log.error("Exception occurred during message send: %s", e)
log.error("%s: Exception occurred during message send: %s", self, e)
return FutureRecordMetadata(
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
Expand Down Expand Up @@ -865,7 +866,7 @@ def flush(self, timeout=None):
KafkaTimeoutError: failure to flush buffered records within the
provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
log.debug("%s: Flushing accumulated records in producer.", self)
self._accumulator.begin_flush()
self._sender.wakeup()
self._accumulator.await_flush_completion(timeout=timeout)
Expand Down Expand Up @@ -911,7 +912,7 @@ def _wait_on_metadata(self, topic, max_wait):
if not metadata_event:
metadata_event = threading.Event()

log.debug("Requesting metadata update for topic %s", topic)
log.debug("%s: Requesting metadata update for topic %s", self, topic)

metadata_event.clear()
future = self._metadata.request_update()
Expand All @@ -925,7 +926,7 @@ def _wait_on_metadata(self, topic, max_wait):
raise Errors.TopicAuthorizationFailedError(set([topic]))
else:
elapsed = time.time() - begin
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
log.debug("%s: _wait_on_metadata woke after %s secs.", self, elapsed)

def _serialize(self, f, topic, data):
if not f:
Expand Down Expand Up @@ -972,3 +973,6 @@ def metrics(self, raw=False):
metrics[k.group][k.name] = {}
metrics[k.group][k.name] = v.value()
return metrics

def __str__(self):
return "<KafkaProducer client_id=%s transactional_id=%s>" % (self.config['client_id'], self.config['transactional_id'])
Loading
Loading