diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index d3d9699bd..1535dcedb 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -380,7 +380,6 @@ class KafkaProducer(object):
     }
 
     def __init__(self, **configs):
-        log.debug("Starting the Kafka producer") # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
         user_provided_configs = set(configs.keys())
         for key in self.config:
@@ -409,8 +408,10 @@ def __init__(self, **configs):
                 self.config['api_version'] = None
             else:
                 self.config['api_version'] = tuple(map(int, deprecated.split('.')))
-                log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
-                            str(self.config['api_version']), deprecated)
+                log.warning('%s: use api_version=%s [tuple] -- "%s" as str is deprecated',
+                            self, str(self.config['api_version']), deprecated)
+
+        log.debug("%s: Starting Kafka producer", self)
 
         # Configure metrics
         if self.config['metrics_enabled']:
@@ -466,18 +467,18 @@ def __init__(self, **configs):
                 metadata=self._metadata,
             )
             if self._transaction_manager.is_transactional():
-                log.info("Instantiated a transactional producer.")
+                log.info("%s: Instantiated a transactional producer.", self)
             else:
-                log.info("Instantiated an idempotent producer.")
+                log.info("%s: Instantiated an idempotent producer.", self)
 
             if 'retries' not in user_provided_configs:
-                log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
+                log.info("%s: Overriding the default 'retries' config to 3 since the idempotent producer is enabled.", self)
                 self.config['retries'] = 3
            elif self.config['retries'] == 0:
                 raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")
 
             if 'max_in_flight_requests_per_connection' not in user_provided_configs:
-                log.info("Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.")
+                log.info("%s: Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempotence is enabled.", self)
                 self.config['max_in_flight_requests_per_connection'] = 1
             elif self.config['max_in_flight_requests_per_connection'] != 1:
                 raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
@@ -485,7 +486,7 @@ def __init__(self, **configs):
                                                      " Otherwise we cannot guarantee idempotence.")
 
             if 'acks' not in user_provided_configs:
-                log.info("Overriding the default 'acks' config to 'all' since idempotence is enabled")
+                log.info("%s: Overriding the default 'acks' config to 'all' since idempotence is enabled", self)
                 self.config['acks'] = -1
             elif self.config['acks'] != -1:
                 raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent"
@@ -509,7 +510,7 @@ def __init__(self, **configs):
         self._cleanup = self._cleanup_factory()
         atexit.register(self._cleanup)
 
-        log.debug("Kafka producer started")
+        log.debug("%s: Kafka producer started", self)
 
     def bootstrap_connected(self):
         """Return True if the bootstrap is connected."""
@@ -564,7 +565,7 @@ def __getattr__(self, name):
         self._unregister_cleanup()
 
         if not hasattr(self, '_closed') or self._closed:
-            log.info('Kafka producer closed')
+            log.info('%s: Kafka producer closed', self)
             return
         if timeout is None:
             # threading.TIMEOUT_MAX is available in Python3.3+
@@ -574,16 +575,16 @@ def __getattr__(self, name):
         else:
             assert timeout >= 0
 
-        log.info("Closing the Kafka producer with %s secs timeout.", timeout)
+        log.info("%s: Closing the Kafka producer with %s secs timeout.", self, timeout)
         self.flush(timeout)
         invoked_from_callback = bool(threading.current_thread() is self._sender)
         if timeout > 0:
             if invoked_from_callback:
-                log.warning("Overriding close timeout %s secs to 0 in order to"
+                log.warning("%s: Overriding close timeout %s secs to 0 in order to"
                             " prevent useless blocking due to self-join. This"
                             " means you have incorrectly invoked close with a"
                             " non-zero timeout from the producer call-back.",
-                            timeout)
+                            self, timeout)
             else:
                 # Try to close gracefully.
                 if self._sender is not None:
@@ -591,9 +592,9 @@ def __getattr__(self, name):
                     self._sender.join(timeout)
 
             if self._sender is not None and self._sender.is_alive():
-                log.info("Proceeding to force close the producer since pending"
+                log.info("%s: Proceeding to force close the producer since pending"
                          " requests could not be completed within timeout %s.",
-                         timeout)
+                         self, timeout)
                 self._sender.force_close()
 
         if self._metrics:
@@ -607,7 +608,7 @@ def __getattr__(self, name):
             except AttributeError:
                 pass
         self._closed = True
-        log.debug("The Kafka producer has closed.")
+        log.debug("%s: The Kafka producer has closed.", self)
 
     def partitions_for(self, topic):
         """Returns set of all known partitions for the topic."""
@@ -816,7 +817,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
             self._ensure_valid_record_size(message_size)
 
             tp = TopicPartition(topic, partition)
-            log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
+            log.debug("%s: Sending (key=%r value=%r headers=%r) to %s", self, key, value, headers, tp)
 
             if self._transaction_manager and self._transaction_manager.is_transactional():
                 self._transaction_manager.maybe_add_partition_to_transaction(tp)
@@ -825,8 +826,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
                                              key_bytes, value_bytes, headers)
             future, batch_is_full, new_batch_created = result
             if batch_is_full or new_batch_created:
-                log.debug("Waking up the sender since %s is either full or"
-                          " getting a new batch", tp)
+                log.debug("%s: Waking up the sender since %s is either full or"
+                          " getting a new batch", self, tp)
                 self._sender.wakeup()
 
             return future
@@ -834,7 +835,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
         # for API exceptions return them in the future,
         # for other exceptions raise directly
         except Errors.BrokerResponseError as e:
-            log.error("Exception occurred during message send: %s", e)
+            log.error("%s: Exception occurred during message send: %s", self, e)
             return FutureRecordMetadata(
                 FutureProduceResult(TopicPartition(topic, partition)),
                 -1, None, None,
@@ -865,7 +866,7 @@ def flush(self, timeout=None):
             KafkaTimeoutError: failure to flush buffered records within
                 the provided timeout
         """
-        log.debug("Flushing accumulated records in producer.") # trace
+        log.debug("%s: Flushing accumulated records in producer.", self)
         self._accumulator.begin_flush()
         self._sender.wakeup()
         self._accumulator.await_flush_completion(timeout=timeout)
@@ -911,7 +912,7 @@ def _wait_on_metadata(self, topic, max_wait):
             if not metadata_event:
                 metadata_event = threading.Event()
 
-            log.debug("Requesting metadata update for topic %s", topic)
+            log.debug("%s: Requesting metadata update for topic %s", self, topic)
             metadata_event.clear()
             future = self._metadata.request_update()
 
@@ -925,7 +926,7 @@ def _wait_on_metadata(self, topic, max_wait):
                 raise Errors.TopicAuthorizationFailedError(set([topic]))
             else:
                 elapsed = time.time() - begin
-                log.debug("_wait_on_metadata woke after %s secs.", elapsed)
+                log.debug("%s: _wait_on_metadata woke after %s secs.", self, elapsed)
 
     def _serialize(self, f, topic, data):
         if not f:
@@ -972,3 +973,6 @@ def metrics(self, raw=False):
                 metrics[k.group][k.name] = {}
             metrics[k.group][k.name] = v.value()
         return metrics
+
+    def __str__(self):
+        return "<KafkaProducer client_id=%s transactional_id=%s>" % (self.config['client_id'], self.config['transactional_id'])
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9c845cfca..3637be416 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -62,17 +62,17 @@ def __init__(self, client, metadata, accumulator, **configs):
 
     def run(self):
         """The main run loop for the sender thread."""
-        log.debug("Starting Kafka producer I/O thread.")
+        log.debug("%s: Starting Kafka producer I/O thread.", self)
 
         # main loop, runs until close is called
         while self._running:
             try:
                 self.run_once()
             except Exception:
-                log.exception("Uncaught error in kafka producer I/O thread")
+                log.exception("%s: Uncaught error in kafka producer I/O thread", self)
 
-        log.debug("Beginning shutdown of Kafka producer I/O thread, sending"
-                  " remaining records.")
+        log.debug("%s: Beginning shutdown of Kafka producer I/O thread, sending"
+                  " remaining records.", self)
 
         # okay we stopped accepting requests but there may still be
         # requests in the accumulator or waiting for acknowledgment,
@@ -83,7 +83,7 @@ def run(self):
             try:
                 self.run_once()
             except Exception:
-                log.exception("Uncaught error in kafka producer I/O thread")
+                log.exception("%s: Uncaught error in kafka producer I/O thread", self)
 
         if self._force_close:
             # We need to fail all the incomplete batches and wake up the
@@ -93,9 +93,9 @@ def run(self):
         try:
             self._client.close()
         except Exception:
-            log.exception("Failed to close network client")
+            log.exception("%s: Failed to close network client", self)
 
-        log.debug("Shutdown of Kafka producer I/O thread has completed.")
+        log.debug("%s: Shutdown of Kafka producer I/O thread has completed.", self)
 
     def run_once(self):
         """Run a single iteration of sending."""
@@ -125,7 +125,7 @@ def run_once(self):
            except Errors.SaslAuthenticationFailedError as e:
                 # This is already logged as error, but propagated here to perform any clean ups.
- log.debug("Authentication exception while processing transactional request: %s", e) + log.debug("%s: Authentication exception while processing transactional request: %s", self, e) self._transaction_manager.authentication_failed(e) poll_timeout_ms = self._send_producer_data() @@ -139,7 +139,7 @@ def _send_producer_data(self): # if there are any partitions whose leaders are not known yet, force # metadata update if unknown_leaders_exist: - log.debug('Unknown leaders exist, requesting metadata update') + log.debug('%s: Unknown leaders exist, requesting metadata update', self) self._metadata.request_update() # remove any nodes we aren't ready to send to @@ -147,7 +147,7 @@ def _send_producer_data(self): for node in list(ready_nodes): if not self._client.is_ready(node): node_delay_ms = self._client.connection_delay(node) - log.debug('Node %s not ready; delaying produce of accumulated batch (%f ms)', node, node_delay_ms) + log.debug('%s: Node %s not ready; delaying produce of accumulated batch (%f ms)', self, node, node_delay_ms) self._client.maybe_connect(node, wakeup=False) ready_nodes.remove(node) not_ready_timeout_ms = min(not_ready_timeout_ms, node_delay_ms) @@ -166,7 +166,7 @@ def _send_producer_data(self): self.config['request_timeout_ms'], self._metadata) if expired_batches: - log.debug("Expired %s batches in accumulator", len(expired_batches)) + log.debug("%s: Expired %s batches in accumulator", self, len(expired_batches)) # Reset the producer_id if an expired batch has previously been sent to the broker. # See the documentation of `TransactionState.reset_producer_id` to understand why @@ -200,8 +200,8 @@ def _send_producer_data(self): # looping. poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout_ms) if ready_nodes: - log.debug("Nodes with data ready to send: %s", ready_nodes) # trace - log.debug("Created %d produce requests: %s", len(requests), requests) # trace + log.debug("%s: Nodes with data ready to send: %s", self, ready_nodes) # trace + log.debug("%s: Created %d produce requests: %s", self, len(requests), requests) # trace # if some partitions are already ready to be sent, the select time # would be 0; otherwise if some partition already has some data # accumulated but not ready yet, the select time will be the time @@ -212,7 +212,7 @@ def _send_producer_data(self): for node_id, request in six.iteritems(requests): batches = batches_by_node[node_id] - log.debug('Sending Produce Request: %r', request) + log.debug('%s: Sending Produce Request: %r', self, request) (self._client.send(node_id, request, wakeup=False) .add_callback( self._handle_produce_response, node_id, time.time(), batches) @@ -235,7 +235,7 @@ def _maybe_send_transactional_request(self): if next_request_handler is None: return False - log.debug("transactional_id: %s -- Sending transactional request %s", self._transaction_manager.transactional_id, next_request_handler.request) + log.debug("%s: Sending transactional request %s", self, next_request_handler.request) while not self._force_close: target_node = None try: @@ -262,7 +262,7 @@ def _maybe_send_transactional_request(self): return True except Exception as e: - log.warn("Got an exception when trying to find a node to send a transactional request to. Going to back off and retry", e) + log.warn("%s: Got an exception when trying to find a node to send a transactional request to. 
Going to back off and retry", self, e) if next_request_handler.needs_coordinator(): self._transaction_manager.lookup_coordinator_for_request(next_request_handler) break @@ -277,7 +277,7 @@ def _maybe_send_transactional_request(self): def _maybe_abort_batches(self, exc): if self._accumulator.has_incomplete: - log.error("Aborting producer batches due to fatal error: %s", exc) + log.error("%s: Aborting producer batches due to fatal error: %s", self, exc) self._accumulator.abort_batches(exc) def initiate_close(self): @@ -306,8 +306,8 @@ def _maybe_wait_for_producer_id(self): try: node_id = self._client.least_loaded_node() if node_id is None or not self._client.await_ready(node_id): - log.debug("Could not find an available broker to send InitProducerIdRequest to." + - " Will back off and try again.") + log.debug("%s, Could not find an available broker to send InitProducerIdRequest to." + + " Will back off and try again.", self) time.sleep(self._client.least_loaded_node_refresh_ms() / 1000) continue version = self._client.api_version(InitProducerIdRequest, max_version=1) @@ -321,28 +321,28 @@ def _maybe_wait_for_producer_id(self): self._transaction_manager.set_producer_id_and_epoch(ProducerIdAndEpoch(response.producer_id, response.producer_epoch)) break elif getattr(error_type, 'retriable', False): - log.debug("Retriable error from InitProducerId response: %s", error_type.__name__) + log.debug("%s: Retriable error from InitProducerId response: %s", self, error_type.__name__) if getattr(error_type, 'invalid_metadata', False): self._metadata.request_update() else: self._transaction_manager.transition_to_fatal_error(error_type()) break except Errors.KafkaConnectionError: - log.debug("Broker %s disconnected while awaiting InitProducerId response", node_id) + log.debug("%s: Broker %s disconnected while awaiting InitProducerId response", self, node_id) except Errors.RequestTimedOutError: - log.debug("InitProducerId request to node %s timed out", node_id) - log.debug("Retry InitProducerIdRequest in %sms.", self.config['retry_backoff_ms']) + log.debug("%s: InitProducerId request to node %s timed out", self, node_id) + log.debug("%s: Retry InitProducerIdRequest in %sms.", self, self.config['retry_backoff_ms']) time.sleep(self.config['retry_backoff_ms'] / 1000) def _failed_produce(self, batches, node_id, error): - log.error("Error sending produce request to node %d: %s", node_id, error) # trace + log.error("%s: Error sending produce request to node %d: %s", self, node_id, error) # trace for batch in batches: self._complete_batch(batch, error, -1) def _handle_produce_response(self, node_id, send_time, batches, response): """Handle a produce response.""" # if we have a response, parse it - log.debug('Parsing produce response: %r', response) + log.debug('%s: Parsing produce response: %r', self, response) if response: batches_by_partition = dict([(batch.topic_partition, batch) for batch in batches]) @@ -376,9 +376,9 @@ def _fail_batch(self, batch, exception, base_offset=None, timestamp_ms=None, log if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ not self._transaction_manager.is_transactional() and \ self._transaction_manager.has_producer_id(batch.producer_id): - log.error("The broker received an out of order sequence number for topic-partition %s" + log.error("%s: The broker received an out of order sequence number for topic-partition %s" " at offset %s. 
This indicates data loss on the broker, and should be investigated.", - batch.topic_partition, base_offset) + self, batch.topic_partition, base_offset) # Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees # about the previously committed message. Note that this will discard the producer id and sequence @@ -414,24 +414,25 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star if error is not None: if self._can_retry(batch, error): # retry - log.warning("Got error produce response on topic-partition %s," + log.warning("%s: Got error produce response on topic-partition %s," " retrying (%d attempts left). Error: %s", - batch.topic_partition, + self, batch.topic_partition, self.config['retries'] - batch.attempts - 1, error) # If idempotence is enabled only retry the request if the batch matches our current producer id and epoch if not self._transaction_manager or self._transaction_manager.producer_id_and_epoch.match(batch): - log.debug("Retrying batch to topic-partition %s. Sequence number: %s", - batch.topic_partition, + log.debug("%s: Retrying batch to topic-partition %s. Sequence number: %s", + self, batch.topic_partition, self._transaction_manager.sequence_number(batch.topic_partition) if self._transaction_manager else None) self._accumulator.reenqueue(batch) if self._sensors: self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: - log.warning("Attempted to retry sending a batch but the producer id/epoch changed from %s/%s to %s/%s. This batch will be dropped" % ( - batch.producer_id, batch.producer_epoch, - self._transaction_manager.producer_id_and_epoch.producer_id, self._transaction_manager.producer_id_and_epoch.epoch)) + log.warning("%s: Attempted to retry sending a batch but the producer id/epoch changed from %s/%s to %s/%s. This batch will be dropped", + self, batch.producer_id, batch.producer_epoch, + self._transaction_manager.producer_id_and_epoch.producer_id, + self._transaction_manager.producer_id_and_epoch.epoch) self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms, log_start_offset=log_start_offset) else: if error is Errors.TopicAuthorizationFailedError: @@ -441,9 +442,9 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms, log_start_offset=log_start_offset) if error is Errors.UnknownTopicOrPartitionError: - log.warning("Received unknown topic or partition error in produce request on partition %s." + log.warning("%s: Received unknown topic or partition error in produce request on partition %s." " The topic/partition may not exist or the user may not have Describe access to it", - batch.topic_partition) + self, batch.topic_partition) if getattr(error, 'invalid_metadata', False): self._metadata.request_update() @@ -454,7 +455,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star if self._transaction_manager and self._transaction_manager.producer_id_and_epoch.match(batch): self._transaction_manager.increment_sequence_number(batch.topic_partition, batch.record_count) - log.debug("Incremented sequence number for topic-partition %s to %s", batch.topic_partition, + log.debug("%s: Incremented sequence number for topic-partition %s to %s", self, batch.topic_partition, self._transaction_manager.sequence_number(batch.topic_partition)) # Unmute the completed partition. 
@@ -516,7 +517,7 @@ def _produce_request(self, node_id, acks, timeout, batches):
             )
         else:
             if transactional_id is not None:
-                log.warning('Broker does not support ProduceRequest v3+, required for transactional_id')
+                log.warning('%s: Broker does not support ProduceRequest v3+, required for transactional_id', self)
             return ProduceRequest[version](
                 required_acks=acks,
                 timeout=timeout,
@@ -530,6 +531,9 @@ def wakeup(self):
 
     def bootstrap_connected(self):
         return self._client.bootstrap_connected()
 
+    def __str__(self):
+        return "<Sender client_id=%s transactional_id=%s>" % (self.config['client_id'], self.config['transactional_id'])
+
 
 class SenderMetrics(object):