23 changes: 16 additions & 7 deletions kafka/consumer/fetcher.py
@@ -56,12 +56,13 @@ class Fetcher(six.Iterator):
'max_partition_fetch_bytes': 1048576,
'max_poll_records': sys.maxsize,
'check_crcs': True,
'metrics': None,
'metric_group_prefix': 'consumer',
'retry_backoff_ms': 100,
'enable_incremental_fetch_sessions': True,
}

def __init__(self, client, subscriptions, metrics, **configs):
def __init__(self, client, subscriptions, **configs):
"""Initialize a Kafka Message Fetcher.

Keyword Arguments:
@@ -111,7 +112,10 @@ def __init__(self, client, subscriptions, metrics, **configs):
self._next_partition_records = None # Holds a single PartitionRecords until fully consumed
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
if self.config['metrics']:
self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix'])
else:
self._sensors = None
self._isolation_level = READ_UNCOMMITTED
self._session_handlers = {}
self._nodes_with_pending_fetch_requests = set()
@@ -391,7 +395,7 @@ def _append(self, drained, part, max_records, update_offsets):
# when each message is yielded). There may be edge cases where we re-fetch records
# that we'll end up skipping, but for now we'll live with that.
highwater = self._subscriptions.assignment[tp].highwater
if highwater is not None:
if highwater is not None and self._sensors:
self._sensors.records_fetch_lag.record(highwater - part.next_fetch_offset)
if update_offsets or not part_records:
# TODO: save leader_epoch
@@ -705,7 +709,10 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
partitions = set([TopicPartition(topic, partition_data[0])
for topic, partitions in response.topics
for partition_data in partitions])
metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
if self._sensors:
metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
else:
metric_aggregator = None

for topic, partitions in response.topics:
for partition_data in partitions:
@@ -719,7 +726,8 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
)
self._completed_fetches.append(completed_fetch)

self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
if self._sensors:
self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
self._nodes_with_pending_fetch_requests.remove(node_id)

def _handle_fetch_error(self, node_id, exception):
@@ -816,7 +824,7 @@ def _parse_fetched_data(self, completed_fetch):
raise error_type('Unexpected error while fetching data')

finally:
if parsed_records is None:
if parsed_records is None and completed_fetch.metric_aggregator:
completed_fetch.metric_aggregator.record(tp, 0, 0)

if error_type is not Errors.NoError:
@@ -873,7 +881,8 @@ def __bool__(self):
def drain(self):
if self.record_iterator is not None:
self.record_iterator = None
self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
if self.metric_aggregator:
self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
self.on_drain(self)

def take(self, n=None):
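For readers skimming the Fetcher changes above, here is a minimal, self-contained sketch of the pattern they apply — sensors are built only when a metrics registry is supplied, and every recording site checks for None so a metrics-less instance is a no-op. The class and sensor names below are illustrative, not the library code:

```python
import time


class _LatencySensor(object):
    """Stand-in for a real metrics Sensor; it just remembers recorded values."""
    def __init__(self):
        self.values = []

    def record(self, value):
        self.values.append(value)


class FetcherSketch(object):
    def __init__(self, metrics=None):
        # Sensors are created only when a registry is supplied...
        self._sensors = _LatencySensor() if metrics else None

    def handle_fetch_response(self, send_time):
        # ...and every recording site is guarded, mirroring
        # `if self._sensors: self._sensors.fetch_latency.record(...)`.
        if self._sensors:
            self._sensors.record((time.time() - send_time) * 1000)


FetcherSketch().handle_fetch_response(time.time())                   # no-op
FetcherSketch(metrics=object()).handle_fetch_response(time.time())   # records
```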
27 changes: 17 additions & 10 deletions kafka/consumer/group.py
@@ -234,6 +234,7 @@ class KafkaConsumer(six.Iterator):
metric_reporters (list): A list of classes to use as metrics reporters.
Implementing the AbstractMetricsReporter interface allows plugging
in classes that will be notified of new metric creation. Default: []
metrics_enabled (bool): Whether to track metrics on this instance. Default: True
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
@@ -315,6 +316,7 @@ class KafkaConsumer(six.Iterator):
'api_version_auto_timeout_ms': 2000,
'connections_max_idle_ms': 9 * 60 * 1000,
'metric_reporters': [],
'metrics_enabled': True,
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
@@ -358,13 +360,15 @@ def __init__(self, *topics, **configs):
"fetch_max_wait_ms ({})."
.format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))

metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
# TODO _metrics likely needs to be passed to KafkaClient, etc.
if self.config['metrics_enabled']:
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
else:
self._metrics = None

# api_version was previously a str. Accept old format for now
if isinstance(self.config['api_version'], str):
@@ -402,9 +406,9 @@ def __init__(self, *topics, **configs):

self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, self._metrics, **self.config)
self._client, self._subscription, metrics=self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
self._client, self._subscription, self._metrics,
self._client, self._subscription, metrics=self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
@@ -485,7 +489,8 @@ def close(self, autocommit=True, timeout_ms=None):
log.debug("Closing the KafkaConsumer.")
self._closed = True
self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms)
self._metrics.close()
if self._metrics:
self._metrics.close()
self._client.close()
try:
self.config['key_deserializer'].close()
@@ -989,6 +994,8 @@ def metrics(self, raw=False):
This is an unstable interface. It may change in future
releases without warning.
"""
if not self._metrics:
return
if raw:
return self._metrics.metrics.copy()

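A usage sketch of the new `metrics_enabled` flag documented in the KafkaConsumer changes above (broker address, topic, and group id are placeholders): with `metrics_enabled=False`, no Metrics registry is created, the internal sensors stay None, and `consumer.metrics()` returns None.

```python
from kafka import KafkaConsumer

# Placeholder broker/topic/group; metrics_enabled=False is the new opt-out.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    metrics_enabled=False,
)

assert consumer.metrics() is None  # no registry was created

for message in consumer:
    print(message.topic, message.offset, message.value)
```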
19 changes: 13 additions & 6 deletions kafka/coordinator/base.py
@@ -84,10 +84,11 @@ class BaseCoordinator(object):
'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
'api_version': (0, 10, 1),
'metrics': None,
'metric_group_prefix': '',
}

def __init__(self, client, metrics, **configs):
def __init__(self, client, **configs):
"""
Keyword Arguments:
group_id (str): name of the consumer group to join for dynamic
@@ -130,8 +131,11 @@ def __init__(self, client, metrics, **configs):
self.coordinator_id = None
self._find_coordinator_future = None
self._generation = Generation.NO_GENERATION
self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
self.config['metric_group_prefix'])
if self.config['metrics']:
self._sensors = GroupCoordinatorMetrics(self.heartbeat, self.config['metrics'],
self.config['metric_group_prefix'])
else:
self._sensors = None

@abc.abstractmethod
def protocol_type(self):
@@ -531,7 +535,8 @@ def _handle_join_group_response(self, future, send_time, response):
if error_type is Errors.NoError:
log.debug("Received successful JoinGroup response for group %s: %s",
self.group_id, response)
self.sensors.join_latency.record((time.time() - send_time) * 1000)
if self._sensors:
self._sensors.join_latency.record((time.time() - send_time) * 1000)
with self._lock:
if self.state is not MemberState.REBALANCING:
# if the consumer was woken up before a rebalance completes,
@@ -650,7 +655,8 @@ def _send_sync_group_request(self, request):
def _handle_sync_group_response(self, future, send_time, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
if self._sensors:
self._sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return

@@ -856,7 +862,8 @@ def _send_heartbeat_request(self):
return future

def _handle_heartbeat_response(self, future, send_time, response):
self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
if self._sensors:
self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.debug("Received successful heartbeat response for group %s",
15 changes: 10 additions & 5 deletions kafka/coordinator/consumer.py
@@ -39,10 +39,11 @@ class ConsumerCoordinator(BaseCoordinator):
'retry_backoff_ms': 100,
'api_version': (0, 10, 1),
'exclude_internal_topics': True,
'metrics': None,
'metric_group_prefix': 'consumer'
}

def __init__(self, client, subscription, metrics, **configs):
def __init__(self, client, subscription, **configs):
"""Initialize the coordination manager.

Keyword Arguments:
Expand Down Expand Up @@ -78,7 +79,7 @@ def __init__(self, client, subscription, metrics, **configs):
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
super(ConsumerCoordinator, self).__init__(client, **configs)

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -120,8 +121,11 @@ def __init__(self, client, subscription, metrics, **configs):
else:
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval

self.consumer_sensors = ConsumerCoordinatorMetrics(
metrics, self.config['metric_group_prefix'], self._subscription)
if self.config['metrics']:
self._consumer_sensors = ConsumerCoordinatorMetrics(
self.config['metrics'], self.config['metric_group_prefix'], self._subscription)
else:
self._consumer_sensors = None

self._cluster.request_update()
self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
@@ -686,7 +690,8 @@ def _send_offset_commit_request(self, offsets):

def _handle_offset_commit_response(self, offsets, future, send_time, response):
# TODO look at adding request_latency_ms to response (like java kafka)
self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
if self._consumer_sensors:
self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()

for topic, partitions in response.topics:
27 changes: 18 additions & 9 deletions kafka/producer/kafka.py
@@ -267,6 +267,7 @@ class KafkaProducer(object):
metric_reporters (list): A list of classes to use as metrics reporters.
Implementing the AbstractMetricsReporter interface allows plugging
in classes that will be notified of new metric creation. Default: []
metrics_enabled (bool): Whether to track metrics on this instance. Default: True
metrics_num_samples (int): The number of samples maintained to compute
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
@@ -336,6 +337,7 @@ class KafkaProducer(object):
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'metric_reporters': [],
'metrics_enabled': True,
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
@@ -393,12 +395,15 @@ def __init__(self, **configs):
str(self.config['api_version']), deprecated)

# Configure metrics
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
if self.config['metrics_enabled']:
metrics_tags = {'client-id': self.config['client_id']}
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
time_window_ms=self.config['metrics_sample_window_ms'],
tags=metrics_tags)
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
else:
self._metrics = None

client = self.config['kafka_client'](
metrics=self._metrics, metric_group_prefix='producer',
@@ -424,11 +429,12 @@ def __init__(self, **configs):
self.config['compression_attrs'] = compression_attrs

message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata,
self._accumulator, self._metrics,
self._accumulator,
metrics=self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -524,7 +530,8 @@ def __getattr__(self, name):
timeout)
self._sender.force_close()

self._metrics.close()
if self._metrics:
self._metrics.close()
try:
self.config['key_serializer'].close()
except AttributeError:
@@ -773,6 +780,8 @@ def metrics(self, raw=False):
This is an unstable interface. It may change in future
releases without warning.
"""
if not self._metrics:
return
if raw:
return self._metrics.metrics.copy()

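The producer side mirrors the consumer. A sketch with a placeholder broker and topic, showing that disabling metrics leaves `producer.metrics()` returning None while sends work as usual:

```python
from kafka import KafkaProducer

# Placeholder broker/topic; with metrics_enabled=False no Metrics registry,
# sensors, or reporters are created for this producer instance.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         metrics_enabled=False)

producer.send('my-topic', b'hello, world')
producer.flush()

assert producer.metrics() is None  # accessor short-circuits when disabled
producer.close()
```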
2 changes: 0 additions & 2 deletions kafka/producer/record_accumulator.py
@@ -162,8 +162,6 @@ class RecordAccumulator(object):
'linger_ms': 0,
'retry_backoff_ms': 100,
'message_version': 0,
'metrics': None,
'metric_group_prefix': 'producer-metrics',
}

def __init__(self, **configs):
21 changes: 14 additions & 7 deletions kafka/producer/sender.py
@@ -29,11 +29,12 @@ class Sender(threading.Thread):
'acks': 1,
'retries': 0,
'request_timeout_ms': 30000,
'metrics': None,
'guarantee_message_order': False,
'client_id': 'kafka-python-' + __version__,
}

def __init__(self, client, metadata, accumulator, metrics, **configs):
def __init__(self, client, metadata, accumulator, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -47,7 +48,10 @@ def __init__(self, client, metadata, accumulator, metrics, **configs):
self._running = True
self._force_close = False
self._topics_to_add = set()
self._sensors = SenderMetrics(metrics, self._client, self._metadata)
if self.config['metrics']:
self._sensors = SenderMetrics(self.config['metrics'], self._client, self._metadata)
else:
self._sensors = None

def run(self):
"""The main run loop for the sender thread."""
@@ -123,10 +127,12 @@ def run_once(self):

expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
for expired_batch in expired_batches:
self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)

self._sensors.update_produce_request_metrics(batches_by_node)
if self._sensors:
for expired_batch in expired_batches:
self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
self._sensors.update_produce_request_metrics(batches_by_node)

requests = self._create_produce_requests(batches_by_node)
# If we have any nodes that are ready to send + have sendable data,
# poll with 0 timeout so this can immediately loop and try sending more
@@ -237,15 +243,16 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
self.config['retries'] - batch.attempts - 1,
error)
self._accumulator.reenqueue(batch)
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
if self._sensors:
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
if error is Errors.TopicAuthorizationFailedError:
error = error(batch.topic_partition.topic)

# tell the user the result of their request
batch.done(base_offset, timestamp_ms, error, log_start_offset)
self._accumulator.deallocate(batch)
if error is not None:
if error is not None and self._sensors:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)

if getattr(error, 'invalid_metadata', False):
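Taken together, Fetcher, BaseCoordinator/ConsumerCoordinator, and Sender all follow the same contract change: `metrics` moves out of the positional signature and into DEFAULT_CONFIG with a None default, filtered from `**configs` the same way the library filters its other options. A self-contained sketch of that contract (the class below is illustrative, not library code):

```python
import copy


class InternalComponent(object):
    # `metrics` lives in DEFAULT_CONFIG with a None default instead of being
    # a required positional argument (as in Fetcher, BaseCoordinator, Sender).
    DEFAULT_CONFIG = {'metrics': None, 'metric_group_prefix': ''}

    def __init__(self, client, **configs):
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]
        # Sensor objects are created only when a registry is configured.
        self._sensors = object() if self.config['metrics'] else None


# Callers that disable metrics simply omit the keyword (or pass metrics=None).
assert InternalComponent(client=None)._sensors is None
assert InternalComponent(client=None, metrics=object())._sensors is not None
```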