diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 8f588aa32..764ea9467 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -14,10 +14,8 @@ from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.commit import OffsetCommitRequest from kafka.protocol.find_coordinator import FindCoordinatorRequest -from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, - LeaveGroupRequest, SyncGroupRequest) +from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID log = logging.getLogger('kafka.coordinator') @@ -34,10 +32,7 @@ def __init__(self, generation_id, member_id, protocol): self.member_id = member_id self.protocol = protocol -Generation.NO_GENERATION = Generation( - OffsetCommitRequest[2].DEFAULT_GENERATION_ID, - JoinGroupRequest[0].UNKNOWN_MEMBER_ID, - None) +Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None) class UnjoinedGroupException(Errors.KafkaError): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 5f62f730f..5850d1a2d 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -575,7 +575,7 @@ def _send_offset_commit_request(self, offsets): offset_data[tp.topic][tp.partition] = offset if self._subscription.partitions_auto_assigned(): - generation = self.generation() + generation = self.generation() or Generation.NO_GENERATION else: generation = Generation.NO_GENERATION @@ -585,8 +585,35 @@ def _send_offset_commit_request(self, offsets): if self.config['api_version'] >= (0, 9) and generation is None: return Future().failure(Errors.CommitFailedError()) - version = self._client.api_version(OffsetCommitRequest, max_version=2) - if version == 2: + version = self._client.api_version(OffsetCommitRequest, max_version=6) + if version == 0: + request = OffsetCommitRequest[version]( + self.group_id, + [( + topic, [( + partition, + offset.offset, + offset.metadata + ) for partition, offset in six.iteritems(partitions)] + ) for topic, partitions in six.iteritems(offset_data)] + ) + elif version == 1: + request = OffsetCommitRequest[version]( + self.group_id, + # This api version was only used in v0.8.2, prior to join group apis + # so this always ends up as NO_GENERATION + generation.generation_id, + generation.member_id, + [( + topic, [( + partition, + offset.offset, + -1, # timestamp, unused + offset.metadata + ) for partition, offset in six.iteritems(partitions)] + ) for topic, partitions in six.iteritems(offset_data)] + ) + elif version <= 4: request = OffsetCommitRequest[version]( self.group_id, generation.generation_id, @@ -600,25 +627,29 @@ def _send_offset_commit_request(self, offsets): ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] ) - elif version == 1: + elif version <= 5: request = OffsetCommitRequest[version]( - self.group_id, -1, '', + self.group_id, + generation.generation_id, + generation.member_id, [( topic, [( partition, offset.offset, - -1, offset.metadata ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] ) - elif version == 0: + else: request = OffsetCommitRequest[version]( self.group_id, + generation.generation_id, + generation.member_id, [( topic, [( partition, offset.offset, + -1, # leader_epoch offset.metadata ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] @@ -634,6 +665,8 @@ def _send_offset_commit_request(self, offsets): return future def _handle_offset_commit_response(self, offsets, future, send_time, response): + if response.API_VERSION >= 3 and response.throttle_time_ms > 0: + log.warning() # TODO look at adding request_latency_ms to response (like java kafka) self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) unauthorized_topics = set() @@ -735,7 +768,9 @@ def _send_offset_fetch_request(self, partitions): for tp in partitions: topic_partitions[tp.topic].add(tp.partition) - version = self._client.api_version(OffsetFetchRequest, max_version=1) + version = self._client.api_version(OffsetFetchRequest, max_version=5) + # Starting in version 2, the request can contain a null topics array to indicate that offsets should be fetched + # TODO: support request = OffsetFetchRequest[version]( self.group_id, list(topic_partitions.items()) @@ -749,9 +784,23 @@ def _send_offset_fetch_request(self, partitions): return future def _handle_offset_fetch_response(self, future, response): + if response.API_VERSION >= 3 and response.throttle_time_ms > 0: + log.warning() + + if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno: + error_type = Errors.for_code(response.error_code) + # TODO: handle... + offsets = {} for topic, partitions in response.topics: - for partition, offset, metadata, error_code in partitions: + for partition_data in partitions: + partition, offset = partition_data[:2] + if response.API_VERSION >= 5: + leader_epoch, metadata, error_code = partition_data[2:] + else: + metadata, error_code = partition_data[2:] + leader_epoch = -1 + # TODO: save leader_epoch! tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index f5828ba59..53c2466fe 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -41,6 +41,24 @@ class OffsetCommitResponse_v3(Response): ) +class OffsetCommitResponse_v4(Response): + API_KEY = 8 + API_VERSION = 4 + SCHEMA = OffsetCommitResponse_v3.SCHEMA + + +class OffsetCommitResponse_v5(Response): + API_KEY = 8 + API_VERSION = 5 + SCHEMA = OffsetCommitResponse_v4.SCHEMA + + +class OffsetCommitResponse_v6(Response): + API_KEY = 8 + API_VERSION = 6 + SCHEMA = OffsetCommitResponse_v5.SCHEMA + + class OffsetCommitRequest_v0(Request): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage @@ -76,13 +94,13 @@ class OffsetCommitRequest_v1(Request): class OffsetCommitRequest_v2(Request): API_KEY = 8 - API_VERSION = 2 # added retention_time, dropped timestamp + API_VERSION = 2 RESPONSE_TYPE = OffsetCommitResponse_v2 SCHEMA = Schema( ('consumer_group', String('utf-8')), ('consumer_group_generation_id', Int32), ('consumer_id', String('utf-8')), - ('retention_time', Int64), + ('retention_time', Int64), # added retention_time, dropped timestamp ('topics', Array( ('topic', String('utf-8')), ('partitions', Array( @@ -90,7 +108,6 @@ class OffsetCommitRequest_v2(Request): ('offset', Int64), ('metadata', String('utf-8')))))) ) - DEFAULT_GENERATION_ID = -1 DEFAULT_RETENTION_TIME = -1 @@ -99,15 +116,63 @@ class OffsetCommitRequest_v3(Request): API_VERSION = 3 RESPONSE_TYPE = OffsetCommitResponse_v3 SCHEMA = OffsetCommitRequest_v2.SCHEMA + DEFAULT_RETENTION_TIME = -1 + + +class OffsetCommitRequest_v4(Request): + API_KEY = 8 + API_VERSION = 4 + RESPONSE_TYPE = OffsetCommitResponse_v4 + SCHEMA = OffsetCommitRequest_v3.SCHEMA + DEFAULT_RETENTION_TIME = -1 + + +class OffsetCommitRequest_v5(Request): + API_KEY = 8 + API_VERSION = 5 # drops retention_time + RESPONSE_TYPE = OffsetCommitResponse_v5 + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')))))) + ) + + +class OffsetCommitRequest_v6(Request): + API_KEY = 8 + API_VERSION = 6 + RESPONSE_TYPE = OffsetCommitResponse_v6 + SCHEMA = Schema( + ('consumer_group', String('utf-8')), + ('consumer_group_generation_id', Int32), + ('consumer_id', String('utf-8')), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('leader_epoch', Int32), # added for fencing / kip-320. default -1 + ('metadata', String('utf-8')))))) + ) OffsetCommitRequest = [ OffsetCommitRequest_v0, OffsetCommitRequest_v1, - OffsetCommitRequest_v2, OffsetCommitRequest_v3 + OffsetCommitRequest_v2, OffsetCommitRequest_v3, + OffsetCommitRequest_v4, OffsetCommitRequest_v5, + OffsetCommitRequest_v6, ] OffsetCommitResponse = [ OffsetCommitResponse_v0, OffsetCommitResponse_v1, - OffsetCommitResponse_v2, OffsetCommitResponse_v3 + OffsetCommitResponse_v2, OffsetCommitResponse_v3, + OffsetCommitResponse_v4, OffsetCommitResponse_v5, + OffsetCommitResponse_v6, ] @@ -163,6 +228,29 @@ class OffsetFetchResponse_v3(Response): ) +class OffsetFetchResponse_v4(Response): + API_KEY = 9 + API_VERSION = 4 + SCHEMA = OffsetFetchResponse_v3.SCHEMA + + +class OffsetFetchResponse_v5(Response): + API_KEY = 9 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('leader_epoch', Int32), + ('metadata', String('utf-8')), + ('error_code', Int16))))), + ('error_code', Int16) + ) + + class OffsetFetchRequest_v0(Request): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage @@ -199,11 +287,27 @@ class OffsetFetchRequest_v3(Request): SCHEMA = OffsetFetchRequest_v2.SCHEMA +class OffsetFetchRequest_v4(Request): + API_KEY = 9 + API_VERSION = 4 + RESPONSE_TYPE = OffsetFetchResponse_v4 + SCHEMA = OffsetFetchRequest_v3.SCHEMA + + +class OffsetFetchRequest_v5(Request): + API_KEY = 9 + API_VERSION = 5 + RESPONSE_TYPE = OffsetFetchResponse_v5 + SCHEMA = OffsetFetchRequest_v4.SCHEMA + + OffsetFetchRequest = [ OffsetFetchRequest_v0, OffsetFetchRequest_v1, OffsetFetchRequest_v2, OffsetFetchRequest_v3, + OffsetFetchRequest_v4, OffsetFetchRequest_v5, ] OffsetFetchResponse = [ OffsetFetchResponse_v0, OffsetFetchResponse_v1, OffsetFetchResponse_v2, OffsetFetchResponse_v3, + OffsetFetchResponse_v4, OffsetFetchResponse_v5, ] diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index bcb96553b..170e49160 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -5,6 +5,10 @@ from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String +DEFAULT_GENERATION_ID = -1 +UNKNOWN_MEMBER_ID = '' + + class JoinGroupResponse_v0(Response): API_KEY = 11 API_VERSION = 0 @@ -55,7 +59,6 @@ class JoinGroupRequest_v0(Request): ('protocol_name', String('utf-8')), ('protocol_metadata', Bytes))) ) - UNKNOWN_MEMBER_ID = '' class JoinGroupRequest_v1(Request): @@ -72,7 +75,6 @@ class JoinGroupRequest_v1(Request): ('protocol_name', String('utf-8')), ('protocol_metadata', Bytes))) ) - UNKNOWN_MEMBER_ID = '' class JoinGroupRequest_v2(Request): @@ -80,7 +82,6 @@ class JoinGroupRequest_v2(Request): API_VERSION = 2 RESPONSE_TYPE = JoinGroupResponse_v2 SCHEMA = JoinGroupRequest_v1.SCHEMA - UNKNOWN_MEMBER_ID = '' JoinGroupRequest = [ diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 0c4ee6d33..c0e7c6d60 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -439,7 +439,11 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): @pytest.mark.parametrize('api_version,req_type', [ ((0, 8, 1), OffsetCommitRequest[0]), ((0, 8, 2), OffsetCommitRequest[1]), - ((0, 9), OffsetCommitRequest[2])]) + ((0, 9), OffsetCommitRequest[2]), + ((0, 11), OffsetCommitRequest[3]), + ((2, 0), OffsetCommitRequest[4]), + ((2, 1), OffsetCommitRequest[6]), +]) def test_send_offset_commit_request_versions(patched_coord, offsets, api_version, req_type): expect_node = 0 @@ -499,13 +503,27 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets): Errors.InvalidTopicError, False), (OffsetCommitResponse[0]([('foobar', [(0, 29), (1, 29)])]), Errors.TopicAuthorizationFailedError, False), + (OffsetCommitResponse[0]([('foobar', [(0, 0), (1, 0)])]), + None, False), + (OffsetCommitResponse[1]([('foobar', [(0, 0), (1, 0)])]), + None, False), + (OffsetCommitResponse[2]([('foobar', [(0, 0), (1, 0)])]), + None, False), + (OffsetCommitResponse[3](0, [('foobar', [(0, 0), (1, 0)])]), + None, False), + (OffsetCommitResponse[4](0, [('foobar', [(0, 0), (1, 0)])]), + None, False), + (OffsetCommitResponse[5](0, [('foobar', [(0, 0), (1, 0)])]), + None, False), + (OffsetCommitResponse[6](0, [('foobar', [(0, 0), (1, 0)])]), + None, False), ]) def test_handle_offset_commit_response(mocker, patched_coord, offsets, response, error, dead): future = Future() patched_coord._handle_offset_commit_response(offsets, future, time.time(), response) - assert isinstance(future.exception, error) + assert isinstance(future.exception, error) if error else True assert patched_coord.coordinator_id is (None if dead else 0) @@ -534,7 +552,12 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): @pytest.mark.parametrize('api_version,req_type', [ ((0, 8, 1), OffsetFetchRequest[0]), ((0, 8, 2), OffsetFetchRequest[1]), - ((0, 9), OffsetFetchRequest[1])]) + ((0, 9), OffsetFetchRequest[1]), + ((0, 10, 2), OffsetFetchRequest[2]), + ((0, 11), OffsetFetchRequest[3]), + ((2, 0), OffsetFetchRequest[4]), + ((2, 1), OffsetFetchRequest[5]), +]) def test_send_offset_fetch_request_versions(patched_coord, partitions, api_version, req_type): # assuming fixture sets coordinator=0, least_loaded_node=1 @@ -583,6 +606,16 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): Errors.TopicAuthorizationFailedError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), None, False), + (OffsetFetchResponse[1]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), + None, False), + (OffsetFetchResponse[2]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + None, False), + (OffsetFetchResponse[3](0, [('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + None, False), + (OffsetFetchResponse[4](0, [('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])], 0), + None, False), + (OffsetFetchResponse[5](0, [('foobar', [(0, 123, -1, b'', 0), (1, 234, -1, b'', 0)])], 0), + None, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, response, error, dead):