diff --git a/kafka/admin/client.py b/kafka/admin/client.py index a46cf9c58..29ee6cd9a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1353,7 +1353,7 @@ def _list_consumer_group_offsets_send_request(self, group_id, Returns: A message future """ - version = self._client.api_version(OffsetFetchRequest, max_version=3) + version = self._client.api_version(OffsetFetchRequest, max_version=5) if version <= 3: if partitions is None: if version <= 1: @@ -1386,7 +1386,7 @@ def _list_consumer_group_offsets_process_response(self, response): A dictionary composed of TopicPartition keys and OffsetAndMetadata values. """ - if response.API_VERSION <= 3: + if response.API_VERSION <= 5: # OffsetFetchResponse_v1 lacks a top-level error_code if response.API_VERSION > 1: @@ -1401,13 +1401,18 @@ def _list_consumer_group_offsets_process_response(self, response): # OffsetAndMetadata values--this is what the Java AdminClient returns offsets = {} for topic, partitions in response.topics: - for partition, offset, metadata, error_code in partitions: + for partition_data in partitions: + if response.API_VERSION <= 4: + partition, offset, metadata, error_code = partition_data + leader_epoch = -1 + else: + partition, offset, leader_epoch, metadata, error_code = partition_data error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: raise error_type( "Unable to fetch consumer group offsets for topic {}, partition {}" .format(topic, partition)) - offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata) + offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata, leader_epoch) else: raise NotImplementedError( "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient." @@ -1439,7 +1444,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, Returns: dictionary: A dictionary with TopicPartition keys and - OffsetAndMetada values. Partitions that are not specified and for + OffsetAndMetadata values. Partitions that are not specified and for which the group_id does not have a recorded offset are omitted. An offset value of `-1` indicates the group_id has no offset for that TopicPartition. A `-1` can only happen for partitions that are diff --git a/kafka/cluster.py b/kafka/cluster.py index b97547c3e..c28d36d20 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -141,6 +141,9 @@ def leader_for_partition(self, partition): return None return self._partitions[partition.topic][partition.partition].leader + def leader_epoch_for_partition(self, partition): + return self._partitions[partition.topic][partition.partition].leader_epoch + def partitions_for_broker(self, broker_id): """Return TopicPartitions for which the broker is a leader. 
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 98f5dbcfa..eefac5ba7 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -18,7 +18,7 @@ ) from kafka.record import MemoryRecords from kafka.serializer import Deserializer -from kafka.structs import TopicPartition, OffsetAndTimestamp +from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp log = logging.getLogger(__name__) @@ -28,7 +28,7 @@ READ_COMMITTED = 1 ConsumerRecord = collections.namedtuple("ConsumerRecord", - ["topic", "partition", "offset", "timestamp", "timestamp_type", + ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type", "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"]) @@ -198,9 +198,6 @@ def get_offsets_by_times(self, timestamps, timeout_ms): for tp in timestamps: if tp not in offsets: offsets[tp] = None - else: - offset, timestamp = offsets[tp] - offsets[tp] = OffsetAndTimestamp(offset, timestamp) return offsets def beginning_offsets(self, partitions, timeout_ms): @@ -215,7 +212,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): timestamps = dict([(tp, timestamp) for tp in partitions]) offsets = self._retrieve_offsets(timestamps, timeout_ms) for tp in timestamps: - offsets[tp] = offsets[tp][0] + offsets[tp] = offsets[tp].offset return offsets def _reset_offset(self, partition): @@ -240,7 +237,7 @@ def _reset_offset(self, partition): offsets = self._retrieve_offsets({partition: timestamp}) if partition in offsets: - offset = offsets[partition][0] + offset = offsets[partition].offset # we might lose the assignment while fetching the offset, # so check it is still active @@ -261,8 +258,8 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): available. Otherwise timestamp is treated as epoch milliseconds. Returns: - {TopicPartition: (int, int)}: Mapping of partition to - retrieved offset and timestamp. If offset does not exist for + {TopicPartition: OffsetAndTimestamp}: Mapping of partition to + retrieved offset, timestamp, and leader_epoch. If offset does not exist for the provided timestamp, that partition will be missing from this mapping. 
""" @@ -373,20 +370,22 @@ def _append(self, drained, part, max_records, update_offsets): log.debug("Not returning fetched records for assigned partition" " %s since it is no longer fetchable", tp) - elif fetch_offset == position: + elif fetch_offset == position.offset: # we are ensured to have at least one record since we already checked for emptiness part_records = part.take(max_records) next_offset = part_records[-1].offset + 1 + leader_epoch = part_records[-1].leader_epoch log.log(0, "Returning fetched records at offset %d for assigned" - " partition %s and update position to %s", position, - tp, next_offset) + " partition %s and update position to %s (leader epoch %s)", position.offset, + tp, next_offset, leader_epoch) for record in part_records: drained[tp].append(record) if update_offsets: - self._subscriptions.assignment[tp].position = next_offset + # TODO: save leader_epoch + self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1) return len(part_records) else: @@ -394,7 +393,7 @@ def _append(self, drained, part, max_records, update_offsets): # position, ignore them they must be from an obsolete request log.debug("Ignoring fetched records for %s at offset %s since" " the current position is %d", tp, part.fetch_offset, - position) + position.offset) part.discard() return 0 @@ -444,13 +443,13 @@ def _message_generator(self): break # Compressed messagesets may include earlier messages - elif msg.offset < self._subscriptions.assignment[tp].position: + elif msg.offset < self._subscriptions.assignment[tp].position.offset: log.debug("Skipping message offset: %s (expecting %s)", msg.offset, - self._subscriptions.assignment[tp].position) + self._subscriptions.assignment[tp].position.offset) continue - self._subscriptions.assignment[tp].position = msg.offset + 1 + self._subscriptions.assignment[tp].position = OffsetAndMetadata(msg.offset + 1, '', -1) yield msg self._next_partition_records = None @@ -463,8 +462,9 @@ def _unpack_records(self, tp, records): # Try DefaultsRecordBatch / message log format v2 # base_offset, last_offset_delta, and control batches try: - self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \ - batch.last_offset_delta + batch_offset = batch.base_offset + batch.last_offset_delta + leader_epoch = batch.leader_epoch + self._subscriptions.assignment[tp].last_offset_from_record_batch = batch_offset # Control batches have a single record indicating whether a transaction # was aborted or committed. 
# When isolation_level is READ_COMMITTED (currently unsupported) @@ -475,6 +475,7 @@ def _unpack_records(self, tp, records): batch = records.next_batch() continue except AttributeError: + leader_epoch = -1 pass for record in batch: @@ -491,7 +492,7 @@ def _unpack_records(self, tp, records): len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in headers) if headers else -1 yield ConsumerRecord( - tp.topic, tp.partition, record.offset, record.timestamp, + tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp, record.timestamp_type, key, value, headers, record.checksum, key_size, value_size, header_size) @@ -548,7 +549,8 @@ def _send_list_offsets_requests(self, timestamps): return Future().failure( Errors.LeaderNotAvailableError(partition)) else: - timestamps_by_node[node_id][partition] = timestamp + leader_epoch = -1 + timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) # Aggregate results until we have all list_offsets_future = Future() @@ -573,11 +575,13 @@ def on_fail(err): _f.add_errback(on_fail) return list_offsets_future - def _send_list_offsets_request(self, node_id, timestamps): - version = self._client.api_version(ListOffsetsRequest, max_version=3) + def _send_list_offsets_request(self, node_id, timestamps_and_epochs): + version = self._client.api_version(ListOffsetsRequest, max_version=4) by_topic = collections.defaultdict(list) - for tp, timestamp in six.iteritems(timestamps): - if version >= 1: + for tp, (timestamp, leader_epoch) in six.iteritems(timestamps_and_epochs): + if version >= 4: + data = (tp.partition, leader_epoch, timestamp) + elif version >= 1: data = (tp.partition, timestamp) else: data = (tp.partition, timestamp, 1) @@ -628,38 +632,40 @@ def _handle_list_offsets_response(self, future, response): offset = UNKNOWN_OFFSET else: offset = offsets[0] - log.debug("Handling v0 ListOffsetsResponse response for %s. " - "Fetched offset %s", partition, offset) - if offset != UNKNOWN_OFFSET: - timestamp_offset_map[partition] = (offset, None) - else: + timestamp = None + leader_epoch = -1 + elif response.API_VERSION <= 3: timestamp, offset = partition_info[2:] - log.debug("Handling ListOffsetsResponse response for %s. " - "Fetched offset %s, timestamp %s", - partition, offset, timestamp) - if offset != UNKNOWN_OFFSET: - timestamp_offset_map[partition] = (offset, timestamp) + leader_epoch = -1 + else: + timestamp, offset, leader_epoch = partition_info[2:] + log.debug("Handling ListOffsetsResponse response for %s. " + "Fetched offset %s, timestamp %s, leader_epoch %s", + partition, offset, timestamp, leader_epoch) + if offset != UNKNOWN_OFFSET: + timestamp_offset_map[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch) elif error_type is Errors.UnsupportedForMessageFormatError: # The message format on the broker side is before 0.10.0, # we simply put None in the response. 
log.debug("Cannot search by timestamp for partition %s because the" " message format version is before 0.10.0", partition) - elif error_type is Errors.NotLeaderForPartitionError: + elif error_type in (Errors.NotLeaderForPartitionError, + Errors.ReplicaNotAvailableError, + Errors.KafkaStorageError): log.debug("Attempt to fetch offsets for partition %s failed due" - " to obsolete leadership information, retrying.", - partition) + " to %s, retrying.", error_type.__name__, partition) future.failure(error_type(partition)) return elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("Received unknown topic or partition error in ListOffsets " - "request for partition %s. The topic/partition " + - "may not exist or the user may not have Describe access " - "to it.", partition) + "request for partition %s. The topic/partition " + + "may not exist or the user may not have Describe access " + "to it.", partition) future.failure(error_type(partition)) return else: log.warning("Attempt to fetch offsets for partition %s failed due to:" - " %s", partition, error_type) + " %s", partition, error_type.__name__) future.failure(error_type(partition)) return if not future.is_done: @@ -686,7 +692,7 @@ def _create_fetch_requests(self): """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() - version = self._client.api_version(FetchRequest, max_version=8) + version = self._client.api_version(FetchRequest, max_version=10) fetchable = collections.defaultdict(dict) for partition in self._fetchable_partitions(): @@ -695,12 +701,12 @@ def _create_fetch_requests(self): # advance position for any deleted compacted messages if required if self._subscriptions.assignment[partition].last_offset_from_record_batch: next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1 - if next_offset_from_batch_header > self._subscriptions.assignment[partition].position: + if next_offset_from_batch_header > self._subscriptions.assignment[partition].position.offset: log.debug( "Advance position for partition %s from %s to %s (last record batch location plus one)" " to correct for deleted compacted messages and/or transactional control records", - partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header) - self._subscriptions.assignment[partition].position = next_offset_from_batch_header + partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header) + self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, '', -1) position = self._subscriptions.assignment[partition].position @@ -718,19 +724,28 @@ def _create_fetch_requests(self): if version < 5: partition_info = ( partition.partition, - position, + position.offset, self.config['max_partition_fetch_bytes'] ) + elif version <= 8: + partition_info = ( + partition.partition, + position.offset, + -1, # log_start_offset is used internally by brokers / replicas only + self.config['max_partition_fetch_bytes'], + ) else: partition_info = ( partition.partition, - position, + position.leader_epoch, + position.offset, -1, # log_start_offset is used internally by brokers / replicas only self.config['max_partition_fetch_bytes'], ) + fetchable[node_id][partition] = partition_info log.debug("Adding fetch request for partition %s at offset %d", - partition, position) + partition, position.offset) requests = {} for node_id, next_partitions in 
six.iteritems(fetchable): @@ -778,7 +793,10 @@ def _create_fetch_requests(self): fetch_offsets = {} for tp, partition_data in six.iteritems(next_partitions): - offset = partition_data[1] + if version <= 8: + offset = partition_data[1] + else: + offset = partition_data[2] fetch_offsets[tp] = offset requests[node_id] = (request, fetch_offsets) @@ -807,7 +825,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): tp = TopicPartition(topic, partition_data[0]) fetch_offset = fetch_offsets[tp] completed_fetch = CompletedFetch( - tp, fetch_offsets[tp], + tp, fetch_offset, response.API_VERSION, partition_data[1:], metric_aggregator @@ -847,18 +865,18 @@ def _parse_fetched_data(self, completed_fetch): # Note that the *response* may return a messageset that starts # earlier (e.g., compressed messages) or later (e.g., compacted topic) position = self._subscriptions.assignment[tp].position - if position is None or position != fetch_offset: + if position is None or position.offset != fetch_offset: log.debug("Discarding fetch response for partition %s" " since its offset %d does not match the" " expected offset %d", tp, fetch_offset, - position) + position.offset) return None records = MemoryRecords(completed_fetch.partition_data[-1]) if records.has_next(): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, - position) + position.offset) unpacked = list(self._unpack_records(tp, records)) parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) if unpacked: @@ -883,16 +901,17 @@ def _parse_fetched_data(self, completed_fetch): self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count) elif error_type in (Errors.NotLeaderForPartitionError, + Errors.ReplicaNotAvailableError, Errors.UnknownTopicOrPartitionError, Errors.KafkaStorageError): log.debug("Error fetching partition %s: %s", tp, error_type.__name__) self._client.cluster.request_update() elif error_type is Errors.OffsetOutOfRangeError: position = self._subscriptions.assignment[tp].position - if position is None or position != fetch_offset: + if position is None or position.offset != fetch_offset: log.debug("Discarding stale fetch response for partition %s" " since the fetched offset %d does not match the" - " current offset %d", tp, fetch_offset, position) + " current offset %d", tp, fetch_offset, position.offset) elif self._subscriptions.has_default_offset_reset_policy(): log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp) self._subscriptions.need_offset_reset(tp) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f150c4bd6..6f23bec8a 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -17,7 +17,7 @@ from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.metrics import MetricConfig, Metrics from kafka.protocol.list_offsets import OffsetResetStrategy -from kafka.structs import TopicPartition +from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.version import __version__ log = logging.getLogger(__name__) @@ -732,16 +732,16 @@ def position(self, partition): partition (TopicPartition): Partition to check Returns: - int: Offset + int: Offset or None """ if not isinstance(partition, TopicPartition): raise TypeError('partition must be a TopicPartition namedtuple') assert self._subscription.is_assigned(partition), 'Partition is not assigned' - offset = self._subscription.assignment[partition].position - if offset is None: + position = 
self._subscription.assignment[partition].position + if position is None: self._update_fetch_positions([partition]) - offset = self._subscription.assignment[partition].position - return offset + position = self._subscription.assignment[partition].position + return position.offset if position else None def highwater(self, partition): """Last known highwater offset for a partition. @@ -1144,7 +1144,7 @@ def _message_generator_v2(self): log.debug("Not returning fetched records for partition %s" " since it is no longer fetchable", tp) break - self._subscription.assignment[tp].position = record.offset + 1 + self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1) yield record def _message_generator(self): diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index a329ad3e9..b30922b3e 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -319,7 +319,7 @@ def all_consumed_offsets(self): all_consumed = {} for partition, state in six.iteritems(self.assignment): if state.has_valid_position: - all_consumed[partition] = OffsetAndMetadata(state.position, '') + all_consumed[partition] = state.position return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): @@ -379,7 +379,7 @@ def __init__(self): self.paused = False # whether this partition has been paused by the user self.awaiting_reset = False # whether we are awaiting reset self.reset_strategy = None # the reset strategy if awaitingReset is set - self._position = None # offset exposed to the user + self._position = None # OffsetAndMetadata exposed to the user self.highwater = None self.drop_pending_record_batch = False # The last message offset hint available from a record batch with @@ -388,6 +388,7 @@ def __init__(self): def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' + assert isinstance(offset, OffsetAndMetadata) self._position = offset def _get_position(self): @@ -403,7 +404,7 @@ def await_reset(self, strategy): self.has_valid_position = False def seek(self, offset): - self._position = offset + self._position = OffsetAndMetadata(offset, '', -1) self.awaiting_reset = False self.reset_strategy = None self.has_valid_position = True diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3734e8817..36c91ee42 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -649,7 +649,7 @@ def _send_offset_commit_request(self, offsets): topic, [( partition, offset.offset, - -1, # leader_epoch + offset.leader_epoch, offset.metadata ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] @@ -809,7 +809,6 @@ def _handle_offset_fetch_response(self, future, response): else: metadata, error_code = partition_data[2:] leader_epoch = -1 - # TODO: save leader_epoch! 
tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: @@ -836,7 +835,8 @@ def _handle_offset_fetch_response(self, future, response): elif offset >= 0: # record the position with the offset # (-1 indicates no committed offset to fetch) - offsets[tp] = OffsetAndMetadata(offset, metadata) + # TODO: save leader_epoch + offsets[tp] = OffsetAndMetadata(offset, metadata, -1) else: log.debug("Group %s has no committed offset for partition" " %s", self.group_id, tp) diff --git a/kafka/errors.py b/kafka/errors.py index b8fa06708..aaba89d39 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -664,6 +664,7 @@ class UnknownLeaderEpochError(BrokerResponseError): message = 'UNKNOWN_LEADER_EPOCH' description = 'The leader epoch in the request is newer than the epoch on the broker.' retriable = True + invalid_metadata = True class UnsupportedCompressionTypeError(BrokerResponseError): diff --git a/kafka/protocol/list_offsets.py b/kafka/protocol/list_offsets.py index 9c5ad5edf..2e36dd660 100644 --- a/kafka/protocol/list_offsets.py +++ b/kafka/protocol/list_offsets.py @@ -166,7 +166,7 @@ class ListOffsetsRequest_v4(Request): ('topic', String('utf-8')), ('partitions', Array( ('partition', Int32), - ('current_leader_epoch', Int64), + ('current_leader_epoch', Int32), ('timestamp', Int64))))) ) DEFAULTS = { diff --git a/kafka/protocol/offset_for_leader_epoch.py b/kafka/protocol/offset_for_leader_epoch.py new file mode 100644 index 000000000..afe8284eb --- /dev/null +++ b/kafka/protocol/offset_for_leader_epoch.py @@ -0,0 +1,140 @@ +from __future__ import absolute_import + +from kafka.protocol.api import Request, Response +from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Int64, Schema, String, TaggedFields + + +class OffsetForLeaderEpochResponse_v0(Response): + API_KEY = 23 + API_VERSION = 0 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('end_offset', Int64)))))) + + +class OffsetForLeaderEpochResponse_v1(Response): + API_KEY = 23 + API_VERSION = 1 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader_epoch', Int32), + ('end_offset', Int64)))))) + + +class OffsetForLeaderEpochResponse_v2(Response): + API_KEY = 23 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader_epoch', Int32), + ('end_offset', Int64)))))) + + +class OffsetForLeaderEpochResponse_v3(Response): + API_KEY = 23 + API_VERSION = 3 + SCHEMA = OffsetForLeaderEpochResponse_v2.SCHEMA + + +class OffsetForLeaderEpochResponse_v4(Response): + API_KEY = 23 + API_VERSION = 4 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', CompactArray( + ('topic', CompactString('utf-8')), + ('partitions', CompactArray( + ('error_code', Int16), + ('partition', Int32), + ('leader_epoch', Int32), + ('end_offset', Int64), + ('tags', TaggedFields))), + ('tags', TaggedFields))), + ('tags', TaggedFields)) + + +class OffsetForLeaderEpochRequest_v0(Request): + API_KEY = 23 + API_VERSION = 0 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v0 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('leader_epoch', Int32)))))) + + +class OffsetForLeaderEpochRequest_v1(Request): +
API_KEY = 23 + API_VERSION = 1 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v1 + SCHEMA = OffsetForLeaderEpochRequest_v0.SCHEMA + + +class OffsetForLeaderEpochRequest_v2(Request): + API_KEY = 23 + API_VERSION = 2 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v2 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('leader_epoch', Int32)))))) + + +class OffsetForLeaderEpochRequest_v3(Request): + API_KEY = 23 + API_VERSION = 3 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v3 + SCHEMA = Schema( + ('replica_id', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('leader_epoch', Int32)))))) + + +class OffsetForLeaderEpochRequest_v4(Request): + API_KEY = 23 + API_VERSION = 4 + RESPONSE_TYPE = OffsetForLeaderEpochResponse_v4 + SCHEMA = Schema( + ('replica_id', Int32), + ('topics', CompactArray( + ('topic', CompactString('utf-8')), + ('partitions', CompactArray( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('leader_epoch', Int32), + ('tags', TaggedFields))), + ('tags', TaggedFields))), + ('tags', TaggedFields)) + +OffsetForLeaderEpochRequest = [ + OffsetForLeaderEpochRequest_v0, OffsetForLeaderEpochRequest_v1, + OffsetForLeaderEpochRequest_v2, OffsetForLeaderEpochRequest_v3, + OffsetForLeaderEpochRequest_v4, +] +OffsetForLeaderEpochResponse = [ + OffsetForLeaderEpochResponse_v0, OffsetForLeaderEpochResponse_v1, + OffsetForLeaderEpochResponse_v2, OffsetForLeaderEpochResponse_v3, + OffsetForLeaderEpochResponse_v4, +] diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index b3a6fd082..14732cb06 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -136,6 +136,10 @@ def __init__(self, buffer): def base_offset(self): return self._header_data[0] + @property + def leader_epoch(self): + return self._header_data[2] + @property def magic(self): return self._header_data[3] diff --git a/kafka/structs.py b/kafka/structs.py index dc4f07bee..16ba0daac 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -55,10 +55,10 @@ Keyword Arguments: offset (int): The offset to be committed metadata (str): Non-null metadata + leader_epoch (int): The last known epoch from the leader / broker """ OffsetAndMetadata = namedtuple("OffsetAndMetadata", - # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata) - ["offset", "metadata"]) + ["offset", "metadata", "leader_epoch"]) """An offset and timestamp tuple @@ -66,9 +66,10 @@ Keyword Arguments: offset (int): An offset timestamp (int): The timestamp associated to the offset + leader_epoch (int): The last known epoch from the leader / broker """ OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", - ["offset", "timestamp"]) + ["offset", "timestamp", "leader_epoch"]) MemberInformation = namedtuple("MemberInformation", ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"]) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 6789329b4..5aeb63d1d 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,7 +1,7 @@ import logging import time -from mock import patch +from mock import patch, ANY import pytest from kafka.vendor.six.moves import range @@ -258,9 +258,10 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro tp1: send_time }) + leader_epoch = ANY if 
env_kafka_version() >= (2, 1) else -1 assert offsets == { - tp0: OffsetAndTimestamp(p0msg.offset, send_time), - tp1: OffsetAndTimestamp(p1msg.offset, send_time) + tp0: OffsetAndTimestamp(p0msg.offset, send_time, leader_epoch), + tp1: OffsetAndTimestamp(p1msg.offset, send_time, leader_epoch) } offsets = consumer.beginning_offsets([tp0, tp1]) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index c0e7c6d60..09422790e 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -230,13 +230,13 @@ def test_need_rejoin(coordinator): def test_refresh_committed_offsets_if_needed(mocker, coordinator): mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', return_value = { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')}) + TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1), + TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1)}) coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) assert coordinator._subscription.needs_fetch_committed_offsets is True coordinator.refresh_committed_offsets_if_needed() assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, b'') + assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, '', -1) assert TopicPartition('foobar', 1) not in assignment assert coordinator._subscription.needs_fetch_committed_offsets is False @@ -303,8 +303,8 @@ def test_close(mocker, coordinator): @pytest.fixture def offsets(): return { - TopicPartition('foobar', 0): OffsetAndMetadata(123, b''), - TopicPartition('foobar', 1): OffsetAndMetadata(234, b''), + TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1), + TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1), } @@ -594,27 +594,27 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): @pytest.mark.parametrize('response,error,dead', [ - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 14), (1, 234, '', 14)])]), Errors.GroupLoadInProgressError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 16), (1, 234, '', 16)])]), Errors.NotCoordinatorForGroupError, True), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 25), (1, 234, '', 25)])]), Errors.UnknownMemberIdError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 22), (1, 234, '', 22)])]), Errors.IllegalGenerationError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 29), (1, 234, '', 29)])]), Errors.TopicAuthorizationFailedError, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 0), (1, 234, '', 0)])]), None, False), - (OffsetFetchResponse[1]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + (OffsetFetchResponse[1]([('foobar', [(0, 123, '', 0), (1, 234, '', 0)])]), None, False), - (OffsetFetchResponse[2]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + (OffsetFetchResponse[2]([('foobar', [(0, 123, '', 0), (1, 234, '', 0)])], 0), None, False), - (OffsetFetchResponse[3](0, 
[('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + (OffsetFetchResponse[3](0, [('foobar', [(0, 123, '', 0), (1, 234, '', 0)])], 0), None, False), - (OffsetFetchResponse[4](0, [('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + (OffsetFetchResponse[4](0, [('foobar', [(0, 123, '', 0), (1, 234, '', 0)])], 0), None, False), - (OffsetFetchResponse[5](0, [('foobar', [(0, 123, -1, b'', 0), (1, 234, -1, b'', 0)])], 0), + (OffsetFetchResponse[5](0, [('foobar', [(0, 123, -1, '', 0), (1, 234, -1, '', 0)])], 0), None, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 256c24fda..7e948e3cb 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -24,7 +24,7 @@ UnknownTopicOrPartitionError, OffsetOutOfRangeError ) from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords -from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.structs import OffsetAndMetadata, OffsetAndTimestamp, TopicPartition @pytest.fixture @@ -108,6 +108,7 @@ def build_fetch_offsets(request): def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): fetcher._client._api_versions = BROKER_API_VERSIONS[api_version] mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) + mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) by_node = fetcher._create_fetch_requests() requests_and_offsets = by_node.values() assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version]) @@ -138,7 +139,7 @@ def test_update_fetch_positions(fetcher, topic, mocker): fetcher._reset_offset.reset_mock() fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, b'') + fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, '', -1) mocker.patch.object(fetcher._subscriptions, 'seek') fetcher.update_fetch_positions([partition]) assert fetcher._reset_offset.call_count == 0 @@ -152,10 +153,10 @@ def test__reset_offset(fetcher, mocker): fetcher._subscriptions.need_offset_reset(tp) mocked = mocker.patch.object(fetcher, '_retrieve_offsets') - mocked.return_value = {tp: (1001, None)} + mocked.return_value = {tp: OffsetAndTimestamp(1001, None, -1)} fetcher._reset_offset(tp) assert not fetcher._subscriptions.assignment[tp].awaiting_reset - assert fetcher._subscriptions.assignment[tp].position == 1001 + assert fetcher._subscriptions.assignment[tp].position.offset == 1001 def test__send_list_offsets_requests(fetcher, mocker): @@ -175,6 +176,7 @@ def send_side_effect(*args, **kw): # always as available mocked_leader.side_effect = itertools.chain( [None, -1], itertools.cycle([0])) + mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) # Leader == None fut = fetcher._send_list_offsets_requests({tp: 0}) @@ -224,6 +226,7 @@ def send_side_effect(node_id, timestamps): mocked_leader = mocker.patch.object( fetcher._client.cluster, "leader_for_partition") mocked_leader.side_effect = itertools.cycle([0, 1]) + mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) # -- All node succeeded case tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)]) @@ -241,8 +244,8 @@ def send_side_effect(node_id, timestamps): else: second_future = f assert req_by_node == { - 0: {tp1: 0, tp3: 0}, - 1: {tp2: 0, 
tp4: 0} + 0: {tp1: (0, -1), tp3: (0, -1)}, + 1: {tp2: (0, -1), tp4: (0, -1)} } # We only resolved 1 future so far, so result future is not yet ready @@ -279,7 +282,7 @@ def test__handle_list_offsets_response_v1(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 1): (9999, 1000)} + assert fut.value == {TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)} # Broker returns NotLeaderForPartitionError fut = Future() @@ -322,7 +325,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)} # v3 response is the same format fut = Future() @@ -332,7 +335,29 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): (9999, 1000)} + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)} + + +def test__handle_list_offsets_response_v4_v5(fetcher, mocker): + # includes leader_epoch + fut = Future() + res = ListOffsetsResponse[4]( + 123, # throttle_time_ms + [("topic", [(0, 0, 1000, 9999, 1234)]) + ]) + fetcher._handle_list_offsets_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)} + + # v5 response is the same format + fut = Future() + res = ListOffsetsResponse[5]( + 123, # throttle_time_ms + [("topic", [(0, 0, 1000, 9999, 1234)]) + ]) + fetcher._handle_list_offsets_response(fut, res) + assert fut.succeeded() + assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)} def test_fetched_records(fetcher, topic, mocker): @@ -546,7 +571,7 @@ def test_partition_records_offset(): batch_end = 130 fetch_offset = 123 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, i, + messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, None, None, 'key', 'value', [], 'checksum', 0, 0, -1) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) @@ -571,7 +596,7 @@ def test_partition_records_no_fetch_offset(): batch_end = 100 fetch_offset = 123 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, i, + messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, None, None, 'key', 'value', None, 'checksum', 0, 0, -1) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) @@ -586,7 +611,7 @@ def test_partition_records_compacted_offset(): batch_end = 100 fetch_offset = 42 tp = TopicPartition('foo', 0) - messages = [ConsumerRecord(tp.topic, tp.partition, i, + messages = [ConsumerRecord(tp.topic, tp.partition, -1, i, None, None, 'key', 'value', None, 'checksum', 0, 0, -1) for i in range(batch_start, batch_end) if i != fetch_offset] records = Fetcher.PartitionRecords(fetch_offset, None, messages)
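Reviewer note: the sketch below is not part of the patch; it only illustrates how the new three-field structs surface to application code once this change lands. `record.leader_epoch` and the `leader_epoch` fields on `OffsetAndMetadata`/`OffsetAndTimestamp` come straight from the diff above (with `-1` as the "unknown epoch" sentinel); the broker address, topic, group id, and timeout are placeholder assumptions.

```python
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata, TopicPartition

# Placeholder connection details -- adjust for a real cluster.
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    enable_auto_commit=False,
    consumer_timeout_ms=10000,  # stop iterating if no records arrive
)

tp = TopicPartition('example-topic', 0)  # hypothetical topic
consumer.assign([tp])

for record in consumer:
    # ConsumerRecord now carries the leader epoch of the record batch it was
    # read from (-1 when the broker / message format does not provide one).
    print(record.topic, record.partition, record.offset, record.leader_epoch)

    # OffsetAndMetadata is now (offset, metadata, leader_epoch); passing the
    # record's epoch here is what the coordinator forwards in the offset
    # commit instead of the previously hard-coded -1.
    consumer.commit({tp: OffsetAndMetadata(record.offset + 1, '', record.leader_epoch)})
    break

consumer.close()
```

Committing `record.offset + 1` with the record's own epoch mirrors the change in `_send_offset_commit_request`, which now takes `offset.leader_epoch` from the supplied `OffsetAndMetadata` rather than always sending `-1`.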