Skip to content

Commit 2de3c34

Browse files
authored
Support OffsetFetch v5 / OffsetCommit v6 (2.1 baseline) (#2505)
1 parent 3cf418a commit 2de3c34

File tree

5 files changed

+209
-27
lines changed

5 files changed

+209
-27
lines changed

kafka/coordinator/base.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414
from kafka.future import Future
1515
from kafka.metrics import AnonMeasurable
1616
from kafka.metrics.stats import Avg, Count, Max, Rate
17-
from kafka.protocol.commit import OffsetCommitRequest
1817
from kafka.protocol.find_coordinator import FindCoordinatorRequest
19-
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
20-
LeaveGroupRequest, SyncGroupRequest)
18+
from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
2119

2220
log = logging.getLogger('kafka.coordinator')
2321

@@ -34,10 +32,7 @@ def __init__(self, generation_id, member_id, protocol):
3432
self.member_id = member_id
3533
self.protocol = protocol
3634

37-
Generation.NO_GENERATION = Generation(
38-
OffsetCommitRequest[2].DEFAULT_GENERATION_ID,
39-
JoinGroupRequest[0].UNKNOWN_MEMBER_ID,
40-
None)
35+
Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None)
4136

4237

4338
class UnjoinedGroupException(Errors.KafkaError):

kafka/coordinator/consumer.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ def _send_offset_commit_request(self, offsets):
575575
offset_data[tp.topic][tp.partition] = offset
576576

577577
if self._subscription.partitions_auto_assigned():
578-
generation = self.generation()
578+
generation = self.generation() or Generation.NO_GENERATION
579579
else:
580580
generation = Generation.NO_GENERATION
581581

@@ -585,8 +585,35 @@ def _send_offset_commit_request(self, offsets):
585585
if self.config['api_version'] >= (0, 9) and generation is None:
586586
return Future().failure(Errors.CommitFailedError())
587587

588-
version = self._client.api_version(OffsetCommitRequest, max_version=2)
589-
if version == 2:
588+
version = self._client.api_version(OffsetCommitRequest, max_version=6)
589+
if version == 0:
590+
request = OffsetCommitRequest[version](
591+
self.group_id,
592+
[(
593+
topic, [(
594+
partition,
595+
offset.offset,
596+
offset.metadata
597+
) for partition, offset in six.iteritems(partitions)]
598+
) for topic, partitions in six.iteritems(offset_data)]
599+
)
600+
elif version == 1:
601+
request = OffsetCommitRequest[version](
602+
self.group_id,
603+
# This api version was only used in v0.8.2, prior to join group apis
604+
# so this always ends up as NO_GENERATION
605+
generation.generation_id,
606+
generation.member_id,
607+
[(
608+
topic, [(
609+
partition,
610+
offset.offset,
611+
-1, # timestamp, unused
612+
offset.metadata
613+
) for partition, offset in six.iteritems(partitions)]
614+
) for topic, partitions in six.iteritems(offset_data)]
615+
)
616+
elif version <= 4:
590617
request = OffsetCommitRequest[version](
591618
self.group_id,
592619
generation.generation_id,
@@ -600,25 +627,29 @@ def _send_offset_commit_request(self, offsets):
600627
) for partition, offset in six.iteritems(partitions)]
601628
) for topic, partitions in six.iteritems(offset_data)]
602629
)
603-
elif version == 1:
630+
elif version <= 5:
604631
request = OffsetCommitRequest[version](
605-
self.group_id, -1, '',
632+
self.group_id,
633+
generation.generation_id,
634+
generation.member_id,
606635
[(
607636
topic, [(
608637
partition,
609638
offset.offset,
610-
-1,
611639
offset.metadata
612640
) for partition, offset in six.iteritems(partitions)]
613641
) for topic, partitions in six.iteritems(offset_data)]
614642
)
615-
elif version == 0:
643+
else:
616644
request = OffsetCommitRequest[version](
617645
self.group_id,
646+
generation.generation_id,
647+
generation.member_id,
618648
[(
619649
topic, [(
620650
partition,
621651
offset.offset,
652+
-1, # leader_epoch
622653
offset.metadata
623654
) for partition, offset in six.iteritems(partitions)]
624655
) for topic, partitions in six.iteritems(offset_data)]
@@ -634,6 +665,8 @@ def _send_offset_commit_request(self, offsets):
634665
return future
635666

636667
def _handle_offset_commit_response(self, offsets, future, send_time, response):
668+
if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
669+
# Report broker-side throttling; Logger.warning() requires a message argument, so a bare call raises TypeError.
log.warning("OffsetCommitRequest throttled by broker (%d ms)", response.throttle_time_ms)
637670
# TODO look at adding request_latency_ms to response (like java kafka)
638671
self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
639672
unauthorized_topics = set()
@@ -735,7 +768,9 @@ def _send_offset_fetch_request(self, partitions):
735768
for tp in partitions:
736769
topic_partitions[tp.topic].add(tp.partition)
737770

738-
version = self._client.api_version(OffsetFetchRequest, max_version=1)
771+
version = self._client.api_version(OffsetFetchRequest, max_version=5)
772+
# Starting in version 2, the request can contain a null topics array to indicate that offsets for all topics should be fetched
773+
# TODO: support sending a null topics array (fetch offsets for all topics)
739774
request = OffsetFetchRequest[version](
740775
self.group_id,
741776
list(topic_partitions.items())
@@ -749,9 +784,23 @@ def _send_offset_fetch_request(self, partitions):
749784
return future
750785

751786
def _handle_offset_fetch_response(self, future, response):
787+
if response.API_VERSION >= 3 and response.throttle_time_ms > 0:
788+
# Report broker-side throttling; Logger.warning() requires a message argument, so a bare call raises TypeError.
log.warning("OffsetFetchRequest throttled by broker (%d ms)", response.throttle_time_ms)
789+
790+
if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno:
791+
error_type = Errors.for_code(response.error_code)
792+
# TODO: handle the top-level error; error_type is computed here but not yet acted on
793+
752794
offsets = {}
753795
for topic, partitions in response.topics:
754-
for partition, offset, metadata, error_code in partitions:
796+
for partition_data in partitions:
797+
partition, offset = partition_data[:2]
798+
if response.API_VERSION >= 5:
799+
leader_epoch, metadata, error_code = partition_data[2:]
800+
else:
801+
metadata, error_code = partition_data[2:]
802+
leader_epoch = -1
803+
# TODO: save leader_epoch!
755804
tp = TopicPartition(topic, partition)
756805
error_type = Errors.for_code(error_code)
757806
if error_type is not Errors.NoError:

kafka/protocol/commit.py

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,24 @@ class OffsetCommitResponse_v3(Response):
4141
)
4242

4343

44+
class OffsetCommitResponse_v4(Response):
45+
API_KEY = 8
46+
API_VERSION = 4
47+
SCHEMA = OffsetCommitResponse_v3.SCHEMA
48+
49+
50+
class OffsetCommitResponse_v5(Response):
51+
API_KEY = 8
52+
API_VERSION = 5
53+
SCHEMA = OffsetCommitResponse_v4.SCHEMA
54+
55+
56+
class OffsetCommitResponse_v6(Response):
57+
API_KEY = 8
58+
API_VERSION = 6
59+
SCHEMA = OffsetCommitResponse_v5.SCHEMA
60+
61+
4462
class OffsetCommitRequest_v0(Request):
4563
API_KEY = 8
4664
API_VERSION = 0 # Zookeeper-backed storage
@@ -76,21 +94,20 @@ class OffsetCommitRequest_v1(Request):
7694

7795
class OffsetCommitRequest_v2(Request):
7896
API_KEY = 8
79-
API_VERSION = 2 # added retention_time, dropped timestamp
97+
API_VERSION = 2
8098
RESPONSE_TYPE = OffsetCommitResponse_v2
8199
SCHEMA = Schema(
82100
('consumer_group', String('utf-8')),
83101
('consumer_group_generation_id', Int32),
84102
('consumer_id', String('utf-8')),
85-
('retention_time', Int64),
103+
('retention_time', Int64), # added retention_time, dropped timestamp
86104
('topics', Array(
87105
('topic', String('utf-8')),
88106
('partitions', Array(
89107
('partition', Int32),
90108
('offset', Int64),
91109
('metadata', String('utf-8'))))))
92110
)
93-
DEFAULT_GENERATION_ID = -1
94111
DEFAULT_RETENTION_TIME = -1
95112

96113

@@ -99,15 +116,63 @@ class OffsetCommitRequest_v3(Request):
99116
API_VERSION = 3
100117
RESPONSE_TYPE = OffsetCommitResponse_v3
101118
SCHEMA = OffsetCommitRequest_v2.SCHEMA
119+
DEFAULT_RETENTION_TIME = -1
120+
121+
122+
class OffsetCommitRequest_v4(Request):
123+
API_KEY = 8
124+
API_VERSION = 4
125+
RESPONSE_TYPE = OffsetCommitResponse_v4
126+
SCHEMA = OffsetCommitRequest_v3.SCHEMA
127+
DEFAULT_RETENTION_TIME = -1
128+
129+
130+
class OffsetCommitRequest_v5(Request):
131+
API_KEY = 8
132+
API_VERSION = 5 # drops retention_time
133+
RESPONSE_TYPE = OffsetCommitResponse_v5
134+
SCHEMA = Schema(
135+
('consumer_group', String('utf-8')),
136+
('consumer_group_generation_id', Int32),
137+
('consumer_id', String('utf-8')),
138+
('topics', Array(
139+
('topic', String('utf-8')),
140+
('partitions', Array(
141+
('partition', Int32),
142+
('offset', Int64),
143+
('metadata', String('utf-8'))))))
144+
)
145+
146+
147+
class OffsetCommitRequest_v6(Request):
148+
API_KEY = 8
149+
API_VERSION = 6
150+
RESPONSE_TYPE = OffsetCommitResponse_v6
151+
SCHEMA = Schema(
152+
('consumer_group', String('utf-8')),
153+
('consumer_group_generation_id', Int32),
154+
('consumer_id', String('utf-8')),
155+
('topics', Array(
156+
('topic', String('utf-8')),
157+
('partitions', Array(
158+
('partition', Int32),
159+
('offset', Int64),
160+
('leader_epoch', Int32), # added for fencing / kip-320. default -1
161+
('metadata', String('utf-8'))))))
162+
)
102163

103164

104165
OffsetCommitRequest = [
105166
OffsetCommitRequest_v0, OffsetCommitRequest_v1,
106-
OffsetCommitRequest_v2, OffsetCommitRequest_v3
167+
OffsetCommitRequest_v2, OffsetCommitRequest_v3,
168+
OffsetCommitRequest_v4, OffsetCommitRequest_v5,
169+
OffsetCommitRequest_v6,
107170
]
108171
OffsetCommitResponse = [
109172
OffsetCommitResponse_v0, OffsetCommitResponse_v1,
110-
OffsetCommitResponse_v2, OffsetCommitResponse_v3
173+
OffsetCommitResponse_v2, OffsetCommitResponse_v3,
174+
OffsetCommitResponse_v4, OffsetCommitResponse_v5,
175+
OffsetCommitResponse_v6,
111176
]
112177

113178

@@ -163,6 +228,29 @@ class OffsetFetchResponse_v3(Response):
163228
)
164229

165230

231+
class OffsetFetchResponse_v4(Response):
232+
API_KEY = 9
233+
API_VERSION = 4
234+
SCHEMA = OffsetFetchResponse_v3.SCHEMA
235+
236+
237+
class OffsetFetchResponse_v5(Response):
238+
API_KEY = 9
239+
API_VERSION = 5
240+
SCHEMA = Schema(
241+
('throttle_time_ms', Int32),
242+
('topics', Array(
243+
('topic', String('utf-8')),
244+
('partitions', Array(
245+
('partition', Int32),
246+
('offset', Int64),
247+
('leader_epoch', Int32),
248+
('metadata', String('utf-8')),
249+
('error_code', Int16))))),
250+
('error_code', Int16)
251+
)
252+
253+
166254
class OffsetFetchRequest_v0(Request):
167255
API_KEY = 9
168256
API_VERSION = 0 # zookeeper-backed storage
@@ -199,11 +287,27 @@ class OffsetFetchRequest_v3(Request):
199287
SCHEMA = OffsetFetchRequest_v2.SCHEMA
200288

201289

290+
class OffsetFetchRequest_v4(Request):
291+
API_KEY = 9
292+
API_VERSION = 4
293+
RESPONSE_TYPE = OffsetFetchResponse_v4
294+
SCHEMA = OffsetFetchRequest_v3.SCHEMA
295+
296+
297+
class OffsetFetchRequest_v5(Request):
298+
API_KEY = 9
299+
API_VERSION = 5
300+
RESPONSE_TYPE = OffsetFetchResponse_v5
301+
SCHEMA = OffsetFetchRequest_v4.SCHEMA
302+
303+
202304
OffsetFetchRequest = [
203305
OffsetFetchRequest_v0, OffsetFetchRequest_v1,
204306
OffsetFetchRequest_v2, OffsetFetchRequest_v3,
307+
OffsetFetchRequest_v4, OffsetFetchRequest_v5,
205308
]
206309
OffsetFetchResponse = [
207310
OffsetFetchResponse_v0, OffsetFetchResponse_v1,
208311
OffsetFetchResponse_v2, OffsetFetchResponse_v3,
312+
OffsetFetchResponse_v4, OffsetFetchResponse_v5,
209313
]

kafka/protocol/group.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
66

77

8+
DEFAULT_GENERATION_ID = -1
9+
UNKNOWN_MEMBER_ID = ''
10+
11+
812
class JoinGroupResponse_v0(Response):
913
API_KEY = 11
1014
API_VERSION = 0
@@ -61,7 +65,6 @@ class JoinGroupRequest_v0(Request):
6165
('protocol_name', String('utf-8')),
6266
('protocol_metadata', Bytes)))
6367
)
64-
UNKNOWN_MEMBER_ID = ''
6568

6669

6770
class JoinGroupRequest_v1(Request):
@@ -78,15 +81,13 @@ class JoinGroupRequest_v1(Request):
7881
('protocol_name', String('utf-8')),
7982
('protocol_metadata', Bytes)))
8083
)
81-
UNKNOWN_MEMBER_ID = ''
8284

8385

8486
class JoinGroupRequest_v2(Request):
8587
API_KEY = 11
8688
API_VERSION = 2
8789
RESPONSE_TYPE = JoinGroupResponse_v2
8890
SCHEMA = JoinGroupRequest_v1.SCHEMA
89-
UNKNOWN_MEMBER_ID = ''
9091

9192

9293
class JoinGroupRequest_v3(Request):

0 commit comments

Comments
 (0)