diff --git a/Makefile b/Makefile
index b4dcbffc9..ec82ee1c0 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
 # Some simple testing tasks (sorry, UNIX only).

 FLAGS=
-KAFKA_VERSION=0.11.0.2
+KAFKA_VERSION=2.4.0
 SCALA_VERSION=2.12

 setup:
diff --git a/kafka/codec.py b/kafka/codec.py
index aa9fc8291..8ca0728f6 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -10,12 +10,18 @@
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3

 try:
     import snappy
 except ImportError:
     snappy = None

+try:
+    import zstandard as zstd
+except ImportError:
+    zstd = None
+
 try:
     import lz4.frame as lz4
@@ -58,6 +64,10 @@ def has_snappy():
     return snappy is not None


+def has_zstd():
+    return zstd is not None
+
+
 def has_lz4():
     if lz4 is not None:
         return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
         payload[header_size:]
     ])
     return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    try:
+        return zstd.ZstdDecompressor().decompress(payload)
+    except zstd.ZstdError:
+        return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
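Why the two-step decode in zstd_decode: zstd frames produced through the streaming API omit the decompressed size from the frame header, and a plain ZstdDecompressor().decompress() refuses to allocate output for such frames, so the except branch retries with the explicit 1 GiB cap (ZSTD_MAX_OUTPUT_SIZE). A minimal sketch of both paths, assuming the zstandard package is installed:

```python
import zstandard as zstd

ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3  # mirrors the constant added above

# A frame built through the streaming API carries no content size in its header.
cobj = zstd.ZstdCompressor().compressobj()
frame = cobj.compress(b"payload " * 1000) + cobj.flush()

dctx = zstd.ZstdDecompressor()
try:
    data = dctx.decompress(frame)  # raises ZstdError: content size unknown
except zstd.ZstdError:
    # Retry with an explicit upper bound on the output buffer instead.
    data = dctx.decompress(frame, max_output_size=ZSTD_MAX_OUTPUT_SIZE)

assert data == b"payload " * 1000
```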
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index f67db0979..07fa4adb4 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -38,7 +38,7 @@ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, seri
         produce_future.add_errback(self.failure)

     def _produce_success(self, offset_and_timestamp):
-        offset, produce_timestamp_ms = offset_and_timestamp
+        offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp

         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ def _produce_success(self, offset_and_timestamp):
         if offset != -1 and relative_offset is not None:
             offset += relative_offset
         tp = self._produce_future.topic_partition
-        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+        metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
                                   checksum, serialized_key_size,
                                   serialized_value_size, serialized_header_size)
         self.success(metadata)
@@ -67,5 +67,5 @@ def get(self, timeout=None):


 RecordMetadata = collections.namedtuple(
-    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
     'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 9509ab940..dba18015a 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,7 +12,7 @@

 import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
             available guarantee.
             If unset, defaults to acks=1.
         compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd', or None.
             Compression is of full batches of data, so the efficacy of batching
             will also impact the compression ratio (more batching means better
             compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
         'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
         'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
         'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
         None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
     }
@@ -388,6 +389,9 @@ def __init__(self, **configs):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'

+        if self.config['compression_type'] == 'zstd':
+            assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
         # Check compression_type for library support
         ct = self.config['compression_type']
         if ct not in self._COMPRESSORS:
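With the plumbing above, zstd is selected like any other codec, and the new log_start_offset travels back to the caller on RecordMetadata. A usage sketch; the broker address, topic name, and api_version are illustrative, and a Kafka >= 2.1 broker plus the zstandard package are assumed:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    compression_type='zstd',
    api_version=(2, 1, 0),  # anything below (2, 1, 0) trips the new assert
)

future = producer.send('example-topic', b'hello, zstd')
metadata = future.get(timeout=30)
# RecordMetadata now carries the field threaded through this change:
print(metadata.offset, metadata.timestamp, metadata.log_start_offset)
producer.close()
```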
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0de5f98e7..051ae5cfc 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,16 +68,16 @@ def try_append(self, timestamp_ms, key, value, headers):
             sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future

-    def done(self, base_offset=None, timestamp_ms=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
         level = logging.DEBUG if exception is None else logging.WARNING
         log.log(level, "Produced messages to topic-partition %s with base offset"
-                " %s and error %s.", self.topic_partition, base_offset,
-                exception)  # trace
+                " %s, log start offset %s, and error %s.", self.topic_partition, base_offset,
+                log_start_offset, exception)  # trace
         if self.produce_future.is_done:
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success((base_offset, timestamp_ms))
+            self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
         else:
             self.produce_future.failure(exception)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 705b58f9a..a95abd1ce 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -195,13 +195,23 @@ def _handle_produce_response(self, node_id, send_time, batches, response):

             for topic, partitions in response.topics:
                 for partition_info in partitions:
+                    error_message = None
+                    log_start_offset = None
                     if response.API_VERSION < 2:
                         partition, error_code, offset = partition_info
                         ts = None
-                    else:
+                    elif 2 <= response.API_VERSION <= 4:
                         partition, error_code, offset, ts = partition_info
+                    elif 5 <= response.API_VERSION <= 7:
+                        partition, error_code, offset, ts, log_start_offset = partition_info
+                    else:
+                        # the ignored field is record_errors: [(batch_index, batch_index_error_message)] (KIP-467)
+                        partition, error_code, offset, ts, log_start_offset, _, error_message = partition_info
                     tp = TopicPartition(topic, partition)
                     error = Errors.for_code(error_code)
+                    if error_message is not None:
+                        # surface the broker's summary message without replacing the typed error
+                        log.warning('Produce error on %s: %s', tp, error_message)
                     batch = batches_by_partition[tp]
-                    self._complete_batch(batch, error, offset, ts)
+                    self._complete_batch(batch, error, offset, ts, log_start_offset)
@@ -213,7 +223,8 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
             for batch in batches:
                 self._complete_batch(batch, None, -1, None)

-    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None,
+                        log_start_offset=None):
         """Complete or retry the given batch of records.

         Arguments:
@@ -221,6 +232,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None,
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
             timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+            log_start_offset (int, optional): The start offset of the log at the time this produce response was created
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -240,7 +251,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None,
                 error = error(batch.topic_partition.topic)

         # tell the user the result of their request
-        batch.done(base_offset, timestamp_ms, error)
+        batch.done(base_offset, timestamp_ms, error, log_start_offset)
         self._accumulator.deallocate(batch)
         if error is not None:
             self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
@@ -293,7 +304,15 @@ def _produce_request(self, node_id, acks, timeout, batches):
             produce_records_by_partition[topic][partition] = buf

         kwargs = {}
-        if self.config['api_version'] >= (0, 11):
+        if self.config['api_version'] >= (2, 1):
+            version = 7
+        elif self.config['api_version'] >= (2, 0):
+            version = 6
+        elif self.config['api_version'] >= (1, 1):
+            version = 5
+        elif self.config['api_version'] >= (1, 0):
+            version = 4
+        elif self.config['api_version'] >= (0, 11):
             version = 3
             kwargs = dict(transactional_id=None)
         elif self.config['api_version'] >= (0, 10):
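The if/elif ladder in _produce_request is the client-side half of version negotiation; restated as a hypothetical standalone helper (pick_produce_version is not part of this diff) to make the mapping explicit:

```python
def pick_produce_version(api_version):
    """Highest ProduceRequest version spoken for a given broker api_version tuple."""
    if api_version >= (2, 1):
        return 7  # v7: broker accepts zstd-compressed batches (KIP-110)
    elif api_version >= (2, 0):
        return 6  # v6: responses are sent before throttling on quota violation
    elif api_version >= (1, 1):
        return 5  # v5: response gains log_start_offset
    elif api_version >= (1, 0):
        return 4
    elif api_version >= (0, 11):
        return 3  # v3: magic v2 record batches, transactional_id field
    elif api_version >= (0, 10):
        return 2
    elif api_version == (0, 9):
        return 1
    return 0

assert pick_produce_version((2, 4, 0)) == 7  # matches KAFKA_VERSION in the Makefile
```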
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index f4032b311..a32a53975 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -78,6 +78,49 @@ class ProduceResponse_v5(Response):
     )


+class ProduceResponse_v6(Response):
+    """
+    The version number is bumped to indicate that, on quota violation, brokers
+    send out responses before throttling.
+    """
+    API_KEY = 0
+    API_VERSION = 6
+    SCHEMA = ProduceResponse_v5.SCHEMA
+
+
+class ProduceResponse_v7(Response):
+    """
+    V7 bumped up to indicate ZStandard capability (see KIP-110).
+    """
+    API_KEY = 0
+    API_VERSION = 7
+    SCHEMA = ProduceResponse_v6.SCHEMA
+
+
+class ProduceResponse_v8(Response):
+    """
+    V8 adds record_errors (a list of per-record error indices and messages)
+    and error_message to the partition response (see KIP-467).
+    """
+    API_KEY = 0
+    API_VERSION = 8
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('log_start_offset', Int64),
+                ('record_errors', Array(
+                    ('batch_index', Int32),
+                    ('batch_index_error_message', String('utf-8')))),
+                ('error_message', String('utf-8'))))),
+        ('throttle_time_ms', Int32)
+    )
+
+
 class ProduceRequest(Request):
     API_KEY = 0
@@ -147,11 +190,42 @@ class ProduceRequest_v5(ProduceRequest):
     SCHEMA = ProduceRequest_v4.SCHEMA


+class ProduceRequest_v6(ProduceRequest):
+    """
+    The version number is bumped to indicate that, on quota violation, brokers
+    send out responses before throttling.
+    """
+    API_VERSION = 6
+    RESPONSE_TYPE = ProduceResponse_v6
+    SCHEMA = ProduceRequest_v5.SCHEMA
+
+
+class ProduceRequest_v7(ProduceRequest):
+    """
+    V7 bumped up to indicate ZStandard capability (see KIP-110).
+    """
+    API_VERSION = 7
+    RESPONSE_TYPE = ProduceResponse_v7
+    SCHEMA = ProduceRequest_v6.SCHEMA
+
+
+class ProduceRequest_v8(ProduceRequest):
+    """
+    V8 adds record_errors and error_message to the PartitionResponse
+    (see KIP-467).
+    """
+    API_VERSION = 8
+    RESPONSE_TYPE = ProduceResponse_v8
+    SCHEMA = ProduceRequest_v7.SCHEMA
+
+
 ProduceRequest = [
     ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
-    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
+    ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
+    ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
 ]
 ProduceResponse = [
     ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
-    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
+    ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
+    ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
 ]
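Because the module-level ProduceRequest and ProduceResponse lists are indexed by API version, the version chosen in the sender resolves to a request class with a plain list lookup; a quick sketch against the definitions above:

```python
from kafka.protocol.produce import ProduceRequest, ProduceResponse

request_class = ProduceRequest[7]  # ProduceRequest_v7
assert request_class.API_VERSION == 7
assert request_class.RESPONSE_TYPE is ProduceResponse[7]

# Shape of one decoded v8 partition entry, matching the unpacking in
# Sender._handle_produce_response:
#   (partition, error_code, offset, timestamp, log_start_offset,
#    record_errors, error_message)
```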
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 07368bba9..a098c42a9 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -62,8 +62,8 @@
 )
 from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode,
-    gzip_decode, snappy_decode, lz4_decode
+    gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+    gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
 import kafka.codec as codecs
@@ -97,6 +97,7 @@ class DefaultRecordBase(object):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     TRANSACTIONAL_MASK = 0x10
     CONTROL_MASK = 0x20
@@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        elif compression_type == self.CODEC_ZSTD:
+            checker, name = codecs.has_zstd, "zstd"
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))
@@ -185,6 +188,8 @@ def _maybe_uncompress(self):
                 uncompressed = snappy_decode(data.tobytes())
             if compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
+            if compression_type == self.CODEC_ZSTD:
+                uncompressed = zstd_decode(data.tobytes())
             self._buffer = bytearray(uncompressed)
             self._pos = 0
         self._decompressed = True
@@ -517,6 +522,8 @@ def _maybe_compress(self):
                 compressed = snappy_encode(data)
             elif self._compression_type == self.CODEC_LZ4:
                 compressed = lz4_encode(data)
+            elif self._compression_type == self.CODEC_ZSTD:
+                compressed = zstd_encode(data)
             compressed_size = len(compressed)
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index a6c4b51f7..fc2ef2d6b 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object):

     def __init__(self, magic, compression_type, batch_size):
         assert magic in [0, 1, 2], "Not supported magic"
-        assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+        assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
         if magic >= 2:
             self._builder = DefaultRecordBatchBuilder(
                 magic=magic, compression_type=compression_type,
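The new codec id plugs straight into the magic-2 batch builder; a sketch of building one zstd batch by hand (compression_type=4 is CODEC_ZSTD), assuming the zstandard package is installed:

```python
from kafka.record.memory_records import MemoryRecordsBuilder

builder = MemoryRecordsBuilder(magic=2, compression_type=4, batch_size=16384)
builder.append(timestamp=None, key=None, value=b'value', headers=[])
builder.close()  # runs _maybe_compress(): zstd output is kept only if smaller
raw = builder.buffer()
```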
diff --git a/requirements-dev.txt b/requirements-dev.txt
index d2830905b..77b8b5134 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -15,3 +15,4 @@ pytest-mock==1.10.0
 sphinx-rtd-theme==0.2.4
 crc32c==1.7
 py==1.8.0
+zstandard==0.13.0
\ No newline at end of file
diff --git a/test/test_codec.py b/test/test_codec.py
index 9eff888fe..9425bdead 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -7,11 +7,12 @@
 from kafka.vendor.six.moves import range

 from kafka.codec import (
-    has_snappy, has_lz4,
+    has_snappy, has_lz4, has_zstd,
     gzip_encode, gzip_decode,
     snappy_encode, snappy_decode,
     lz4_encode, lz4_decode,
     lz4_encode_old_kafka, lz4_decode_old_kafka,
+    zstd_encode, zstd_decode,
 )

 from test.testutil import random_string
@@ -21,7 +22,7 @@ def test_gzip():
     for i in range(1000):
         b1 = random_string(100).encode('utf-8')
         b2 = gzip_decode(gzip_encode(b1))
-        assert b1 == b2
+        assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)


 @pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
@@ -29,7 +30,7 @@ def test_snappy():
     for i in range(1000):
         b1 = random_string(100).encode('utf-8')
         b2 = snappy_decode(snappy_encode(b1))
-        assert b1 == b2
+        assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)


 @pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
@@ -90,7 +91,7 @@ def test_lz4():
         b1 = random_string(100).encode('utf-8')
         b2 = lz4_decode(lz4_encode(b1))
         assert len(b1) == len(b2)
-        assert b1 == b2
+        assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)


 @pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
@@ -100,7 +101,7 @@ def test_lz4_old():
         b1 = random_string(100).encode('utf-8')
         b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
         assert len(b1) == len(b2)
-        assert b1 == b2
+        assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)


 @pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
@@ -112,4 +113,12 @@ def test_lz4_incremental():
         b1 = random_string(100).encode('utf-8') * 50000
         b2 = lz4_decode(lz4_encode(b1))
         assert len(b1) == len(b2)
-        assert b1 == b2
+        assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
+
+
+@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
+def test_zstd():
+    for _ in range(1000):
+        b1 = random_string(100).encode('utf-8')
+        b2 = zstd_decode(zstd_encode(b1))
+        assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
diff --git a/test/test_producer.py b/test/test_producer.py
index 9605adf58..604f638bb 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -23,21 +23,22 @@ def test_buffer_pool():

 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
-
     if compression == 'lz4':
-        # LZ4 requires 0.8.2
         if env_kafka_version() < (0, 8, 2):
-            return
-        # python-lz4 crashes on older versions of pypy
+            pytest.skip('LZ4 requires 0.8.2')
         elif platform.python_implementation() == 'PyPy':
-            return
+            pytest.skip('python-lz4 crashes on older versions of pypy')
+
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        pytest.skip('zstd requires kafka 2.1.0 or newer')

     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
+                             api_version=env_kafka_version(),
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -81,11 +82,14 @@ def test_kafka_producer_gc_cleanup():

 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        pytest.skip('zstd requires kafka 2.1.0 or newer')
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
+                             api_version=env_kafka_version(),
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
@@ -124,10 +128,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if headers:
         assert record.serialized_header_size == 22

-    # generated timestamp case is skipped for broker 0.9 and below
     if magic == 0:
-        return
-
+        pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
     send_time = time.time() * 1000
     future = producer.send(
         topic,
diff --git a/tox.ini b/tox.ini
index 06403d6ed..596a9f211 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,6 +15,7 @@ deps =
     pytest-mock
    mock
     python-snappy
+    zstandard
     lz4
     xxhash
     crc32c
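For reference, the gating used throughout these tests comes from test.testutil.env_kafka_version(), which is essentially the following (restated here; see testutil for the authoritative version):

```python
import os

def env_kafka_version():
    """KAFKA_VERSION from the environment as a tuple, e.g. '2.4.0' -> (2, 4, 0)."""
    if 'KAFKA_VERSION' not in os.environ:
        return ()
    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
```

An empty tuple is falsy, so the skipif markers disable the integration tests when no broker version is exported, and ordinary tuple comparison (env_kafka_version() < (2, 1, 0)) gates the zstd cases against the Makefile's KAFKA_VERSION=2.4.0.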