
Commit 0197209

Merge pull request #7 from aiven/gabi-kafka-client-zstd-compression
kafka: kafka client zstd compression #7
2 parents 1ba1fe7 + cc3c3fd commit 0197209

File tree

13 files changed, +173 -37 lines

Makefile

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 # Some simple testing tasks (sorry, UNIX only).
 
 FLAGS=
-KAFKA_VERSION=0.11.0.2
+KAFKA_VERSION=2.4.0
 SCALA_VERSION=2.12
 
 setup:

kafka/codec.py

Lines changed: 25 additions & 0 deletions
@@ -10,12 +10,18 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
 
 try:
     import snappy
 except ImportError:
     snappy = None
 
+try:
+    import zstandard as zstd
+except ImportError:
+    zstd = None
+
 try:
     import lz4.frame as lz4
 
@@ -58,6 +64,10 @@ def has_snappy():
     return snappy is not None
 
 
+def has_zstd():
+    return zstd is not None
+
+
 def has_lz4():
     if lz4 is not None:
         return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
         payload[header_size:]
     ])
     return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    try:
+        return zstd.ZstdDecompressor().decompress(payload)
+    except zstd.ZstdError:
+        return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
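A minimal round-trip sketch of the new helpers (assumes the optional zstandard package is installed; the payload is illustrative):

from kafka.codec import zstd_encode, zstd_decode

payload = b"some record batch bytes" * 100   # illustrative payload
compressed = zstd_encode(payload)            # one-shot ZstdCompressor().compress()
assert zstd_decode(compressed) == payload    # retries with max_output_size=ZSTD_MAX_OUTPUT_SIZE if the frame omits the content size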

kafka/producer/future.py

Lines changed: 3 additions & 3 deletions
@@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
         produce_future.add_errback(self.failure)
 
     def _produce_success(self, offset_and_timestamp):
-        offset, produce_timestamp_ms = offset_and_timestamp
+        offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
         if offset != -1 and relative_offset is not None:
             offset += relative_offset
         tp = self._produce_future.topic_partition
-        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
                                   checksum, serialized_key_size,
                                   serialized_value_size, serialized_header_size)
         self.success(metadata)
@@ -67,5 +67,5 @@ def get(self, timeout=None):
 
 
 RecordMetadata = collections.namedtuple(
-    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
     'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
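End users see the new field on the RecordMetadata returned from a send. A hedged sketch (broker address and topic are placeholders; log_start_offset is None when the broker answers with a pre-v5 produce response):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
metadata = producer.send('my-topic', b'value').get(timeout=10)
print(metadata.offset, metadata.timestamp, metadata.log_start_offset)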

kafka/producer/kafka.py

Lines changed: 6 additions & 2 deletions
@@ -12,7 +12,7 @@
 
 import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
             available guarantee.
             If unset, defaults to acks=1.
         compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
             Compression is of full batches of data, so the efficacy of batching
             will also impact the compression ratio (more batching means better
             compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
        'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
        'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
        'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
        None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
    }
 
@@ -388,6 +389,9 @@ def __init__(self, **configs):
        if self.config['compression_type'] == 'lz4':
            assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
+        if self.config['compression_type'] == 'zstd':
+            assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
        # Check compression_type for library support
        ct = self.config['compression_type']
        if ct not in self._COMPRESSORS:
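Putting it together, zstd is requested the same way as the other codecs; the assertion above means the declared or probed api_version must be at least (2, 1, 0). A sketch, assuming a 2.1+ broker and the zstandard package (address and topic are placeholders):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    compression_type='zstd',          # mapped to DefaultRecordBatchBuilder.CODEC_ZSTD
    api_version=(2, 4, 0),            # anything >= (2, 1, 0) satisfies the zstd assertion
)
producer.send('my-topic', b'hello zstd')
producer.flush()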

kafka/producer/record_accumulator.py

Lines changed: 4 additions & 4 deletions
@@ -68,16 +68,16 @@ def try_append(self, timestamp_ms, key, value, headers):
             sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
-    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
         level = logging.DEBUG if exception is None else logging.WARNING
         log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s and error %s.", self.topic_partition, base_offset,
-                exception)  # trace
+                " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+                log_start_offset, exception)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success((base_offset, timestamp_ms))
+            self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
             self.produce_future.failure(exception)
 
kafka/producer/sender.py

Lines changed: 19 additions & 5 deletions
@@ -195,13 +195,18 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
 
         for topic, partitions in response.topics:
             for partition_info in partitions:
+                error_message = None
                 if response.API_VERSION < 2:
                     partition, error_code, offset = partition_info
                     ts = None
-                else:
+                elif 2 <= response.API_VERSION <= 4:
                     partition, error_code, offset, ts = partition_info
+                elif 5 <= response.API_VERSION <= 7:
+                    partition, error_code, offset, ts, log_start_offset = partition_info
+                else:
+                    partition, error_code, offset, ts, log_start_offset, _, error_message = partition_info
                 tp = TopicPartition(topic, partition)
-                error = Errors.for_code(error_code)
+                error = error_message or Errors.for_code(error_code)
                 batch = batches_by_partition[tp]
                 self._complete_batch(batch, error, offset, ts)
 
@@ -213,14 +218,15 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, None, -1, None)
 
-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
         """Complete or retry the given batch of records.
 
         Arguments:
             batch (RecordBatch): The record batch
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+            log_start_offset (int): The start offset of the log at the time this produce response was created
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -240,7 +246,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
             error = error(batch.topic_partition.topic)
 
         # tell the user the result of their request
-        batch.done(base_offset, timestamp_ms, error)
+        batch.done(base_offset, timestamp_ms, error, log_start_offset)
         self._accumulator.deallocate(batch)
         if error is not None:
             self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
@@ -293,7 +299,15 @@ def _produce_request(self, node_id, acks, timeout, batches):
             produce_records_by_partition[topic][partition] = buf
 
         kwargs = {}
-        if self.config['api_version'] >= (0, 11):
+        if self.config['api_version'] >= (2, 1):
+            version = 7
+        elif self.config['api_version'] >= (2, 0):
+            version = 6
+        elif self.config['api_version'] >= (1, 1):
+            version = 5
+        elif self.config['api_version'] >= (1, 0):
+            version = 4
+        elif self.config['api_version'] >= (0, 11):
            version = 3
            kwargs = dict(transactional_id=None)
        elif self.config['api_version'] >= (0, 10):
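The cascade above picks the highest ProduceRequest version the negotiated broker API supports. A standalone restatement of that mapping, for illustration only (the pre-0.11 branches are unchanged and omitted here):

def produce_request_version(api_version):
    # mirrors the version cascade in Sender._produce_request
    if api_version >= (2, 1):
        return 7      # ZStandard-capable (KIP-110)
    elif api_version >= (2, 0):
        return 6
    elif api_version >= (1, 1):
        return 5
    elif api_version >= (1, 0):
        return 4
    elif api_version >= (0, 11):
        return 3
    raise NotImplementedError("older brokers are handled by the existing v0-v2 branches")

assert produce_request_version((2, 4, 0)) == 7
assert produce_request_version((1, 0, 0)) == 4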

kafka/protocol/produce.py

Lines changed: 76 additions & 3 deletions
@@ -61,7 +61,6 @@ class ProduceResponse_v4(Response):
     API_VERSION = 4
     SCHEMA = ProduceResponse_v3.SCHEMA
 
-
 class ProduceResponse_v5(Response):
     API_KEY = 0
     API_VERSION = 5
@@ -78,6 +77,50 @@ class ProduceResponse_v5(Response):
     )
 
 
+class ProduceResponse_v6(Response):
+    """
+    The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+    """
+    API_KEY = 0
+    API_VERSION = 6
+    SCHEMA = ProduceResponse_v5.SCHEMA
+
+
+class ProduceResponse_v7(Response):
+    """
+    V7 bumped up to indicate ZStandard capability. (see KIP-110)
+    """
+    API_KEY = 0
+    API_VERSION = 7
+    SCHEMA = ProduceResponse_v6.SCHEMA
+
+
+class ProduceResponse_v8(Response):
+    """
+    V8 bumped up to add two new fields record_errors offset list and error_message
+    (See KIP-467)
+    """
+    API_KEY = 0
+    API_VERSION = 8
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('log_start_offset', Int64)),
+                ('record_errors', (Array(
+                    ('batch_index', Int32),
+                    ('batch_index_error_message', String('utf-8'))
+                ))),
+                ('error_message', String('utf-8'))
+            ))),
+        ('throttle_time_ms', Int32)
+    )
+
+
 class ProduceRequest(Request):
     API_KEY = 0
 
@@ -147,11 +190,41 @@ class ProduceRequest_v5(ProduceRequest):
     SCHEMA = ProduceRequest_v4.SCHEMA
 
 
+class ProduceRequest_v6(ProduceRequest):
+    """
+    The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+    """
+    API_VERSION = 6
+    RESPONSE_TYPE = ProduceResponse_v6
+    SCHEMA = ProduceRequest_v5.SCHEMA
+
+
+class ProduceRequest_v7(ProduceRequest):
+    """
+    V7 bumped up to indicate ZStandard capability. (see KIP-110)
+    """
+    API_VERSION = 7
+    RESPONSE_TYPE = ProduceResponse_v7
+    SCHEMA = ProduceRequest_v6.SCHEMA
+
+
+class ProduceRequest_v8(ProduceRequest):
+    """
+    V8 bumped up to add two new fields record_errors offset list and error_message to PartitionResponse
+    (See KIP-467)
+    """
+    API_VERSION = 8
+    RESPONSE_TYPE = ProduceResponse_v8
+    SCHEMA = ProduceRequest_v7.SCHEMA
+
+
 ProduceRequest = [
     ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
-    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
+    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
+    ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
 ]
 ProduceResponse = [
     ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
-    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
+    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
+    ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
 ]
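Because the ProduceRequest and ProduceResponse lists are indexed by API version, adding the v6-v8 classes makes those versions addressable by the sender. An illustrative lookup:

from kafka.protocol.produce import ProduceRequest

request_cls = ProduceRequest[7]                 # ProduceRequest_v7, the zstd-capable version
assert request_cls.API_VERSION == 7
assert request_cls.RESPONSE_TYPE.API_VERSION == 7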

kafka/record/default_records.py

Lines changed: 9 additions & 2 deletions
@@ -62,8 +62,8 @@
 )
 from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode,
-    gzip_decode, snappy_decode, lz4_decode
+    gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+    gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
 import kafka.codec as codecs
 
@@ -97,6 +97,7 @@ class DefaultRecordBase(object):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     TRANSACTIONAL_MASK = 0x10
     CONTROL_MASK = 0x20
@@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        elif compression_type == self.CODEC_ZSTD:
+            checker, name = codecs.has_zstd, "zstd"
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))
@@ -185,6 +188,8 @@ def _maybe_uncompress(self):
                 uncompressed = snappy_decode(data.tobytes())
             if compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
+            if compression_type == self.CODEC_ZSTD:
+                uncompressed = zstd_decode(data.tobytes())
             self._buffer = bytearray(uncompressed)
             self._pos = 0
         self._decompressed = True
@@ -517,6 +522,8 @@ def _maybe_compress(self):
                 compressed = snappy_encode(data)
             elif self._compression_type == self.CODEC_LZ4:
                 compressed = lz4_encode(data)
+            elif self._compression_type == self.CODEC_ZSTD:
+                compressed = zstd_encode(data)
             compressed_size = len(compressed)
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send
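CODEC_ZSTD extends the codec IDs carried in the low bits of the v2 record batch attributes. A small illustrative check; the 0x07 codec mask is the standard record batch attribute layout and is assumed here rather than taken from this diff:

CODEC_MASK = 0x07   # assumed: the low three attribute bits select the codec
CODEC_ZSTD = 0x04

def batch_codec(attributes):
    # attributes is the int16 attributes field of a v2 record batch
    return attributes & CODEC_MASK

assert batch_codec(0x04) == CODEC_ZSTD
assert batch_codec(0x14) == CODEC_ZSTD   # the transactional flag (0x10) does not affect the codec bits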

kafka/record/memory_records.py

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object):
 
     def __init__(self, magic, compression_type, batch_size):
         assert magic in [0, 1, 2], "Not supported magic"
-        assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+        assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
         if magic >= 2:
             self._builder = DefaultRecordBatchBuilder(
                 magic=magic, compression_type=compression_type,
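With compression type 4 accepted, a magic-2 builder routes batches through DefaultRecordBatchBuilder, the only record format that carries zstd. A hedged construction example (batch_size is arbitrary):

from kafka.record.memory_records import MemoryRecordsBuilder

builder = MemoryRecordsBuilder(magic=2, compression_type=4, batch_size=16384)
# legacy (magic 0/1) batches have no zstd codec, so compression_type=4 only makes sense with magic=2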

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
@@ -15,3 +15,4 @@ pytest-mock==1.10.0
 sphinx-rtd-theme==0.2.4
 crc32c==1.7
 py==1.8.0
+zstandard==0.13.0
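The pinned zstandard package backs the codec in tests; at runtime it stays optional, so callers can probe availability first with the helper added in kafka/codec.py:

from kafka.codec import has_zstd

if not has_zstd():
    raise RuntimeError("install the 'zstandard' package to use compression_type='zstd'")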
