From 0da152a71fe85ef6790e8c80e4297a8d1ee8f57f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 3 Mar 2025 15:24:05 -0800 Subject: [PATCH 01/15] Add leader_epoch to OffsetAndMetadata struct --- kafka/admin/client.py | 15 ++++++++++----- kafka/coordinator/consumer.py | 5 ++--- kafka/structs.py | 8 ++++---- test/test_coordinator.py | 10 +++++----- test/test_fetcher.py | 2 +- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index a46cf9c58..29ee6cd9a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1353,7 +1353,7 @@ def _list_consumer_group_offsets_send_request(self, group_id, Returns: A message future """ - version = self._client.api_version(OffsetFetchRequest, max_version=3) + version = self._client.api_version(OffsetFetchRequest, max_version=5) if version <= 3: if partitions is None: if version <= 1: @@ -1386,7 +1386,7 @@ def _list_consumer_group_offsets_process_response(self, response): A dictionary composed of TopicPartition keys and OffsetAndMetadata values. """ - if response.API_VERSION <= 3: + if response.API_VERSION <= 5: # OffsetFetchResponse_v1 lacks a top-level error_code if response.API_VERSION > 1: @@ -1401,13 +1401,18 @@ def _list_consumer_group_offsets_process_response(self, response): # OffsetAndMetadata values--this is what the Java AdminClient returns offsets = {} for topic, partitions in response.topics: - for partition, offset, metadata, error_code in partitions: + for partition_data in partitions: + if response.API_VERSION <= 4: + partition, offset, metadata, error_code = partition_data + leader_epoch = -1 + else: + partition, offset, leader_epoch, metadata, error_code = partition_data error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: raise error_type( "Unable to fetch consumer group offsets for topic {}, partition {}" .format(topic, partition)) - offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata) + offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch) else: raise NotImplementedError( "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient." @@ -1439,7 +1444,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, Returns: dictionary: A dictionary with TopicPartition keys and - OffsetAndMetada values. Partitions that are not specified and for + OffsetAndMetadata values. Partitions that are not specified and for which the group_id does not have a recorded offset are omitted. An offset value of `-1` indicates the group_id has no offset for that TopicPartition. A `-1` can only happen for partitions that are diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3734e8817..cf508c606 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -649,7 +649,7 @@ def _send_offset_commit_request(self, offsets): topic, [( partition, offset.offset, - -1, # leader_epoch + offset.leader_epoch, offset.metadata ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] @@ -809,7 +809,6 @@ def _handle_offset_fetch_response(self, future, response): else: metadata, error_code = partition_data[2:] leader_epoch = -1 - # TODO: save leader_epoch! tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: @@ -836,7 +835,7 @@ def _handle_offset_fetch_response(self, future, response): elif offset >= 0: # record the position with the offset # (-1 indicates no committed offset to fetch) - offsets[tp] = OffsetAndMetadata(offset, metadata) + offsets[tp] = OffsetAndMetadata(offset, metadata, leader_epoch) else: log.debug("Group %s has no committed offset for partition" " %s", self.group_id, tp) diff --git a/kafka/structs.py b/kafka/structs.py index dc4f07bee..9e9ad9e3c 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -48,17 +48,17 @@ """The Kafka offset commit API The Kafka offset commit API allows users to provide additional metadata -(in the form of a string) when an offset is committed. This can be useful +(in the form of raw bytes) when an offset is committed. This can be useful (for example) to store information about which node made the commit, what time the commit was made, etc. Keyword Arguments: offset (int): The offset to be committed - metadata (str): Non-null metadata + metadata (bytes): Non-null metadata + leader_epoch (int): The last known epoch from the leader / broker """ OffsetAndMetadata = namedtuple("OffsetAndMetadata", - # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata) - ["offset", "metadata"]) + ["offset", "metadata", "leader_epoch"]) """An offset and timestamp tuple diff --git a/test/test_coordinator.py b/test/test_coordinator.py index c0e7c6d60..e8a238f49 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -230,13 +230,13 @@ def test_need_rejoin(coordinator): def test_refresh_committed_offsets_if_needed(mocker, coordinator): mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', return_value = { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')}) + TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1), + TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1)}) coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) assert coordinator._subscription.needs_fetch_committed_offsets is True coordinator.refresh_committed_offsets_if_needed() assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'') + assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'', -1) assert TopicPartition('foobar', 1) not in assignment assert coordinator._subscription.needs_fetch_committed_offsets is False @@ -303,8 +303,8 @@ def test_close(mocker, coordinator): @pytest.fixture def offsets(): return { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b''), + TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1), + TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1), } diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 256c24fda..7c010491b 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -138,7 +138,7 @@ def test_update_fetch_positions(fetcher, topic, mocker): fetcher._reset_offset.reset_mock() fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'') + fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'', -1) mocker.patch.object(fetcher._subscriptions, 'seek') fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 From 2edb8fdedea29db75149092c493eb9c260c5b06e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 3 Mar 2025 18:48:17 -0800 Subject: [PATCH 02/15] TopicPartitionState --- kafka/consumer/subscription_state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index a329ad3e9..7a5486821 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -319,7 +319,7 @@ def all_consumed_offsets(self): all_consumed = {} for partition, state in six.iteritems(self.assignment): if state.has_valid_position: - all_consumed[partition] = OffsetAndMetadata(state.position, '') + all_consumed[partition] = OffsetAndMetadata(state.position, b'', -1) return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): From ac8e42bdf08e0e571edff8febd12c9277fecdab2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 3 Mar 2025 22:32:49 -0800 Subject: [PATCH 03/15] OffsetAndTimestamp; FetchRequest v10 --- kafka/consumer/fetcher.py | 105 +++++++++++++++------------ kafka/consumer/group.py | 10 +-- kafka/consumer/subscription_state.py | 7 +- kafka/record/default_records.py | 4 + kafka/structs.py | 3 +- test/test_consumer_integration.py | 4 +- test/test_fetcher.py | 40 +++++++--- 7 files changed, 108 insertions(+), 65 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 98f5dbcfa..1a03b216e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -18,7 +18,7 @@ ) from kafka.record import MemoryRecords from kafka.serializer import Deserializer -from kafka.structs import TopicPartition, OffsetAndTimestamp +from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp log = logging.getLogger(__name__) @@ -28,7 +28,7 @@ READ_COMMITTED = 1 ConsumerRecord = collections.namedtuple("ConsumerRecord", - ["topic", "partition", "offset", "timestamp", "timestamp_type", + ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type", "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"]) @@ -198,9 +198,6 @@ def get_offsets_by_times(self, timestamps, timeout_ms): for tp in timestamps: if tp not in offsets: offsets[tp] = None - else: - offset, timestamp = offsets[tp] - offsets[tp] = OffsetAndTimestamp(offset, timestamp) return offsets def beginning_offsets(self, partitions, timeout_ms): @@ -215,7 +212,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): timestamps = dict([(tp, timestamp) for tp in partitions]) offsets = self._retrieve_offsets(timestamps, timeout_ms) for tp in timestamps: - offsets[tp] = offsets[tp][0] + offsets[tp] = offsets[tp].offset return offsets def _reset_offset(self, partition): @@ -240,7 +237,7 @@ def _reset_offset(self, partition): offsets = self._retrieve_offsets({partition: timestamp}) if partition in offsets: - offset = offsets[partition][0] + offset = offsets[partition].offset # we might lose the assignment while fetching the offset, # so check it is still active @@ -261,8 +258,8 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): available. Otherwise timestamp is treated as epoch milliseconds. Returns: - {TopicPartition: (int, int)}: Mapping of partition to - retrieved offset and timestamp. If offset does not exist for + {TopicPartition: OffsetAndTimestamp}: Mapping of partition to + retrieved offset, timestamp, and leader_epoch. If offset does not exist for the provided timestamp, that partition will be missing from this mapping. """ @@ -373,20 +370,21 @@ def _append(self, drained, part, max_records, update_offsets): log.debug("Not returning fetched records for assigned partition" " %s since it is no longer fetchable", tp) - elif fetch_offset == position: + elif fetch_offset == position.offset: # we are ensured to have at least one record since we already checked for emptiness part_records = part.take(max_records) next_offset = part_records[-1].offset + 1 + leader_epoch = part_records[-1].leader_epoch log.log(0, "Returning fetched records at offset %d for assigned" - " partition %s and update position to %s", position, - tp, next_offset) + " partition %s and update position to %s (leader epoch %s)", position.offset, + tp, next_offset, leader_epoch) for record in part_records: drained[tp].append(record) if update_offsets: - self._subscriptions.assignment[tp].position = next_offset + self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, b'', leader_epoch) return len(part_records) else: @@ -394,7 +392,7 @@ def _append(self, drained, part, max_records, update_offsets): # position, ignore them they must be from an obsolete request log.debug("Ignoring fetched records for %s at offset %s since" " the current position is %d", tp, part.fetch_offset, - position) + position.offset) part.discard() return 0 @@ -444,13 +442,13 @@ def _message_generator(self): break # Compressed messagesets may include earlier messages - elif msg.offset < self._subscriptions.assignment[tp].position: + elif msg.offset < self._subscriptions.assignment[tp].position.offset: log.debug("Skipping message offset: %s (expecting %s)", msg.offset, - self._subscriptions.assignment[tp].position) + self._subscriptions.assignment[tp].position.offset) continue - self._subscriptions.assignment[tp].position = msg.offset + 1 + self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, b'', -1) yield msg self._next_partition_records = None @@ -463,8 +461,9 @@ def _unpack_records(self, tp, records): # Try DefaultsRecordBatch / message log format v2 # base_offset, last_offset_delta, and control batches try: - self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \ - batch.last_offset_delta + batch_offset = batch.base_offset + batch.last_offset_delta + leader_epoch = batch.leader_epoch + self._subscriptions.assignment[tp].last_offset_from_record_batch = batch_offset # Control batches have a single record indicating whether a transaction # was aborted or committed. # When isolation_level is READ_COMMITTED (currently unsupported) @@ -475,6 +474,7 @@ def _unpack_records(self, tp, records): batch = records.next_batch() continue except AttributeError: + leader_epoch = -1 pass for record in batch: @@ -491,7 +491,7 @@ def _unpack_records(self, tp, records): len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in headers) if headers else -1 yield ConsumerRecord( - tp.topic, tp.partition, record.offset, record.timestamp, + tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp, record.timestamp_type, key, value, headers, record.checksum, key_size, value_size, header_size) @@ -577,7 +577,9 @@ def _send_list_offsets_request(self, node_id, timestamps): version = self._client.api_version(ListOffsetsRequest, max_version=3) by_topic = collections.defaultdict(list) for tp, timestamp in six.iteritems(timestamps): - if version >= 1: + if version >= 4: + data = (tp.partition, leader_epoch, timestamp) + elif version >= 1: data = (tp.partition, timestamp) else: data = (tp.partition, timestamp, 1) @@ -628,17 +630,18 @@ def _handle_list_offsets_response(self, future, response): offset = UNKNOWN_OFFSET else: offset = offsets[0] - log.debug("Handling v0 ListOffsetsResponse response for %s. " - "Fetched offset %s", partition, offset) - if offset != UNKNOWN_OFFSET: - timestamp_offset_map[partition] = (offset, None) - else: + timestamp = None + leader_epoch = -1 + elif response.API_VERSION <= 3: timestamp, offset = partition_info[2:] - log.debug("Handling ListOffsetsResponse response for %s. " - "Fetched offset %s, timestamp %s", - partition, offset, timestamp) - if offset != UNKNOWN_OFFSET: - timestamp_offset_map[partition] = (offset, timestamp) + leader_epoch = -1 + else: + timestamp, offset, leader_epoch = partition_info[2:] + log.debug("Handling ListOffsetsResponse response for %s. " + "Fetched offset %s, timestamp %s, leader_epoch %s", + partition, offset, timestamp, leader_epoch) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch) elif error_type is Errors.UnsupportedForMessageFormatError: # The message format on the broker side is before 0.10.0, # we simply put None in the response. @@ -686,7 +689,7 @@ def _create_fetch_requests(self): """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() - version = self._client.api_version(FetchRequest, max_version=8) + version = self._client.api_version(FetchRequest, max_version=10) fetchable = collections.defaultdict(dict) for partition in self._fetchable_partitions(): @@ -695,12 +698,12 @@ def _create_fetch_requests(self): # advance position for any deleted compacted messages if required if self._subscriptions.assignment[partition].last_offset_from_record_batch: next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1 - if next_offset_from_batch_header > self._subscriptions.assignment[partition].position: + if next_offset_from_batch_header > self._subscriptions.assignment[partition].position.offset: log.debug( "Advance position for partition %s from %s to %s (last record batch location plus one)" " to correct for deleted compacted messages and/or transactional control records", - partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header) - self._subscriptions.assignment[partition].position = next_offset_from_batch_header + partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header) + self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, b'', -1) position = self._subscriptions.assignment[partition].position @@ -718,19 +721,28 @@ def _create_fetch_requests(self): if version < 5: partition_info = ( partition.partition, - position, + position.offset, self.config['max_partition_fetch_bytes'] ) + elif version <= 8: + partition_info = ( + partition.partition, + position.offset, + -1, # log_start_offset is used internally by brokers / replicas only + self.config['max_partition_fetch_bytes'], + ) else: partition_info = ( partition.partition, - position, + position.leader_epoch, + position.offset, -1, # log_start_offset is used internally by brokers / replicas only self.config['max_partition_fetch_bytes'], ) + fetchable[node_id][partition] = partition_info log.debug("Adding fetch request for partition %s at offset %d", - partition, position) + partition, position.offset) requests = {} for node_id, next_partitions in six.iteritems(fetchable): @@ -778,7 +790,10 @@ def _create_fetch_requests(self): fetch_offsets = {} for tp, partition_data in six.iteritems(next_partitions): - offset = partition_data[1] + if version <= 8: + offset = partition_data[1] + else: + offset = partition_data[2] fetch_offsets[tp] = offset requests[node_id] = (request, fetch_offsets) @@ -807,7 +822,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): tp = TopicPartition(topic, partition_data[0]) fetch_offset = fetch_offsets[tp] completed_fetch = CompletedFetch( - tp, fetch_offsets[tp], + tp, fetch_offset, response.API_VERSION, partition_data[1:], metric_aggregator @@ -847,18 +862,18 @@ def _parse_fetched_data(self, completed_fetch): # Note that the *response* may return a messageset that starts # earlier (e.g., compressed messages) or later (e.g., compacted topic) position = self._subscriptions.assignment[tp].position - if position is None or position != fetch_offset: + if position is None or position.offset != fetch_offset: log.debug("Discarding fetch response for partition %s" " since its offset %d does not match the" " expected offset %d", tp, fetch_offset, - position) + position.offset) return None records = MemoryRecords(completed_fetch.partition_data[-1]) if records.has_next(): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, - position) + position.offset) unpacked = list(self._unpack_records(tp, records)) parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) if unpacked: @@ -889,10 +904,10 @@ def _parse_fetched_data(self, completed_fetch): self._client.cluster.request_update() elif error_type is Errors.OffsetOutOfRangeError: position = self._subscriptions.assignment[tp].position - if position is None or position != fetch_offset: + if position is None or position.offset != fetch_offset: log.debug("Discarding stale fetch response for partition %s" " since the fetched offset %d does not match the" - " current offset %d", tp, fetch_offset, position) + " current offset %d", tp, fetch_offset, position.offset) elif self._subscriptions.has_default_offset_reset_policy(): log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp) self._subscriptions.need_offset_reset(tp) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f150c4bd6..74ce6ca90 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -17,7 +17,7 @@ from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.metrics import MetricConfig, Metrics from kafka.protocol.list_offsets import OffsetResetStrategy -from kafka.structs import TopicPartition +from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.version import __version__ log = logging.getLogger(__name__) @@ -737,11 +737,11 @@ def position(self, partition): if not isinstance(partition, TopicPartition): raise TypeError('partition must be a TopicPartition namedtuple') assert self._subscription.is_assigned(partition), 'Partition is not assigned' - offset = self._subscription.assignment[partition].position + position = self._subscription.assignment[partition].position if offset is None: self._update_fetch_positions([partition]) - offset = self._subscription.assignment[partition].position - return offset + position = self._subscription.assignment[partition].position + return position.offset def highwater(self, partition): """Last known highwater offset for a partition. @@ -1144,7 +1144,7 @@ def _message_generator_v2(self): log.debug("Not returning fetched records for partition %s" " since it is no longer fetchable", tp) break - self._subscription.assignment[tp].position = record.offset + 1 + self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, b'', -1) yield record def _message_generator(self): diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 7a5486821..7ab864b3d 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -319,7 +319,7 @@ def all_consumed_offsets(self): all_consumed = {} for partition, state in six.iteritems(self.assignment): if state.has_valid_position: - all_consumed[partition] = OffsetAndMetadata(state.position, b'', -1) + all_consumed[partition] = state.position return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): @@ -379,7 +379,7 @@ def __init__(self): self.paused = False # whether this partition has been paused by the user self.awaiting_reset = False # whether we are awaiting reset self.reset_strategy = None # the reset strategy if awaitingReset is set - self._position = None # offset exposed to the user + self._position = None # OffsetAndMetadata exposed to the user self.highwater = None self.drop_pending_record_batch = False # The last message offset hint available from a record batch with @@ -388,6 +388,7 @@ def __init__(self): def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' + assert isinstance(offset, OffsetAndMetadata) self._position = offset def _get_position(self): @@ -403,7 +404,7 @@ def await_reset(self, strategy): self.has_valid_position = False def seek(self, offset): - self._position = offset + self._position = OffsetAndMetadata(offset, b'', -1) self.awaiting_reset = False self.reset_strategy = None self.has_valid_position = True diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index b3a6fd082..14732cb06 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -136,6 +136,10 @@ def __init__(self, buffer): def base_offset(self): return self._header_data[0] + @property + def leader_epoch(self): + return self._header_data[2] + @property def magic(self): return self._header_data[3] diff --git a/kafka/structs.py b/kafka/structs.py index 9e9ad9e3c..f955615a0 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -66,9 +66,10 @@ Keyword Arguments: offset (int): An offset timestamp (int): The timestamp associated to the offset + leader_epoch (int): The last known epoch from the leader / broker """ OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", - ["offset", "timestamp"]) + ["offset", "timestamp", "leader_epoch"]) MemberInformation = namedtuple("MemberInformation", ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"]) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 6789329b4..b1641019c 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -259,8 +259,8 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro }) assert offsets == { - tp0: OffsetAndTimestamp(p0msg.offset, send_time), - tp1: OffsetAndTimestamp(p1msg.offset, send_time) + tp0: OffsetAndTimestamp(p0msg.offset, send_time, -1), + tp1: OffsetAndTimestamp(p1msg.offset, send_time, -1) } offsets = consumer.beginning_offsets([tp0, tp1]) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 7c010491b..309dd5517 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -24,7 +24,7 @@ UnknownTopicOrPartitionError, OffsetOutOfRangeError ) from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords -from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.structs import OffsetAndMetadata, OffsetAndTimestamp, TopicPartition @pytest.fixture @@ -152,10 +152,10 @@ def test__reset_offset(fetcher, mocker): fetcher._subscriptions.need_offset_reset(tp) mocked = mocker.patch.object(fetcher, '_retrieve_offsets') - mocked.return_value = {tp: (1001, None)} + mocked.return_value = {tp: OffsetAndTimestamp(1001, None, -1)} fetcher._reset_offset(tp) assert not fetcher._subscriptions.assignment[tp].awaiting_reset - assert fetcher._subscriptions.assignment[tp].position == 1001 + assert fetcher._subscriptions.assignment[tp].position.offset == 1001 def test__send_list_offsets_requests(fetcher, mocker): @@ -279,7 +279,7 @@ def test__handle_list_offsets_response_v1(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 1): (9999, 1000)} + assert fut.value == {TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)} # Broker returns NotLeaderForPartitionError fut = Future() @@ -322,7 +322,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)} # v3 response is the same format fut = Future() @@ -332,7 +332,29 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)} + + +def test__handle_list_offsets_response_v4_v5(fetcher, mocker): + # includes leader_epoch + fut = Future() + res = ListOffsetsResponse[4]( + 123, # throttle_time_ms + [("topic", [(0, 0, 1000, 9999, 1234)]) + ]) + fetcher._handle_list_offsets_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)} + + # v5 response is the same format + fut = Future() + res = ListOffsetsResponse[5]( + 123, # throttle_time_ms + [("topic", [(0, 0, 1000, 9999, 1234)]) + ]) + fetcher._handle_list_offsets_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)} def test_fetched_records(fetcher, topic, mocker): @@ -546,7 +568,7 @@ def test_partition_records_offset(): batch_end = 130 fetch_offset = 123 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, i, + messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, None, None, 'key', 'value', [], 'checksum', 0, 0, -1) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) @@ -571,7 +593,7 @@ def test_partition_records_no_fetch_offset(): batch_end = 100 fetch_offset = 123 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, i, + messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, None, None, 'key', 'value', None, 'checksum', 0, 0, -1) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) @@ -586,7 +608,7 @@ def test_partition_records_compacted_offset(): batch_end = 100 fetch_offset = 42 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, i, + messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, None, None, 'key', 'value', None, 'checksum', 0, 0, -1) for i in range(batch_start, batch_end) if i != fetch_offset] records = Fetcher.PartitionRecords(fetch_offset, None, messages) From 269b953ce508b2a8300df385d37ba52df65004b4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 3 Mar 2025 22:49:26 -0800 Subject: [PATCH 04/15] todo --- kafka/consumer/fetcher.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1a03b216e..7848573f9 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -253,9 +253,10 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): or ``timeout_ms`` passed. Arguments: - timestamps: {TopicPartition: int} dict with timestamps to fetch - offsets by. -1 for the latest available, -2 for the earliest - available. Otherwise timestamp is treated as epoch milliseconds. + timestamps: {TopicPartition: (int, int)} dict with (timestamp, leader_epoch) + tuples s to fetch offsets by. Timestamp is -1 for the latest available, and + -2 for the earliest available. Otherwise timestamp is treated as epoch milliseconds. + Leader epoch is -1 to ignore, otherwise last known epoch value from partition. Returns: {TopicPartition: OffsetAndTimestamp}: Mapping of partition to @@ -578,7 +579,8 @@ def _send_list_offsets_request(self, node_id, timestamps): by_topic = collections.defaultdict(list) for tp, timestamp in six.iteritems(timestamps): if version >= 4: - data = (tp.partition, leader_epoch, timestamp) + # TODO: leader_epoch + data = (tp.partition, -1, timestamp) elif version >= 1: data = (tp.partition, timestamp) else: From 245c4fec42828917c27c8bcfdb68a043d3c4ded9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 3 Mar 2025 23:04:32 -0800 Subject: [PATCH 05/15] OffsetForLeaderEpoch request/response protocol def --- kafka/protocol/offset_for_leader_epoch.py | 140 ++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 kafka/protocol/offset_for_leader_epoch.py diff --git a/kafka/protocol/offset_for_leader_epoch.py b/kafka/protocol/offset_for_leader_epoch.py new file mode 100644 index 000000000..afe8284eb --- /dev/null +++ b/kafka/protocol/offset_for_leader_epoch.py @@ -0,0 +1,140 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Int64, Schema, String, TaggedFields + + +class OffsetForLeaderEpochResponse_v0(Request): + API_KEY = 23 + API_VERSION = 0 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('end_offset', Int64)))))) + + +class OffsetForLeaderEpochResponse_v1(Request): + API_KEY = 23 + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader_epoch', Int32), + ('end_offset', Int64)))))) + + +class OffsetForLeaderEpochResponse_v2(Request): + API_KEY = 23 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader_epoch', Int32), + ('end_offset', Int64)))))) + + +class OffsetForLeaderEpochResponse_v3(Request): + API_KEY = 23 + API_VERSION = 3 + SCHEMA = OffsetForLeaderEpochResponse_v2.SCHEMA + + +class OffsetForLeaderEpochResponse_v4(Request): + API_KEY = 23 + API_VERSION = 4 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', CompactArray( + ('topic', CompactString('utf-8')), + ('partitions', CompactArray( + ('error_code', Int16), + ('partition', Int32), + ('leader_epoch', Int32), + ('end_offset', Int64), + ('tags', TaggedFields))), + ('tags', TaggedFields))), + ('tags', TaggedFields)) + + +class OffsetForLeaderEpochRequest_v0(Request): + API_KEY = 23 + API_VERSION = 0 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v0 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('leader_epoch', Int32)))))) + + +class OffsetForLeaderEpochRequest_v1(Request): + API_KEY = 23 + API_VERSION = 1 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v1 + SCHEMA = OffsetForLeaderEpochRequest_v0.SCHEMA + + +class OffsetForLeaderEpochRequest_v2(Request): + API_KEY = 23 + API_VERSION = 2 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v2 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('leader_epoch', Int32)))))) + + +class OffsetForLeaderEpochRequest_v3(Request): + API_KEY = 23 + API_VERSION = 3 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v3 + SCHEMA = Schema( + ('replica_id', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('leader_epoch', Int32)))))) + + +class OffsetForLeaderEpochRequest_v4(Request): + API_KEY = 23 + API_VERSION = 4 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v4 + SCHEMA = Schema( + ('replica_id', Int32), + ('topics', CompactArray( + ('topic', CompactString('utf-8')), + ('partitions', CompactArray( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('leader_epoch', Int32), + ('tags', TaggedFields))), + ('tags', TaggedFields))), + ('tags', TaggedFields)) + +OffsetForLeaderEpochRequest = [ + OffsetForLeaderEpochRequest_v0, OffsetForLeaderEpochRequest_v1, + OffsetForLeaderEpochRequest_v2, OffsetForLeaderEpochRequest_v3, + OffsetForLeaderEpochRequest_v4, +] +OffsetForLeaderEpochResponse = [ + OffsetForLeaderEpochResponse_v0, OffsetForLeaderEpochResponse_v1, + OffsetForLeaderEpochResponse_v2, OffsetForLeaderEpochResponse_v3, + OffsetForLeaderEpochResponse_v4, +] From 7473305e205f8c742660ebaeccf504d7889e3aa4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 4 Mar 2025 14:28:29 -0800 Subject: [PATCH 06/15] fixup consumer.position() --- kafka/consumer/group.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 74ce6ca90..a1b9b277d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -732,16 +732,16 @@ def position(self, partition): partition (TopicPartition): Partition to check Returns: - int: Offset + int: Offset or None """ if not isinstance(partition, TopicPartition): raise TypeError('partition must be a TopicPartition namedtuple') assert self._subscription.is_assigned(partition), 'Partition is not assigned' position = self._subscription.assignment[partition].position - if offset is None: + if position is None: self._update_fetch_positions([partition]) position = self._subscription.assignment[partition].position - return position.offset + return position.offset if position else None def highwater(self, partition): """Last known highwater offset for a partition. From 2ec578dc890e4c807bf403bbe1492614910a5d8f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 4 Mar 2025 14:40:44 -0800 Subject: [PATCH 07/15] metadata string --- kafka/consumer/fetcher.py | 6 +++--- kafka/consumer/group.py | 2 +- kafka/consumer/subscription_state.py | 2 +- kafka/structs.py | 4 ++-- test/test_coordinator.py | 32 ++++++++++++++-------------- test/test_fetcher.py | 2 +- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7848573f9..7e45736f0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -385,7 +385,7 @@ def _append(self, drained, part, max_records, update_offsets): drained[tp].append(record) if update_offsets: - self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, b'', leader_epoch) + self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', leader_epoch) return len(part_records) else: @@ -449,7 +449,7 @@ def _message_generator(self): self._subscriptions.assignment[tp].position.offset) continue - self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, b'', -1) + self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, '', -1) yield msg self._next_partition_records = None @@ -705,7 +705,7 @@ def _create_fetch_requests(self): "Advance position for partition %s from %s to %s (last record batch location plus one)" " to correct for deleted compacted messages and/or transactional control records", partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header) - self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, b'', -1) + self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, '', -1) position = self._subscriptions.assignment[partition].position diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1b9b277d..6f23bec8a 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1144,7 +1144,7 @@ def _message_generator_v2(self): log.debug("Not returning fetched records for partition %s" " since it is no longer fetchable", tp) break - self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, b'', -1) + self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1) yield record def _message_generator(self): diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 7ab864b3d..b30922b3e 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -404,7 +404,7 @@ def await_reset(self, strategy): self.has_valid_position = False def seek(self, offset): - self._position = OffsetAndMetadata(offset, b'', -1) + self._position = OffsetAndMetadata(offset, '', -1) self.awaiting_reset = False self.reset_strategy = None self.has_valid_position = True diff --git a/kafka/structs.py b/kafka/structs.py index f955615a0..16ba0daac 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -48,13 +48,13 @@ """The Kafka offset commit API The Kafka offset commit API allows users to provide additional metadata -(in the form of raw bytes) when an offset is committed. This can be useful +(in the form of a string) when an offset is committed. This can be useful (for example) to store information about which node made the commit, what time the commit was made, etc. Keyword Arguments: offset (int): The offset to be committed - metadata (bytes): Non-null metadata + metadata (str): Non-null metadata leader_epoch (int): The last known epoch from the leader / broker """ OffsetAndMetadata = namedtuple("OffsetAndMetadata", diff --git a/test/test_coordinator.py b/test/test_coordinator.py index e8a238f49..09422790e 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -230,13 +230,13 @@ def test_need_rejoin(coordinator): def test_refresh_committed_offsets_if_needed(mocker, coordinator): mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', return_value = { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1)}) + TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1), + TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1)}) coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) assert coordinator._subscription.needs_fetch_committed_offsets is True coordinator.refresh_committed_offsets_if_needed() assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'', -1) + assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, '', -1) assert TopicPartition('foobar', 1) not in assignment assert coordinator._subscription.needs_fetch_committed_offsets is False @@ -303,8 +303,8 @@ def test_close(mocker, coordinator): @pytest.fixture def offsets(): return { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b'', -1), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b'', -1), + TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1), + TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1), } @@ -594,27 +594,27 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): @pytest.mark.parametrize('response,error,dead', [ - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 14), (1, 234, '', 14)])]), Errors.GroupLoadInProgressError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 16), (1, 234, '', 16)])]), Errors.NotCoordinatorForGroupError, True), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 25), (1, 234, '', 25)])]), Errors.UnknownMemberIdError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 22), (1, 234, '', 22)])]), Errors.IllegalGenerationError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 29), (1, 234, '', 29)])]), Errors.TopicAuthorizationFailedError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 0), (1, 234, '', 0)])]), None, False), - (OffsetFetchResponse[1]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[1]([('foobar', [(0, 123, '', 0), (1, 234, '', 0)])]), None, False), - (OffsetFetchResponse[2]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + (OffsetFetchResponse[2]([('foobar', [(0, 123, '', 0), (1, 234, '', 0)])], 0), None, False), - (OffsetFetchResponse[3](0, [('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + (OffsetFetchResponse[3](0, [('foobar', [(0, 123, '', 0), (1, 234, '', 0)])], 0), None, False), - (OffsetFetchResponse[4](0, [('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + (OffsetFetchResponse[4](0, [('foobar', [(0, 123, '', 0), (1, 234, '', 0)])], 0), None, False), - (OffsetFetchResponse[5](0, [('foobar', [(0, 123, -1, b'', 0), (1, 234, -1, b'', 0)])], 0), + (OffsetFetchResponse[5](0, [('foobar', [(0, 123, -1, '', 0), (1, 234, -1, '', 0)])], 0), None, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 309dd5517..af15fad57 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -138,7 +138,7 @@ def test_update_fetch_positions(fetcher, topic, mocker): fetcher._reset_offset.reset_mock() fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'', -1) + fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, '', -1) mocker.patch.object(fetcher._subscriptions, 'seek') fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 From 3e4d2955e73e65a6be94a3392365f0cfdd62de05 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 07:27:47 -0800 Subject: [PATCH 08/15] use leader_epoch from metadata for list_offsets --- kafka/cluster.py | 3 +++ kafka/consumer/fetcher.py | 10 +++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/kafka/cluster.py b/kafka/cluster.py index b97547c3e..c28d36d20 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -141,6 +141,9 @@ def leader_for_partition(self, partition): return None return self._partitions[partition.topic][partition.partition].leader + def leader_epoch_for_partition(self, partition): + return self._partitions[partition.topic][partition.partition].leader_epoch + def partitions_for_broker(self, broker_id): """Return TopicPartitions for which the broker is a leader. diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7e45736f0..64c3312eb 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -549,7 +549,8 @@ def _send_list_offsets_requests(self, timestamps): return Future().failure( Errors.LeaderNotAvailableError(partition)) else: - timestamps_by_node[node_id][partition] = timestamp + leader_epoch = self._client.cluster.leader_epoch_for_partition(partition) + timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) # Aggregate results until we have all list_offsets_future = Future() @@ -574,13 +575,12 @@ def on_fail(err): _f.add_errback(on_fail) return list_offsets_future - def _send_list_offsets_request(self, node_id, timestamps): + def _send_list_offsets_request(self, node_id, timestamps_and_epochs): version = self._client.api_version(ListOffsetsRequest, max_version=3) by_topic = collections.defaultdict(list) - for tp, timestamp in six.iteritems(timestamps): + for tp, (timestamp, leader_epoch) in six.iteritems(timestamps_and_epochs): if version >= 4: - # TODO: leader_epoch - data = (tp.partition, -1, timestamp) + data = (tp.partition, leader_epoch, timestamp) elif version >= 1: data = (tp.partition, timestamp) else: From 6a637e0a7da6bc1e9aa453c6c79fa6254edbd9d8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 08:04:02 -0800 Subject: [PATCH 09/15] fix ListOffsetsRequest_v4 --- kafka/protocol/list_offsets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/list_offsets.py b/kafka/protocol/list_offsets.py index 9c5ad5edf..2e36dd660 100644 --- a/kafka/protocol/list_offsets.py +++ b/kafka/protocol/list_offsets.py @@ -166,7 +166,7 @@ class ListOffsetsRequest_v4(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('current_leader_epoch', Int64), + ('current_leader_epoch', Int32), ('timestamp', Int64))))) ) DEFAULTS = { From 5d63ad90931d17283473f6a6bfe09359f55f068a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 08:04:20 -0800 Subject: [PATCH 10/15] Bump to ListOffsets v4 in fetcher --- kafka/consumer/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 64c3312eb..976f589fe 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -576,7 +576,7 @@ def on_fail(err): return list_offsets_future def _send_list_offsets_request(self, node_id, timestamps_and_epochs): - version = self._client.api_version(ListOffsetsRequest, max_version=3) + version = self._client.api_version(ListOffsetsRequest, max_version=4) by_topic = collections.defaultdict(list) for tp, (timestamp, leader_epoch) in six.iteritems(timestamps_and_epochs): if version >= 4: From 1d2786fe8c10e82cf55aeaeedad2d4d02613e3ab Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 08:28:47 -0800 Subject: [PATCH 11/15] test_fetcher: mock leader_epoch_for_partitions; add leader_epoch to expected timestamps --- test/test_fetcher.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index af15fad57..aec19a68d 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -108,6 +108,7 @@ def build_fetch_offsets(request): def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): fetcher._client._api_versions = BROKER_API_VERSIONS[api_version] mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) + mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) by_node = fetcher._create_fetch_requests() requests_and_offsets = by_node.values() assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version]) @@ -175,6 +176,7 @@ def send_side_effect(*args, **kw): # always as available mocked_leader.side_effect = itertools.chain( [None, -1], itertools.cycle([0])) + mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) # Leader == None fut = fetcher._send_list_offsets_requests({tp: 0}) @@ -224,6 +226,7 @@ def send_side_effect(node_id, timestamps): mocked_leader = mocker.patch.object( fetcher._client.cluster, "leader_for_partition") mocked_leader.side_effect = itertools.cycle([0, 1]) + mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) # -- All node succeeded case tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)]) @@ -241,8 +244,8 @@ def send_side_effect(node_id, timestamps): else: second_future = f assert req_by_node == { - 0: {tp1: 0, tp3: 0}, - 1: {tp2: 0, tp4: 0} + 0: {tp1: (0, 0), tp3: (0, 0)}, + 1: {tp2: (0, 0), tp4: (0, 0)} } # We only resolved 1 future so far, so result future is not yet ready From af414d35f1075b16f5b48c1ccd518e9a6711b1f0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 08:50:12 -0800 Subject: [PATCH 12/15] catch more error types in fetcher --- kafka/consumer/fetcher.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 976f589fe..2011adf10 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -649,22 +649,23 @@ def _handle_list_offsets_response(self, future, response): # we simply put None in the response. log.debug("Cannot search by timestamp for partition %s because the" " message format version is before 0.10.0", partition) - elif error_type is Errors.NotLeaderForPartitionError: + elif error_type in (Errors.NotLeaderForPartitionError, + Errors.ReplicaNotAvailableError, + Errors.KafkaStorageError): log.debug("Attempt to fetch offsets for partition %s failed due" - " to obsolete leadership information, retrying.", - partition) + " to %s, retrying.", error_type.__name__, partition) future.failure(error_type(partition)) return elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("Received unknown topic or partition error in ListOffsets " - "request for partition %s. The topic/partition " + - "may not exist or the user may not have Describe access " - "to it.", partition) + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) future.failure(error_type(partition)) return else: log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type) + " %s", partition, error_type.__name__) future.failure(error_type(partition)) return if not future.is_done: @@ -900,6 +901,7 @@ def _parse_fetched_data(self, completed_fetch): self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count) elif error_type in (Errors.NotLeaderForPartitionError, + Errors.ReplicaNotAvailableError, Errors.UnknownTopicOrPartitionError, Errors.KafkaStorageError): log.debug("Error fetching partition %s: %s", tp, error_type.__name__) From f28ed0efafae5bcbfbfc16e7de707732c0b5745a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 09:02:12 -0800 Subject: [PATCH 13/15] Use ANY matcher for leader_epoch in consumer integrations test --- test/test_consumer_integration.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index b1641019c..5aeb63d1d 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,7 +1,7 @@ import logging import time -from mock import patch +from mock import patch, ANY import pytest from kafka.vendor.six.moves import range @@ -258,9 +258,10 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro tp1: send_time }) + leader_epoch = ANY if env_kafka_version() >= (2, 1) else -1 assert offsets == { - tp0: OffsetAndTimestamp(p0msg.offset, send_time, -1), - tp1: OffsetAndTimestamp(p1msg.offset, send_time, -1) + tp0: OffsetAndTimestamp(p0msg.offset, send_time, leader_epoch), + tp1: OffsetAndTimestamp(p1msg.offset, send_time, leader_epoch) } offsets = consumer.beginning_offsets([tp0, tp1]) From fb61535ecce3edae7e2d8c71e17c8310c87ee354 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 09:48:10 -0800 Subject: [PATCH 14/15] UnknownLeaderEpochError => invalid metadata --- kafka/errors.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/errors.py b/kafka/errors.py index b8fa06708..aaba89d39 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -664,6 +664,7 @@ class UnknownLeaderEpochError(BrokerResponseError): message = 'UNKNOWN_LEADER_EPOCH' description = 'The leader epoch in the request is newer than the epoch on the broker.' retriable = True + invalid_metadata = True class UnsupportedCompressionTypeError(BrokerResponseError): From 9e04d8e66f9580fcafc527dd59702f5ad0d1c0a2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 5 Mar 2025 11:57:37 -0800 Subject: [PATCH 15/15] discard leader_epoch data for now --- kafka/consumer/fetcher.py | 12 ++++++------ kafka/coordinator/consumer.py | 3 ++- test/test_fetcher.py | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2011adf10..eefac5ba7 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -253,10 +253,9 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): or ``timeout_ms`` passed. Arguments: - timestamps: {TopicPartition: (int, int)} dict with (timestamp, leader_epoch) - tuples s to fetch offsets by. Timestamp is -1 for the latest available, and - -2 for the earliest available. Otherwise timestamp is treated as epoch milliseconds. - Leader epoch is -1 to ignore, otherwise last known epoch value from partition. + timestamps: {TopicPartition: int} dict with timestamps to fetch + offsets by. -1 for the latest available, -2 for the earliest + available. Otherwise timestamp is treated as epoch milliseconds. Returns: {TopicPartition: OffsetAndTimestamp}: Mapping of partition to @@ -385,7 +384,8 @@ def _append(self, drained, part, max_records, update_offsets): drained[tp].append(record) if update_offsets: - self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', leader_epoch) + # TODO: save leader_epoch + self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1) return len(part_records) else: @@ -549,7 +549,7 @@ def _send_list_offsets_requests(self, timestamps): return Future().failure( Errors.LeaderNotAvailableError(partition)) else: - leader_epoch = self._client.cluster.leader_epoch_for_partition(partition) + leader_epoch = -1 timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) # Aggregate results until we have all diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index cf508c606..36c91ee42 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -835,7 +835,8 @@ def _handle_offset_fetch_response(self, future, response): elif offset >= 0: # record the position with the offset # (-1 indicates no committed offset to fetch) - offsets[tp] = OffsetAndMetadata(offset, metadata, leader_epoch) + # TODO: save leader_epoch + offsets[tp] = OffsetAndMetadata(offset, metadata, -1) else: log.debug("Group %s has no committed offset for partition" " %s", self.group_id, tp) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index aec19a68d..7e948e3cb 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -244,8 +244,8 @@ def send_side_effect(node_id, timestamps): else: second_future = f assert req_by_node == { - 0: {tp1: (0, 0), tp3: (0, 0)}, - 1: {tp2: (0, 0), tp4: (0, 0)} + 0: {tp1: (0, -1), tp3: (0, -1)}, + 1: {tp2: (0, -1), tp4: (0, -1)} } # We only resolved 1 future so far, so result future is not yet ready