Skip to content

Support OffsetFetch v5 / OffsetCommit v6 (2.1 baseline) #2505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.commit import OffsetCommitRequest
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
LeaveGroupRequest, SyncGroupRequest)
from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID

log = logging.getLogger('kafka.coordinator')

Expand All @@ -34,10 +32,7 @@ def __init__(self, generation_id, member_id, protocol):
self.member_id = member_id
self.protocol = protocol

Generation.NO_GENERATION = Generation(
OffsetCommitRequest[2].DEFAULT_GENERATION_ID,
JoinGroupRequest[0].UNKNOWN_MEMBER_ID,
None)
Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None)


class UnjoinedGroupException(Errors.KafkaError):
Expand Down
67 changes: 58 additions & 9 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def _send_offset_commit_request(self, offsets):
offset_data[tp.topic][tp.partition] = offset

if self._subscription.partitions_auto_assigned():
generation = self.generation()
generation = self.generation() or Generation.NO_GENERATION
else:
generation = Generation.NO_GENERATION

Expand All @@ -585,8 +585,35 @@ def _send_offset_commit_request(self, offsets):
if self.config['api_version'] >= (0, 9) and generation is None:
return Future().failure(Errors.CommitFailedError())

version = self._client.api_version(OffsetCommitRequest, max_version=2)
if version == 2:
version = self._client.api_version(OffsetCommitRequest, max_version=6)
if version == 0:
request = OffsetCommitRequest[version](
self.group_id,
[(
topic, [(
partition,
offset.offset,
offset.metadata
) for partition, offset in six.iteritems(partitions)]
) for topic, partitions in six.iteritems(offset_data)]
)
elif version == 1:
request = OffsetCommitRequest[version](
self.group_id,
# This api version was only used in v0.8.2, prior to join group apis
# so this always ends up as NO_GENERATION
generation.generation_id,
generation.member_id,
[(
topic, [(
partition,
offset.offset,
-1, # timestamp, unused
offset.metadata
) for partition, offset in six.iteritems(partitions)]
) for topic, partitions in six.iteritems(offset_data)]
)
elif version <= 4:
request = OffsetCommitRequest[version](
self.group_id,
generation.generation_id,
Expand All @@ -600,25 +627,29 @@ def _send_offset_commit_request(self, offsets):
) for partition, offset in six.iteritems(partitions)]
) for topic, partitions in six.iteritems(offset_data)]
)
elif version == 1:
elif version <= 5:
request = OffsetCommitRequest[version](
self.group_id, -1, '',
self.group_id,
generation.generation_id,
generation.member_id,
[(
topic, [(
partition,
offset.offset,
-1,
offset.metadata
) for partition, offset in six.iteritems(partitions)]
) for topic, partitions in six.iteritems(offset_data)]
)
elif version == 0:
else:
request = OffsetCommitRequest[version](
self.group_id,
generation.generation_id,
generation.member_id,
[(
topic, [(
partition,
offset.offset,
-1, # leader_epoch
offset.metadata
) for partition, offset in six.iteritems(partitions)]
) for topic, partitions in six.iteritems(offset_data)]
Expand All @@ -634,6 +665,8 @@ def _send_offset_commit_request(self, offsets):
return future

def _handle_offset_commit_response(self, offsets, future, send_time, response):
if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
log.warning()
# TODO look at adding request_latency_ms to response (like java kafka)
self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
unauthorized_topics = set()
Expand Down Expand Up @@ -735,7 +768,9 @@ def _send_offset_fetch_request(self, partitions):
for tp in partitions:
topic_partitions[tp.topic].add(tp.partition)

version = self._client.api_version(OffsetFetchRequest, max_version=1)
version = self._client.api_version(OffsetFetchRequest, max_version=5)
# Starting in version 2, the request can contain a null topics array to indicate that offsets should be fetched
# TODO: support
request = OffsetFetchRequest[version](
self.group_id,
list(topic_partitions.items())
Expand All @@ -749,9 +784,23 @@ def _send_offset_fetch_request(self, partitions):
return future

def _handle_offset_fetch_response(self, future, response):
if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
log.warning()

if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno:
error_type = Errors.for_code(response.error_code)
# TODO: handle...

offsets = {}
for topic, partitions in response.topics:
for partition, offset, metadata, error_code in partitions:
for partition_data in partitions:
partition, offset = partition_data[:2]
if response.API_VERSION >= 5:
leader_epoch, metadata, error_code = partition_data[2:]
else:
metadata, error_code = partition_data[2:]
leader_epoch = -1
# TODO: save leader_epoch!
tp = TopicPartition(topic, partition)
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
Expand Down
114 changes: 109 additions & 5 deletions kafka/protocol/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ class OffsetCommitResponse_v3(Response):
)


class OffsetCommitResponse_v4(Response):
API_KEY = 8
API_VERSION = 4
SCHEMA = OffsetCommitResponse_v3.SCHEMA


class OffsetCommitResponse_v5(Response):
API_KEY = 8
API_VERSION = 5
SCHEMA = OffsetCommitResponse_v4.SCHEMA


class OffsetCommitResponse_v6(Response):
API_KEY = 8
API_VERSION = 6
SCHEMA = OffsetCommitResponse_v5.SCHEMA


class OffsetCommitRequest_v0(Request):
API_KEY = 8
API_VERSION = 0 # Zookeeper-backed storage
Expand Down Expand Up @@ -76,21 +94,20 @@ class OffsetCommitRequest_v1(Request):

class OffsetCommitRequest_v2(Request):
API_KEY = 8
API_VERSION = 2 # added retention_time, dropped timestamp
API_VERSION = 2
RESPONSE_TYPE = OffsetCommitResponse_v2
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
('consumer_id', String('utf-8')),
('retention_time', Int64),
('retention_time', Int64), # added retention_time, dropped timestamp
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('metadata', String('utf-8'))))))
)
DEFAULT_GENERATION_ID = -1
DEFAULT_RETENTION_TIME = -1


Expand All @@ -99,15 +116,63 @@ class OffsetCommitRequest_v3(Request):
API_VERSION = 3
RESPONSE_TYPE = OffsetCommitResponse_v3
SCHEMA = OffsetCommitRequest_v2.SCHEMA
DEFAULT_RETENTION_TIME = -1


class OffsetCommitRequest_v4(Request):
API_KEY = 8
API_VERSION = 4
RESPONSE_TYPE = OffsetCommitResponse_v4
SCHEMA = OffsetCommitRequest_v3.SCHEMA
DEFAULT_RETENTION_TIME = -1


class OffsetCommitRequest_v5(Request):
API_KEY = 8
API_VERSION = 5 # drops retention_time
RESPONSE_TYPE = OffsetCommitResponse_v5
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
('consumer_id', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('metadata', String('utf-8'))))))
)


class OffsetCommitRequest_v6(Request):
API_KEY = 8
API_VERSION = 6
RESPONSE_TYPE = OffsetCommitResponse_v6
SCHEMA = Schema(
('consumer_group', String('utf-8')),
('consumer_group_generation_id', Int32),
('consumer_id', String('utf-8')),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('leader_epoch', Int32), # added for fencing / kip-320. default -1
('metadata', String('utf-8'))))))
)


OffsetCommitRequest = [
OffsetCommitRequest_v0, OffsetCommitRequest_v1,
OffsetCommitRequest_v2, OffsetCommitRequest_v3
OffsetCommitRequest_v2, OffsetCommitRequest_v3,
OffsetCommitRequest_v4, OffsetCommitRequest_v5,
OffsetCommitRequest_v6,
]
OffsetCommitResponse = [
OffsetCommitResponse_v0, OffsetCommitResponse_v1,
OffsetCommitResponse_v2, OffsetCommitResponse_v3
OffsetCommitResponse_v2, OffsetCommitResponse_v3,
OffsetCommitResponse_v4, OffsetCommitResponse_v5,
OffsetCommitResponse_v6,
]


Expand Down Expand Up @@ -163,6 +228,29 @@ class OffsetFetchResponse_v3(Response):
)


class OffsetFetchResponse_v4(Response):
API_KEY = 9
API_VERSION = 4
SCHEMA = OffsetFetchResponse_v3.SCHEMA


class OffsetFetchResponse_v5(Response):
API_KEY = 9
API_VERSION = 5
SCHEMA = Schema(
('throttle_time_ms', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('offset', Int64),
('leader_epoch', Int32),
('metadata', String('utf-8')),
('error_code', Int16))))),
('error_code', Int16)
)


class OffsetFetchRequest_v0(Request):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
Expand Down Expand Up @@ -199,11 +287,27 @@ class OffsetFetchRequest_v3(Request):
SCHEMA = OffsetFetchRequest_v2.SCHEMA


class OffsetFetchRequest_v4(Request):
API_KEY = 9
API_VERSION = 4
RESPONSE_TYPE = OffsetFetchResponse_v4
SCHEMA = OffsetFetchRequest_v3.SCHEMA


class OffsetFetchRequest_v5(Request):
API_KEY = 9
API_VERSION = 5
RESPONSE_TYPE = OffsetFetchResponse_v5
SCHEMA = OffsetFetchRequest_v4.SCHEMA


OffsetFetchRequest = [
OffsetFetchRequest_v0, OffsetFetchRequest_v1,
OffsetFetchRequest_v2, OffsetFetchRequest_v3,
OffsetFetchRequest_v4, OffsetFetchRequest_v5,
]
OffsetFetchResponse = [
OffsetFetchResponse_v0, OffsetFetchResponse_v1,
OffsetFetchResponse_v2, OffsetFetchResponse_v3,
OffsetFetchResponse_v4, OffsetFetchResponse_v5,
]
7 changes: 4 additions & 3 deletions kafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String


DEFAULT_GENERATION_ID = -1
UNKNOWN_MEMBER_ID = ''


class JoinGroupResponse_v0(Response):
API_KEY = 11
API_VERSION = 0
Expand Down Expand Up @@ -55,7 +59,6 @@ class JoinGroupRequest_v0(Request):
('protocol_name', String('utf-8')),
('protocol_metadata', Bytes)))
)
UNKNOWN_MEMBER_ID = ''


class JoinGroupRequest_v1(Request):
Expand All @@ -72,15 +75,13 @@ class JoinGroupRequest_v1(Request):
('protocol_name', String('utf-8')),
('protocol_metadata', Bytes)))
)
UNKNOWN_MEMBER_ID = ''


class JoinGroupRequest_v2(Request):
API_KEY = 11
API_VERSION = 2
RESPONSE_TYPE = JoinGroupResponse_v2
SCHEMA = JoinGroupRequest_v1.SCHEMA
UNKNOWN_MEMBER_ID = ''


JoinGroupRequest = [
Expand Down
Loading