diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 96df685f4..c9055f95a 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -33,6 +33,7 @@ jobs:
           - "3.0.2"
           - "3.5.2"
           - "3.9.0"
+          - "4.0.0"
         python:
           - "3.13"
         include:
diff --git a/Makefile b/Makefile
index b9f199ef0..c0128e7e2 100644
--- a/Makefile
+++ b/Makefile
@@ -68,6 +68,7 @@ kafka_scala_0_9_0_1=2.11
 kafka_scala_0_10_0_0=2.11
 kafka_scala_0_10_0_1=2.11
 kafka_scala_0_10_1_0=2.11
+kafka_scala_4_0_0=2.13
 scala_version=$(if $(SCALA_VERSION),$(SCALA_VERSION),$(if $(kafka_scala_$(subst .,_,$(1))),$(kafka_scala_$(subst .,_,$(1))),2.12))
 
 kafka_artifact_name=kafka_$(call scala_version,$(1))-$(1).$(if $(filter 0.8.0,$(1)),tar.gz,tgz)
diff --git a/kafka/protocol/broker_api_versions.py b/kafka/protocol/broker_api_versions.py
index 299ab547a..af142d07c 100644
--- a/kafka/protocol/broker_api_versions.py
+++ b/kafka/protocol/broker_api_versions.py
@@ -63,4 +63,6 @@
     (3, 9): {0: (0, 11), 1: (0, 17), 2: (0, 9), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 9), 9: (0, 9), 10: (0, 6), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 5), 17: (0, 1), 18: (0, 4), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 5), 23: (0, 4), 24: (0, 5), 25: (0, 4), 26: (0, 4), 27: (0, 1), 28: (0, 4), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 1), 61: (0, 0), 65: (0, 0), 66: (0, 1), 67: (0, 0), 68: (0, 0), 69: (0, 0)},
 
+    (4, 0): {0: (0, 12), 1: (4, 17), 2: (1, 10), 3: (0, 13), 8: (2, 9), 9: (1, 9), 10: (0, 6), 11: (2, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 6), 16: (0, 5), 17: (0, 1), 18: (0, 4), 19: (2, 7), 20: (1, 6), 21: (0, 2), 22: (0, 5), 23: (2, 4), 24: (0, 5), 25: (0, 4), 26: (0, 5), 27: (1, 1), 28: (0, 5), 29: (1, 3), 30: (1, 3), 31: (1, 3), 32: (1, 4), 33: (0, 2), 34: (1, 2), 35: (1, 4), 36: (0, 2), 37: (0, 3), 38: (1, 3), 39: (1, 2), 40: (1, 2), 41: (1, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 55: (0, 2), 57: (0, 2), 60: (0, 2), 61: (0, 0), 64: (0, 0), 65: (0, 0), 66: (0, 1), 68: (0, 1), 69: (0, 1), 74: (0, 0), 75: (0, 0), 80: (0, 0), 81: (0, 0)},
+
 }
diff --git a/servers/4.0.0/resources/kafka.properties b/servers/4.0.0/resources/kafka.properties
new file mode 100644
index 000000000..3dba393ba
--- /dev/null
+++ b/servers/4.0.0/resources/kafka.properties
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id={broker_id}
+
+# List of controller endpoints used connect to the controller cluster
+controller.quorum.bootstrap.servers={controller_bootstrap_host}:{controller_port}
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
+# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
+# with PLAINTEXT listener name, and port 9092.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
+listeners={transport}://{host}:{port},CONTROLLER://{host}:{controller_port}
+
+# Name of listener used for communication between brokers.
+inter.broker.listener.name={transport}
+
+{sasl_config}
+
+authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+allow.everyone.if.no.acl.found=true
+
+# Listener name, hostname and port the broker or the controller will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners={transport}://{host}:{port},CONTROLLER://{host}:{controller_port}
+
+# A comma-separated list of the names of the listeners used by the controller.
+# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={tmp_dir}/kraft-combined-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets", "__share_group_state" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+share.coordinator.state.topic.replication.factor=1
+share.coordinator.state.topic.min.isr=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+# tune down offset topics to reduce setup time in tests
+offsets.commit.timeout.ms=500
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=1
+
+# Allow shorter session timeouts for tests
+group.min.session.timeout.ms=1000
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
diff --git a/test/conftest.py b/test/conftest.py
index ddd491517..4c4c503e7 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -9,6 +9,7 @@
 from test.testutil import env_kafka_version, random_string
 from test.fixtures import KafkaFixture, ZookeeperFixture
 
+
 @pytest.fixture(scope="module")
 def zookeeper():
     """Return a Zookeeper fixture"""
@@ -23,18 +24,18 @@ def zookeeper():
 
 @pytest.fixture(scope="module")
-def kafka_broker(kafka_broker_factory, zookeeper):
+def kafka_broker(kafka_broker_factory):
     """Return a Kafka broker fixture"""
     if "KAFKA_URI" in os.environ:
         parse = urlparse(os.environ["KAFKA_URI"])
         (host, port) = (parse.hostname, parse.port)
-        return KafkaFixture.instance(0, zookeeper, host=host, port=port, external=True)
+        return KafkaFixture.instance(0, host=host, port=port, external=True)
     else:
-        return kafka_broker_factory()[0]
+        return kafka_broker_factory()
 
 
 @pytest.fixture(scope="module")
-def kafka_broker_factory(zookeeper):
+def kafka_broker_factory():
     """Return a Kafka broker fixture factory"""
     assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'
 
@@ -42,16 +43,20 @@ def kafka_broker_factory(zookeeper):
     def factory(**broker_params):
         params = {} if broker_params is None else broker_params.copy()
         params.setdefault('partitions', 4)
-        num_brokers = params.pop('num_brokers', 1)
-        brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
-                        for x in range(num_brokers))
-        _brokers.extend(brokers)
-        return brokers
+        node_id = params.pop('node_id', 0)
+        broker = KafkaFixture.instance(node_id, **params)
+        _brokers.append(broker)
+        return broker
 
     yield factory
 
+    zks = set()
     for broker in _brokers:
+        zks.add(broker.zookeeper)
         broker.close()
+    for zk in zks:
+        if zk:
+            zk.close()
 
 
 @pytest.fixture
@@ -108,11 +113,13 @@ def factory(**kafka_producer_params):
     if _producer[0]:
         _producer[0].close()
 
+
 @pytest.fixture
 def kafka_admin_client(kafka_admin_client_factory):
     """Return a KafkaAdminClient fixture"""
     yield kafka_admin_client_factory()
 
+
 @pytest.fixture
 def kafka_admin_client_factory(kafka_broker):
     """Return a KafkaAdminClient factory fixture"""
@@ -128,6 +135,7 @@ def factory(**kafka_admin_client_params):
     if _admin_client[0]:
         _admin_client[0].close()
 
+
 @pytest.fixture
 def topic(kafka_broker, request):
     """Return a topic fixture"""
diff --git a/test/fixtures.py b/test/fixtures.py
index 9843d5a2b..dc41cc8e3 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import, division
 
 import atexit
+import base64
 import logging
 import os
 import os.path
@@ -11,6 +12,7 @@
 import py
 from kafka.vendor.six.moves import range
+from kafka.vendor.six.moves.urllib.parse import urlparse  # pylint: disable=E0611,F0401
 
 from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
 from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
@@ -220,17 +222,25 @@ class KafkaFixture(Fixture):
     broker_password = 'alice-secret'
 
     @classmethod
-    def instance(cls, broker_id, zookeeper, zk_chroot=None,
-                 host=None, port=None, external=False,
+    def instance(cls, broker_id, zookeeper=None, zk_chroot=None,
+                 host="localhost", port=None, external=False,
                  transport='PLAINTEXT', replicas=1, partitions=4,
                  sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):
-        if zk_chroot is None:
-            zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
-        if host is None:
-            host = "localhost"
+        # Kafka requires zookeeper prior to 4.0 release
+        if env_kafka_version() < (4, 0):
+            if zookeeper is None:
+                if "ZOOKEEPER_URI" in os.environ:
+                    parse = urlparse(os.environ["ZOOKEEPER_URI"])
+                    (host, port) = (parse.hostname, parse.port)
+                    zookeeper = ZookeeperFixture.instance(host=host, port=port, external=True)
+                elif not external:
+                    zookeeper = ZookeeperFixture.instance()
+            if zk_chroot is None:
+                zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
+
         fixture = KafkaFixture(host, port, broker_id,
-                               zookeeper, zk_chroot,
+                               zookeeper=zookeeper, zk_chroot=zk_chroot,
                                external=external,
                                transport=transport,
                                replicas=replicas, partitions=partitions,
@@ -241,15 +251,23 @@ def instance(cls, broker_id, zookeeper, zk_chroot=None,
             fixture.open()
         return fixture
 
-    def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
+    def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,
                  replicas=1, partitions=2, transport='PLAINTEXT',
                  sasl_mechanism=None, auto_create_topic=True,
                  tmp_dir=None, external=False):
         super(KafkaFixture, self).__init__()
 
         self.host = host
-        self.port = port
+        self.controller_bootstrap_host = host
+        if port is None:
+            self.auto_port = True
+            self.port = get_open_port()
+        else:
+            self.auto_port = False
+            self.port = port
+        self.controller_port = self.port + 1
+        self.cluster_id = self._gen_cluster_id()
 
         self.broker_id = broker_id
         self.auto_create_topic = auto_create_topic
         self.transport = transport.upper()
@@ -262,15 +280,19 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
         # TODO: checking for port connection would be better than scanning logs
         # until then, we need the pattern to work across all supported broker versions
         # The logging format changed slightly in 1.0.0
-        self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
-        # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism
-        self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user)
+        if env_kafka_version() < (4, 0):
+            self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
+            # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism
+            self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user)
+        else:
+            self.start_pattern = r"\[KafkaRaftServer nodeId=%d\] Kafka Server started" % (broker_id,)
+            self.scram_pattern = r"Replayed UserScramCredentialRecord creating new entry for %s" % (self.broker_user,)
 
         self.zookeeper = zookeeper
         self.zk_chroot = zk_chroot
         # Add the attributes below for the template binding
-        self.zk_host = self.zookeeper.host
-        self.zk_port = self.zookeeper.port
+        self.zk_host = self.zookeeper.host if self.zookeeper else None
+        self.zk_port = self.zookeeper.port if self.zookeeper else None
 
         self.replicas = replicas
         self.partitions = partitions
@@ -289,6 +311,9 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
         self.sasl_config = ''
         self.jaas_config = ''
 
+    def _gen_cluster_id(self):
+        return base64.b64encode(uuid.uuid4().bytes).decode('utf-8').rstrip('=')
+
     def _sasl_config(self):
         if not self.sasl_enabled:
             return ''
@@ -400,12 +425,11 @@ def start(self):
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
-        auto_port = (self.port is None)
         while time.time() < end_at:
             # We have had problems with port conflicts on travis
             # so we will try a different port on each retry
             # unless the fixture was passed a specific port
-            if auto_port:
+            if self.auto_port:
                 self.port = get_open_port()
             self.out('Attempting to start on port %d (try #%d)' % (self.port, tries))
             self.render_template(properties_template, properties, vars(self))
@@ -451,6 +475,9 @@ def open(self):
         self.tmp_dir.ensure(dir=True)
         self.tmp_dir.ensure('logs', dir=True)
         self.tmp_dir.ensure('data', dir=True)
+        properties = self.tmp_dir.join('kafka.properties')
+        properties_template = self.test_resource('kafka.properties')
+        self.render_template(properties_template, properties, vars(self))
 
         self.out("Running local instance...")
         log.info("  host            = %s", self.host)
@@ -458,19 +485,26 @@ def open(self):
         log.info("  transport       = %s", self.transport)
         log.info("  sasl_mechanism  = %s", self.sasl_mechanism)
         log.info("  broker_id       = %s", self.broker_id)
-        log.info("  zk_host         = %s", self.zookeeper.host)
-        log.info("  zk_port         = %s", self.zookeeper.port)
+        log.info("  zk_host         = %s", self.zk_host)
+        log.info("  zk_port         = %s", self.zk_port)
         log.info("  zk_chroot       = %s", self.zk_chroot)
         log.info("  replicas        = %s", self.replicas)
         log.info("  partitions      = %s", self.partitions)
         log.info("  tmp_dir         = %s", self.tmp_dir.strpath)
 
-        self._create_zk_chroot()
+        if self.zookeeper:
+            if self.zk_chroot:
+                self._create_zk_chroot()
+            # add user to zookeeper for the first server
+            if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0:
+                self._add_scram_user()
+
+        else:
+            # running in KRaft mode
+            self._format_log_dirs()
+
         self.sasl_config = self._sasl_config()
         self.jaas_config = self._jaas_config()
-        # add user to zookeeper for the first server
-        if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0:
-            self._add_scram_user()
 
         self.start()
         atexit.register(self.close)
@@ -502,6 +536,21 @@ def dump_logs(self):
         super(KafkaFixture, self).dump_logs()
         self.zookeeper.dump_logs()
 
+    def _format_log_dirs(self):
+        self.out("Formatting log dirs for kraft bootstrapping")
+        args = self.run_script('kafka-storage.sh', 'format', '--standalone', '-t', self.cluster_id, '-c', self.tmp_dir.join("kafka.properties"))
+        if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA"):
+            args.extend(['--add-scram', '{}=[name={},password={}]'.format(self.sasl_mechanism, self.broker_user, self.broker_password)])
+        env = self.kafka_run_class_env()
+        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = proc.communicate()
+        if proc.returncode != 0:
+            self.out("Failed to format log dirs for kraft bootstrap!")
+            self.out(stdout)
+            self.out(stderr)
+            raise RuntimeError("Failed to format log dirs!")
+        return True
+
     def _send_request(self, request, timeout=None):
         def _failure(error):
             raise error
@@ -541,8 +590,9 @@ def _create_topic(self, topic_name, num_partitions=None, replication_factor=None
         # Try different methods to create a topic, from the fastest to the slowest
         if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
             self._create_topic_via_metadata(topic_name, timeout_ms)
-        elif env_kafka_version() >= (0, 10, 1, 0):
+        elif env_kafka_version() >= (0, 10, 1, 0) and env_kafka_version() < (4, 0):
             try:
+                # 4.0 brokers dropped support for CreateTopicsRequest v0 (TODO: pick from api_versions)
                 self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
             except InvalidReplicationFactorError:
                 # wait and try again
@@ -686,8 +736,8 @@ def get_api_versions():
 
 def run_brokers():
     logging.basicConfig(level=logging.ERROR)
-    zk = ZookeeperFixture.instance()
-    k = KafkaFixture.instance(0, zk)
+    k = KafkaFixture.instance(0)
+    zk = k.zookeeper
 
     print("Kafka", k.kafka_version, "running on port:", k.port)
     try:
@@ -696,7 +746,8 @@ def run_brokers():
     except KeyboardInterrupt:
         print("Bye!")
         k.close()
-        zk.close()
+        if zk:
+            zk.close()
 
 
 if __name__ == '__main__':
diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py
index 0f404da20..69323fb92 100644
--- a/test/test_sasl_integration.py
+++ b/test/test_sasl_integration.py
@@ -25,7 +25,7 @@
     ]
 )
 def sasl_kafka(request, kafka_broker_factory):
-    sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
+    sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)
     yield sasl_kafka
     sasl_kafka.child.dump_logs()
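
Note (not part of the patch): a minimal usage sketch of the reworked fixture API, mirroring run_brokers() above. It assumes KAFKA_VERSION is exported (e.g. KAFKA_VERSION=4.0.0) and the matching broker distribution has already been fetched under servers/ as the Makefile prepares; the demo() function name is hypothetical.

from test.fixtures import KafkaFixture
from test.testutil import env_kafka_version


def demo():
    # No zookeeper argument is needed any more: for KAFKA_VERSION >= 4.0 the
    # fixture formats its log dirs via kafka-storage.sh and boots a combined
    # KRaft broker/controller, leaving .zookeeper as None; for older versions
    # a ZookeeperFixture is created on demand inside KafkaFixture.instance().
    broker = KafkaFixture.instance(0)
    zk = broker.zookeeper
    print("Kafka", broker.kafka_version, "running on port:", broker.port)
    try:
        if env_kafka_version() >= (4, 0):
            assert zk is None  # KRaft mode, no zookeeper fixture
    finally:
        broker.close()
        if zk:
            zk.close()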