diff --git a/.travis.yml b/.travis.yml index 8e2fdfedf..e8379248a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,7 @@ python: - 2.7 - 3.4 - 3.7 + - 3.8 - pypy2.7-6.0 env: @@ -15,11 +16,13 @@ env: - KAFKA_VERSION=0.11.0.3 - KAFKA_VERSION=1.1.1 - KAFKA_VERSION=2.4.0 + - KAFKA_VERSION=2.5.0 addons: apt: packages: - libsnappy-dev + - libzstd-dev - openjdk-8-jdk cache: diff --git a/README.rst b/README.rst index bae567ba6..29e6935c4 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,7 @@ Kafka Python client ------------------------ .. image:: https://img.shields.io/badge/kafka-2.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg - :target: https://kafka-python.readthedocs.io/compatibility.html + :target: https://kafka-python.readthedocs.io/en/master/compatibility.html .. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg :target: https://pypi.python.org/pypi/kafka-python .. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github diff --git a/benchmarks/README b/benchmarks/README.md similarity index 100% rename from benchmarks/README rename to benchmarks/README.md diff --git a/build_integration.sh b/build_integration.sh index 98b9b2766..c020b0fe2 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,6 +1,6 @@ #!/bin/bash -: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"} +: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"} : ${SCALA_VERSION:=2.11} : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/} : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git} @@ -33,12 +33,14 @@ pushd servers echo "-------------------------------------" echo "Checking kafka binaries for ${kafka}" echo - # kafka 0.8.0 is only available w/ scala 2.8.0 if [ "$kafka" == "0.8.0" ]; then KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz" + else if [ "$kafka" \> "2.4.0" ]; then + KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz" else KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz" fi + fi if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then if [ -f "${KAFKA_ARTIFACT}" ]; then echo "Using cached artifact: ${KAFKA_ARTIFACT}" diff --git a/docs/compatibility.rst b/docs/compatibility.rst index 93be6fd6e..ae152618e 100644 --- a/docs/compatibility.rst +++ b/docs/compatibility.rst @@ -16,6 +16,6 @@ Although kafka-python is tested and expected to work on recent broker versions, not all features are supported. Specifically, authentication codecs, and transactional producer/consumer support are not fully implemented. PRs welcome! -kafka-python is tested on python 2.7, 3.4, 3.7, and pypy2.7. +kafka-python is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7. Builds and tests via Travis-CI. See https://travis-ci.org/dpkp/kafka-python diff --git a/docs/conf.py b/docs/conf.py index c7da0bc46..efa8d0807 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,7 +12,6 @@ # All configuration values have a default; values that are commented out # serve to show the default. -import sys import os # If extensions (or modules to document with autodoc) are in another directory, diff --git a/docs/index.rst b/docs/index.rst index fa6f93c50..9c46e3313 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -122,11 +122,12 @@ multiprocessing is recommended. Compression *********** -kafka-python supports gzip compression/decompression natively. 
To produce or -consume lz4 compressed messages, you should install python-lz4 (pip install lz4). -To enable snappy, install python-snappy (also requires snappy library). -See `Installation `_ for more information. +kafka-python supports multiple compression types: + - gzip : supported natively + - lz4 : requires `python-lz4 `_ installed + - snappy : requires the `python-snappy `_ package (which requires the snappy C library) + - zstd : requires the `python-zstandard `_ package installed Protocol ******** diff --git a/example.py b/example.py index dac97b751..9907450f6 100755 --- a/example.py +++ b/example.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -import threading, logging, time -import multiprocessing +import threading, time -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.admin import NewTopic class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() - + def stop(self): self.stop_event.set() @@ -23,14 +23,15 @@ def run(self): producer.close() -class Consumer(multiprocessing.Process): + +class Consumer(threading.Thread): def __init__(self): - multiprocessing.Process.__init__(self) - self.stop_event = multiprocessing.Event() - + threading.Thread.__init__(self) + self.stop_event = threading.Event() + def stop(self): self.stop_event.set() - + def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', @@ -44,29 +45,38 @@ def run(self): break consumer.close() - - + + def main(): + # Create 'my-topic' Kafka topic + try: + admin = KafkaAdminClient(bootstrap_servers='localhost:9092') + + topic = NewTopic(name='my-topic', + num_partitions=1, + replication_factor=1) + admin.create_topics([topic]) + except Exception: + pass + tasks = [ Producer(), Consumer() ] + # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic for t in tasks: t.start() time.sleep(10) - + + # Stop threads for task in tasks: task.stop() for task in tasks: task.join() - - + + if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) main() diff --git a/kafka/admin/client.py b/kafka/admin/client.py index d0fa84560..1b91e1b80 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -from collections import defaultdict +from collections import defaultdict, namedtuple import copy import logging import socket @@ -8,7 +8,10 @@ from . 
import ConfigResourceType from kafka.vendor import six +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ + ACLResourcePatternType from kafka.client_async import KafkaClient, selectors +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, @@ -19,9 +22,8 @@ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest -from kafka.structs import TopicPartition, OffsetAndMetadata -from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ - ACLResourcePatternType +from kafka.protocol.types import Array +from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation from kafka.version import __version__ @@ -204,7 +206,7 @@ def __init__(self, **configs): self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='admin', **self.config) - self._client.check_version() + self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: @@ -271,7 +273,7 @@ def _refresh_controller_id(self): response = future.value controller_id = response.controller_id # verify the controller is new enough to support our requests - controller_version = self._client.check_version(controller_id) + controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) if controller_version < (0, 10, 0): raise IncompatibleBrokerVersion( "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." @@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response): """Process a DescribeGroupsResponse into a group description.""" if response.API_VERSION <= 3: assert len(response.groups) == 1 - # TODO need to implement converting the response tuple into - # a more accessible interface like a namedtuple and then stop - # hardcoding tuple indices here. 
Several Java examples, - # including KafkaAdminClient.java - group_description = response.groups[0] - error_code = group_description[0] + for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names): + if isinstance(response_field, Array): + described_groups = response.__dict__[response_name] + described_groups_field_schema = response_field.array_of + described_group = response.__dict__[response_name][0] + described_group_information_list = [] + protocol_type_is_consumer = False + for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): + if group_information_name == 'protocol_type': + protocol_type = described_group_information + protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type) + if isinstance(group_information_field, Array): + member_information_list = [] + member_schema = group_information_field.array_of + for members in described_group_information: + member_information = [] + for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): + if protocol_type_is_consumer: + if member_name == 'member_metadata' and member: + member_information.append(ConsumerProtocolMemberMetadata.decode(member)) + elif member_name == 'member_assignment' and member: + member_information.append(ConsumerProtocolMemberAssignment.decode(member)) + else: + member_information.append(member) + member_info_tuple = MemberInformation._make(member_information) + member_information_list.append(member_info_tuple) + described_group_information_list.append(member_information_list) + else: + described_group_information_list.append(described_group_information) + # Version 3 of the DescribeGroups API introduced the "authorized_operations" field. This will cause the namedtuple to fail + # Therefore, appending a placeholder of None in it. + if response.API_VERSION <=2: + described_group_information_list.append(None) + group_description = GroupInformation._make(described_group_information_list) + error_code = group_description.error_code error_type = Errors.for_code(error_code) # Java has the note: KAFKA-6789, we can retry based on the error code if error_type is not Errors.NoError: raise error_type( "DescribeGroupsResponse failed with response '{}'." .format(response)) - # TODO Java checks the group protocol type, and if consumer - # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes - # the members' partition assignments... that hasn't yet been - # implemented here so just return the raw struct results else: raise NotImplementedError( "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient." 
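With the rework above, KafkaAdminClient.describe_consumer_groups() returns GroupInformation namedtuples whose members field holds MemberInformation entries, with member_metadata and member_assignment decoded for consumer-protocol groups; for API versions <= 2 a None placeholder is appended for authorized_operations so the namedtuple shape stays consistent. A minimal sketch of reading the new return shape — the broker address and group id below are illustrative placeholders, not part of this patch:

from kafka import KafkaAdminClient

# Broker address and group id are assumed/illustrative only.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
(group,) = admin.describe_consumer_groups(['my-group'])

# Fields are addressed by name instead of hard-coded tuple indices.
print(group.group, group.state, group.protocol_type, group.authorized_operations)
for member in group.members:
    print(member.member_id, member.client_id, member.client_host)
    if group.protocol_type == 'consumer' and member.member_assignment:
        # Decoded ConsumerProtocolMemberAssignment: list of (topic, partitions).
        print(member.member_assignment.assignment)
admin.close()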
diff --git a/kafka/client_async.py b/kafka/client_async.py index 5379153c2..58f22d4ec 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -2,7 +2,6 @@ import collections import copy -import functools import logging import random import socket @@ -202,10 +201,15 @@ def __init__(self, **configs): if key in configs: self.config[key] = configs[key] + # these properties need to be set on top of the initialization pipeline + # because they are used when __del__ method is called + self._closed = False + self._wake_r, self._wake_w = socket.socketpair() + self._selector = self.config['selector']() + self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False - self._selector = self.config['selector']() self._conns = Dict() # object to support weakrefs self._api_versions = None self._connecting = set() @@ -213,7 +217,6 @@ def __init__(self, **configs): self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) self._wake_lock = threading.Lock() @@ -227,7 +230,6 @@ def __init__(self, **configs): self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) - self._closed = False self._sensors = None if self.config['metrics']: self._sensors = KafkaClientMetrics(self.config['metrics'], diff --git a/kafka/codec.py b/kafka/codec.py index aa9fc8291..917400e74 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -10,12 +10,18 @@ _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) _XERIAL_V1_FORMAT = 'bccccccBii' +ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024 try: import snappy except ImportError: snappy = None +try: + import zstandard as zstd +except ImportError: + zstd = None + try: import lz4.frame as lz4 @@ -58,6 +64,10 @@ def has_snappy(): return snappy is not None +def has_zstd(): + return zstd is not None + + def has_lz4(): if lz4 is not None: return True @@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload): payload[header_size:] ]) return lz4_decode(munged_payload) + + +def zstd_encode(payload): + if not zstd: + raise NotImplementedError("Zstd codec is not available") + return zstd.ZstdCompressor().compress(payload) + + +def zstd_decode(payload): + if not zstd: + raise NotImplementedError("Zstd codec is not available") + try: + return zstd.ZstdDecompressor().decompress(payload) + except zstd.ZstdError: + return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE) diff --git a/kafka/conn.py b/kafka/conn.py index c383123ca..5c7287568 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,9 +24,12 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest +from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2 from kafka.protocol.commit import OffsetFetchRequest +from kafka.protocol.offset import OffsetRequest +from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.fetch import FetchRequest from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient @@ -1166,6 +1169,13 @@ def 
_infer_broker_version_from_api_versions(self, api_versions): # in reverse order. As soon as we find one that works, return it test_cases = [ # format (, ) + ((2, 5, 0), DescribeAclsRequest_v2), + ((2, 4, 0), ProduceRequest[8]), + ((2, 3, 0), FetchRequest[11]), + ((2, 2, 0), OffsetRequest[5]), + ((2, 1, 0), FetchRequest[10]), + ((2, 0, 0), FetchRequest[8]), + ((1, 1, 0), FetchRequest[7]), ((1, 0, 0), MetadataRequest[5]), ((0, 11, 0), MetadataRequest[4]), ((0, 10, 2), OffsetFetchRequest[2]), diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2c11eb945..e4f8c1838 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -293,7 +293,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): # Issue #1780 # Recheck partition existence after after a successful metadata refresh if refresh_future.succeeded() and isinstance(future.exception, Errors.StaleMetadata): - log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existance") + log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existence") unknown_partition = future.exception.args[0] # TopicPartition from StaleMetadata if self._client.cluster.leader_for_partition(unknown_partition) is None: log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, )) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b0e236a06..5e41309df 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -3,7 +3,6 @@ import abc import copy import logging -import sys import threading import time import weakref @@ -243,7 +242,7 @@ def ensure_coordinator_ready(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - with self._lock: + with self._client._lock, self._lock: while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -346,7 +345,7 @@ def _handle_join_failure(self, _): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" - with self._lock: + with self._client._lock, self._lock: if self._heartbeat_thread is None: self._start_heartbeat_thread() @@ -764,7 +763,7 @@ def close(self): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" - with self._lock: + with self._client._lock, self._lock: if (not self.coordinator_unknown() and self.state is not MemberState.UNJOINED and self._generation is not Generation.NO_GENERATION): @@ -947,6 +946,15 @@ def run(self): log.debug('Heartbeat thread closed') def _run_once(self): + with self.coordinator._client._lock, self.coordinator._lock: + if self.enabled and self.coordinator.state is MemberState.STABLE: + # TODO: When consumer.wakeup() is implemented, we need to + # disable here to prevent propagating an exception to this + # heartbeat thread + # must get client._lock, or maybe deadlock at heartbeat + # failure callbak in consumer poll + self.coordinator._client.poll(timeout_ms=0) + with self.coordinator._lock: if not self.enabled: log.debug('Heartbeat disabled. 
Waiting') @@ -962,11 +970,6 @@ def _run_once(self): self.disable() return - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - self.coordinator._client.poll(timeout_ms=0) - if self.coordinator.coordinator_unknown(): future = self.coordinator.lookup_coordinator() if not future.is_done or future.failed(): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 9509ab940..cde26b008 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -12,7 +12,7 @@ import kafka.errors as Errors from kafka.client_async import KafkaClient, selectors -from kafka.codec import has_gzip, has_snappy, has_lz4 +from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd from kafka.metrics import MetricConfig, Metrics from kafka.partitioner.default import DefaultPartitioner from kafka.producer.future import FutureRecordMetadata, FutureProduceResult @@ -119,7 +119,7 @@ class KafkaProducer(object): available guarantee. If unset, defaults to acks=1. compression_type (str): The compression type for all data generated by - the producer. Valid values are 'gzip', 'snappy', 'lz4', or None. + the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None. @@ -339,6 +339,7 @@ class KafkaProducer(object): 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP), 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY), 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4), + 'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD), None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE), } @@ -388,6 +389,9 @@ def __init__(self, **configs): if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' + if self.config['compression_type'] == 'zstd': + assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers' + # Check compression_type for library support ct = self.config['compression_type'] if ct not in self._COMPRESSORS: @@ -445,6 +449,14 @@ def _unregister_cleanup(self): self._cleanup = None def __del__(self): + # Disable logger during destruction to avoid touching dangling references + class NullLogger(object): + def __getattr__(self, name): + return lambda *args: None + + global log + log = NullLogger() + self.close() def close(self, timeout=None): diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index b2694dc96..af88ea473 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response): ('permission_type', Int8))))) ) + +class DescribeAclsResponse_v2(Response): + API_KEY = 29 + API_VERSION = 2 + SCHEMA = DescribeAclsResponse_v1.SCHEMA + + class DescribeAclsRequest_v0(Request): API_KEY = 29 API_VERSION = 0 @@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request): ('permission_type', Int8) ) + class DescribeAclsRequest_v1(Request): API_KEY = 29 API_VERSION = 1 @@ -504,6 +512,17 @@ class DescribeAclsRequest_v1(Request): ('permission_type', Int8) ) + +class DescribeAclsRequest_v2(Request): + """ + Enable flexible version + """ + API_KEY = 29 + API_VERSION = 2 + RESPONSE_TYPE = DescribeAclsResponse_v2 + SCHEMA = DescribeAclsRequest_v1.SCHEMA + + DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1] DescribeAclsResponse = [DescribeAclsResponse_v0, 
DescribeAclsResponse_v1] @@ -862,3 +881,4 @@ class CreatePartitionsRequest_v1(Request): CreatePartitionsResponse = [ CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, ] + diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index dd3f648cf..f367848ce 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -94,6 +94,72 @@ class FetchResponse_v6(Response): SCHEMA = FetchResponse_v5.SCHEMA +class FetchResponse_v7(Response): + """ + Add error_code and session_id to response + """ + API_KEY = 1 + API_VERSION = 7 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', Bytes))))) + ) + + +class FetchResponse_v8(Response): + API_KEY = 1 + API_VERSION = 8 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v9(Response): + API_KEY = 1 + API_VERSION = 9 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v10(Response): + API_KEY = 1 + API_VERSION = 10 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v11(Response): + API_KEY = 1 + API_VERSION = 11 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('preferred_read_replica', Int32), + ('message_set', Bytes))))) + ) + + class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 @@ -196,13 +262,125 @@ class FetchRequest_v6(Request): SCHEMA = FetchRequest_v5.SCHEMA +class FetchRequest_v7(Request): + """ + Add incremental fetch requests + """ + API_KEY = 1 + API_VERSION = 7 + RESPONSE_TYPE = FetchResponse_v7 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)) + )), + ) + + +class FetchRequest_v8(Request): + """ + bump used to indicate that on quota violation brokers send out responses before throttling. 
+ """ + API_KEY = 1 + API_VERSION = 8 + RESPONSE_TYPE = FetchResponse_v8 + SCHEMA = FetchRequest_v7.SCHEMA + + +class FetchRequest_v9(Request): + """ + adds the current leader epoch (see KIP-320) + """ + API_KEY = 1 + API_VERSION = 9 + RESPONSE_TYPE = FetchResponse_v9 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)), + )), + ) + + +class FetchRequest_v10(Request): + """ + bumped up to indicate ZStandard capability. (see KIP-110) + """ + API_KEY = 1 + API_VERSION = 10 + RESPONSE_TYPE = FetchResponse_v10 + SCHEMA = FetchRequest_v9.SCHEMA + + +class FetchRequest_v11(Request): + """ + added rack ID to support read from followers (KIP-392) + """ + API_KEY = 1 + API_VERSION = 11 + RESPONSE_TYPE = FetchResponse_v11 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)) + )), + ('rack_id', String('utf-8')), + ) + + FetchRequest = [ FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, FetchRequest_v3, FetchRequest_v4, FetchRequest_v5, - FetchRequest_v6 + FetchRequest_v6, FetchRequest_v7, FetchRequest_v8, + FetchRequest_v9, FetchRequest_v10, FetchRequest_v11, ] FetchResponse = [ FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, FetchResponse_v3, FetchResponse_v4, FetchResponse_v5, - FetchResponse_v6 + FetchResponse_v6, FetchResponse_v7, FetchResponse_v8, + FetchResponse_v9, FetchResponse_v10, FetchResponse_v11, ] diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 31527bf63..4c5c031b8 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -3,8 +3,8 @@ import io import time -from kafka.codec import (has_gzip, has_snappy, has_lz4, - gzip_decode, snappy_decode, +from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd, + gzip_decode, snappy_decode, zstd_decode, lz4_decode, lz4_decode_old_kafka) from kafka.protocol.frame import KafkaBytes from kafka.protocol.struct import Struct @@ -35,6 +35,7 @@ class Message(Struct): CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 + CODEC_ZSTD = 0x04 TIMESTAMP_TYPE_MASK = 0x08 HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2) @@ -119,7 +120,7 @@ def is_compressed(self): def decompress(self): codec = self.attributes & self.CODEC_MASK - assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4) + assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD) if codec == self.CODEC_GZIP: assert has_gzip(), 'Gzip decompression unsupported' raw_bytes = gzip_decode(self.value) @@ -132,6 +133,9 @@ def decompress(self): raw_bytes = lz4_decode_old_kafka(self.value) else: raw_bytes = lz4_decode(self.value) + elif codec == self.CODEC_ZSTD: + assert has_zstd(), 
"ZSTD decompression unsupported" + raw_bytes = zstd_decode(self.value) else: raise Exception('This should be impossible') diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 3c254de40..1ed382b0d 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -53,6 +53,43 @@ class OffsetResponse_v2(Response): ) +class OffsetResponse_v3(Response): + """ + on quota violation, brokers send out responses before throttling + """ + API_KEY = 2 + API_VERSION = 3 + SCHEMA = OffsetResponse_v2.SCHEMA + + +class OffsetResponse_v4(Response): + """ + Add leader_epoch to response + """ + API_KEY = 2 + API_VERSION = 4 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('timestamp', Int64), + ('offset', Int64), + ('leader_epoch', Int32))))) + ) + + +class OffsetResponse_v5(Response): + """ + adds a new error code, OFFSET_NOT_AVAILABLE + """ + API_KEY = 2 + API_VERSION = 5 + SCHEMA = OffsetResponse_v4.SCHEMA + + class OffsetRequest_v0(Request): API_KEY = 2 API_VERSION = 0 @@ -105,5 +142,53 @@ class OffsetRequest_v2(Request): } -OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2] -OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2] +class OffsetRequest_v3(Request): + API_KEY = 2 + API_VERSION = 3 + RESPONSE_TYPE = OffsetResponse_v3 + SCHEMA = OffsetRequest_v2.SCHEMA + DEFAULTS = { + 'replica_id': -1 + } + + +class OffsetRequest_v4(Request): + """ + Add current_leader_epoch to request + """ + API_KEY = 2 + API_VERSION = 4 + RESPONSE_TYPE = OffsetResponse_v4 + SCHEMA = Schema( + ('replica_id', Int32), + ('isolation_level', Int8), # <- added isolation_level + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int64), + ('timestamp', Int64))))) + ) + DEFAULTS = { + 'replica_id': -1 + } + + +class OffsetRequest_v5(Request): + API_KEY = 2 + API_VERSION = 5 + RESPONSE_TYPE = OffsetResponse_v5 + SCHEMA = OffsetRequest_v4.SCHEMA + DEFAULTS = { + 'replica_id': -1 + } + + +OffsetRequest = [ + OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2, + OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5, +] +OffsetResponse = [ + OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2, + OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5, +] diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 07368bba9..a098c42a9 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -62,8 +62,8 @@ ) from kafka.errors import CorruptRecordException, UnsupportedCodecError from kafka.codec import ( - gzip_encode, snappy_encode, lz4_encode, - gzip_decode, snappy_decode, lz4_decode + gzip_encode, snappy_encode, lz4_encode, zstd_encode, + gzip_decode, snappy_decode, lz4_decode, zstd_decode ) import kafka.codec as codecs @@ -97,6 +97,7 @@ class DefaultRecordBase(object): CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 + CODEC_ZSTD = 0x04 TIMESTAMP_TYPE_MASK = 0x08 TRANSACTIONAL_MASK = 0x10 CONTROL_MASK = 0x20 @@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type): checker, name = codecs.has_snappy, "snappy" elif compression_type == self.CODEC_LZ4: checker, name = codecs.has_lz4, "lz4" + elif compression_type == self.CODEC_ZSTD: + checker, name = codecs.has_zstd, "zstd" if not checker(): raise UnsupportedCodecError( "Libraries for {} compression codec not found".format(name)) @@ -185,6 +188,8 @@ def 
_maybe_uncompress(self): uncompressed = snappy_decode(data.tobytes()) if compression_type == self.CODEC_LZ4: uncompressed = lz4_decode(data.tobytes()) + if compression_type == self.CODEC_ZSTD: + uncompressed = zstd_decode(data.tobytes()) self._buffer = bytearray(uncompressed) self._pos = 0 self._decompressed = True @@ -517,6 +522,8 @@ def _maybe_compress(self): compressed = snappy_encode(data) elif self._compression_type == self.CODEC_LZ4: compressed = lz4_encode(data) + elif self._compression_type == self.CODEC_ZSTD: + compressed = zstd_encode(data) compressed_size = len(compressed) if len(data) <= compressed_size: # We did not get any benefit from compression, lets send diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index a6c4b51f7..fc2ef2d6b 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object): def __init__(self, magic, compression_type, batch_size): assert magic in [0, 1, 2], "Not supported magic" - assert compression_type in [0, 1, 2, 3], "Not valid compression type" + assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type" if magic >= 2: self._builder = DefaultRecordBatchBuilder( magic=magic, compression_type=compression_type, diff --git a/kafka/scram.py b/kafka/scram.py index 684925caa..7f003750c 100644 --- a/kafka/scram.py +++ b/kafka/scram.py @@ -71,7 +71,6 @@ def create_salted_password(self, salt, iterations): ) def final_message(self): - client_final_no_proof = 'c=biws,r=' + self.nonce return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode('utf-8')) def process_server_final_message(self, server_final_message): diff --git a/kafka/structs.py b/kafka/structs.py index 9ab4f8bfa..bcb023670 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -1,27 +1,87 @@ +""" Other useful structs """ from __future__ import absolute_import from collections import namedtuple -# Other useful structs +"""A topic and partition tuple + +Keyword Arguments: + topic (str): A topic name + partition (int): A partition id +""" TopicPartition = namedtuple("TopicPartition", ["topic", "partition"]) + +"""A Kafka broker metadata used by admin tools. + +Keyword Arguments: + nodeID (int): The Kafka broker id. + host (str): The Kafka broker hostname. + port (int): The Kafka broker port. + rack (str): The rack of the broker, which is used to in rack aware + partition assignment for fault tolerance. + Examples: `RACK1`, `us-east-1d`. Default: None +""" BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port", "rack"]) + +"""A topic partition metadata describing the state in the MetadataResponse. + +Keyword Arguments: + topic (str): The topic name of the partition this metadata relates to. + partition (int): The id of the partition this metadata relates to. + leader (int): The id of the broker that is the leader for the partition. + replicas (List[int]): The ids of all brokers that contain replicas of the + partition. + isr (List[int]): The ids of all brokers that contain in-sync replicas of + the partition. + error (KafkaError): A KafkaError object associated with the request for + this partition metadata. +""" PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr", "error"]) + +"""The Kafka offset commit API + +The Kafka offset commit API allows users to provide additional metadata +(in the form of a string) when an offset is committed. 
This can be useful +(for example) to store information about which node made the commit, +what time the commit was made, etc. + +Keyword Arguments: + offset (int): The offset to be committed + metadata (str): Non-null metadata +""" OffsetAndMetadata = namedtuple("OffsetAndMetadata", # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata) ["offset", "metadata"]) + +"""An offset and timestamp tuple + +Keyword Arguments: + offset (int): An offset + timestamp (int): The timestamp associated to the offset +""" OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"]) +MemberInformation = namedtuple("MemberInformation", + ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"]) + +GroupInformation = namedtuple("GroupInformation", + ["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"]) + +"""Define retry policy for async producer -# Define retry policy for async producer -# Limit value: int >= 0, 0 means no retries +Keyword Arguments: + Limit (int): Number of retries. limit >= 0, 0 means no retries + backoff_ms (int): Milliseconds to backoff. + retry_on_timeouts: +""" RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"]) diff --git a/requirements-dev.txt b/requirements-dev.txt index d2830905b..6c3a6668e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,6 +1,6 @@ flake8==3.4.1 -pytest==3.10.0 -pytest-cov==2.6.0 +pytest==5.4.3 +pytest-cov==2.10.0 docker-py==1.10.6 coveralls==1.5.1 Sphinx==1.6.4 @@ -9,8 +9,8 @@ xxhash==1.3.0 python-snappy==0.5.3 tox==3.5.3 mock==3.0.5 -pylint==1.9.3 -pytest-pylint==0.12.3 +pylint==2.5.3 +pytest-pylint==0.17.0 pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.5.0/resources/kafka.properties similarity index 100% rename from servers/2.2.0/resources/kafka.properties rename to servers/2.5.0/resources/kafka.properties diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.5.0/resources/kafka_server_jaas.conf similarity index 100% rename from servers/2.2.0/resources/kafka_server_jaas.conf rename to servers/2.5.0/resources/kafka_server_jaas.conf diff --git a/servers/2.2.0/resources/log4j.properties b/servers/2.5.0/resources/log4j.properties similarity index 100% rename from servers/2.2.0/resources/log4j.properties rename to servers/2.5.0/resources/log4j.properties diff --git a/servers/2.2.0/resources/zookeeper.properties b/servers/2.5.0/resources/zookeeper.properties similarity index 100% rename from servers/2.2.0/resources/zookeeper.properties rename to servers/2.5.0/resources/zookeeper.properties diff --git a/setup.py b/setup.py index 8bc484c9a..5cb2e7273 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ def run(cls): version=__version__, tests_require=test_require, + extras_require={"crc32c": ["crc32c"]}, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), author="Dana Powers", @@ -56,6 +57,7 @@ def run(cls): "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", ] diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 37b140573..dc04537d5 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,10 +1,13 @@ import pytest 
-from test.testutil import env_kafka_version +from logging import info +from test.testutil import env_kafka_version, random_string +from threading import Event, Thread +from time import time, sleep -from kafka.errors import NoError from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) +from kafka.errors import (NoError, GroupCoordinatorNotAvailableError) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): with pytest.raises(ValueError): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') +def test_describe_consumer_group_does_not_exist(kafka_admin_client): + """Tests that the describe consumer group call fails if the group coordinator is not available + """ + with pytest.raises(GroupCoordinatorNotAvailableError): + group_description = kafka_admin_client.describe_consumer_groups(['test']) + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') +def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): + """Tests that the describe consumer group call returns valid consumer group information + This test takes inspiration from the test 'test_group' in test_consumer_group.py. + """ + consumers = {} + stop = {} + threads = {} + random_group_id = 'test-group-' + random_string(6) + group_id_list = [random_group_id, random_group_id + '_2'] + generations = {group_id_list[0]: set(), group_id_list[1]: set()} + def consumer_thread(i, group_id): + assert i not in consumers + assert i not in stop + stop[i] = Event() + consumers[i] = kafka_consumer_factory(group_id=group_id) + while not stop[i].is_set(): + consumers[i].poll(20) + consumers[i].close() + consumers[i] = None + stop[i] = None + + num_consumers = 3 + for i in range(num_consumers): + group_id = group_id_list[i % 2] + t = Thread(target=consumer_thread, args=(i, group_id,)) + t.start() + threads[i] = t + + try: + timeout = time() + 35 + while True: + for c in range(num_consumers): + + # Verify all consumers have been created + if c not in consumers: + break + + # Verify all consumers have an assignment + elif not consumers[c].assignment(): + break + + # If all consumers exist and have an assignment + else: + + info('All consumers have assignment... 
checking for stable group') + # Verify all consumers are in the same generation + # then log state and break while loop + + for consumer in consumers.values(): + generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id) + + is_same_generation = any([len(consumer_generation) == 1 for consumer_generation in generations.values()]) + + # New generation assignment is not complete until + # coordinator.rejoining = False + rejoining = any([consumer._coordinator.rejoining + for consumer in list(consumers.values())]) + + if not rejoining and is_same_generation: + break + else: + sleep(1) + assert time() < timeout, "timeout waiting for assignments" + + info('Group stabilized; verifying assignment') + output = kafka_admin_client.describe_consumer_groups(group_id_list) + assert len(output) == 2 + consumer_groups = set() + for consumer_group in output: + assert(consumer_group.group in group_id_list) + if consumer_group.group == group_id_list[0]: + assert(len(consumer_group.members) == 2) + else: + assert(len(consumer_group.members) == 1) + for member in consumer_group.members: + assert(member.member_metadata.subscription[0] == topic) + assert(member.member_assignment.assignment[0][0] == topic) + consumer_groups.add(consumer_group.group) + assert(sorted(list(consumer_groups)) == group_id_list) + finally: + info('Shutting down %s consumers', num_consumers) + for c in range(num_consumers): + info('Stopping consumer %s', c) + stop[c].set() + threads[c].join() + threads[c] = None diff --git a/test/test_codec.py b/test/test_codec.py index 9eff888fe..e05707451 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -7,11 +7,12 @@ from kafka.vendor.six.moves import range from kafka.codec import ( - has_snappy, has_lz4, + has_snappy, has_lz4, has_zstd, gzip_encode, gzip_decode, snappy_encode, snappy_decode, lz4_encode, lz4_decode, lz4_encode_old_kafka, lz4_decode_old_kafka, + zstd_encode, zstd_decode, ) from test.testutil import random_string @@ -113,3 +114,11 @@ def test_lz4_incremental(): b2 = lz4_decode(lz4_encode(b1)) assert len(b1) == len(b2) assert b1 == b2 + + +@pytest.mark.skipif(not has_zstd(), reason="Zstd not available") +def test_zstd(): + for _ in range(1000): + b1 = random_string(100).encode('utf-8') + b2 = zstd_decode(zstd_encode(b1)) + assert b1 == b2 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 6e6bc9455..90b7ed203 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -6,14 +6,23 @@ from kafka.vendor.six.moves import range import kafka.codec -from kafka.errors import ( - KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError -) +from kafka.errors import UnsupportedCodecError, UnsupportedVersionError from kafka.structs import TopicPartition, OffsetAndTimestamp from test.testutil import Timer, assert_message_count, env_kafka_version, random_string +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_kafka_version_infer(kafka_consumer_factory): + consumer = kafka_consumer_factory() + actual_ver_major_minor = env_kafka_version()[:2] + client = consumer._client + conn = list(client._conns.values())[0] + inferred_ver_major_minor = conn.check_version()[:2] + assert actual_ver_major_minor == inferred_ver_major_minor, \ + "Was expecting inferred broker version to be %s but was %s" % (actual_ver_major_minor, inferred_ver_major_minor) + + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def 
test_kafka_consumer(kafka_consumer_factory, send_messages): """Test KafkaConsumer""" diff --git a/test/test_producer.py b/test/test_producer.py index 9605adf58..7263130d1 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -23,16 +23,16 @@ def test_buffer_pool(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): - if compression == 'lz4': - # LZ4 requires 0.8.2 if env_kafka_version() < (0, 8, 2): - return - # python-lz4 crashes on older versions of pypy + pytest.skip('LZ4 requires 0.8.2') elif platform.python_implementation() == 'PyPy': - return + pytest.skip('python-lz4 crashes on older versions of pypy') + + if compression == 'zstd' and env_kafka_version() < (2, 1, 0): + pytest.skip('zstd requires kafka 2.1.0 or newer') connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, @@ -81,8 +81,10 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): + if compression == 'zstd' and env_kafka_version() < (2, 1, 0): + pytest.skip('zstd requires 2.1.0 or more') connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, @@ -124,10 +126,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if headers: assert record.serialized_header_size == 22 - # generated timestamp case is skipped for broker 0.9 and below if magic == 0: - return - + pytest.skip('generated timestamp case is skipped for broker 0.9 and below') send_time = time.time() * 1000 future = producer.send( topic, diff --git a/tox.ini b/tox.ini index 06403d6ed..10e9911dc 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{26,27,34,35,36,37,py}, docs +envlist = py{26,27,34,35,36,37,38,py}, docs [pytest] testpaths = kafka test @@ -8,13 +8,14 @@ log_format = %(created)f %(filename)-23s %(threadName)s %(message)s [testenv] deps = - pytest<4.0 + pytest pytest-cov - py{27,34,35,36,37,py}: pylint - py{27,34,35,36,37,py}: pytest-pylint + py{27,34,35,36,37,38,py}: pylint + py{27,34,35,36,37,38,py}: pytest-pylint pytest-mock mock python-snappy + zstandard lz4 xxhash crc32c
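Taken together, the codec, producer, and protocol changes let a producer emit zstd-compressed batches that a consumer decodes transparently. A minimal end-to-end sketch, assuming the python-zstandard package is installed and a broker >= 2.1.0 is reachable; the broker address and topic name are placeholders:

from kafka import KafkaConsumer, KafkaProducer

# 'zstd' maps to DefaultRecordBatchBuilder.CODEC_ZSTD and is rejected for
# brokers older than 2.1.0; address and topic are illustrative.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='zstd')
producer.send('my-topic', b'zstd-compressed payload')
producer.flush()
producer.close()

# Decompression is handled transparently during record decoding.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)
consumer.close()

Note that zstd_decode() first attempts a plain decompress and falls back to a 1 MiB max_output_size when the frame does not carry its content size, per the codec change above.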