
Commit 3bd280a

Support KRaft / 4.0 brokers in tests (#2559)
1 parent 4995e9b commit 3bd280a

File tree

7 files changed: +260 -36 lines changed

.github/workflows/python-package.yml

Lines changed: 1 addition & 0 deletions

@@ -33,6 +33,7 @@ jobs:
         - "3.0.2"
         - "3.5.2"
         - "3.9.0"
+        - "4.0.0"
       python:
         - "3.13"
       include:

Makefile

Lines changed: 1 addition & 0 deletions

@@ -68,6 +68,7 @@ kafka_scala_0_9_0_1=2.11
 kafka_scala_0_10_0_0=2.11
 kafka_scala_0_10_0_1=2.11
 kafka_scala_0_10_1_0=2.11
+kafka_scala_4_0_0=2.13
 scala_version=$(if $(SCALA_VERSION),$(SCALA_VERSION),$(if $(kafka_scala_$(subst .,_,$(1))),$(kafka_scala_$(subst .,_,$(1))),2.12))

 kafka_artifact_name=kafka_$(call scala_version,$(1))-$(1).$(if $(filter 0.8.0,$(1)),tar.gz,tgz)
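
For readers less familiar with GNU Make, the two functions above resolve a Kafka version to the Scala build and tarball name used to download brokers for the test suite; with the new entry, 4.0.0 resolves to the Scala 2.13 artifact. A rough Python sketch of the same lookup (the dictionary and function names here are illustrative, not part of the commit):

import os

# Per-release Scala build table mirroring the kafka_scala_* Make variables above.
KAFKA_SCALA = {
    '0.9.0.1': '2.11',
    '0.10.0.0': '2.11',
    '0.10.0.1': '2.11',
    '0.10.1.0': '2.11',
    '4.0.0': '2.13',  # new in this commit: Kafka 4.0.0 is published for Scala 2.13
}

def scala_version(kafka_version):
    # An explicit SCALA_VERSION overrides the table; unknown releases fall back to 2.12.
    return os.environ.get('SCALA_VERSION') or KAFKA_SCALA.get(kafka_version, '2.12')

def kafka_artifact_name(kafka_version):
    # Only the 0.8.0 release used a .tar.gz suffix; everything newer is .tgz.
    ext = 'tar.gz' if kafka_version == '0.8.0' else 'tgz'
    return 'kafka_%s-%s.%s' % (scala_version(kafka_version), kafka_version, ext)

print(kafka_artifact_name('4.0.0'))  # kafka_2.13-4.0.0.tgz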

kafka/protocol/broker_api_versions.py

Lines changed: 2 additions & 0 deletions

@@ -63,4 +63,6 @@

     (3, 9): {0: (0, 11), 1: (0, 17), 2: (0, 9), 3: (0, 12), 4: (0, 7), 5: (0, 4), 6: (0, 8), 7: (0, 3), 8: (0, 9), 9: (0, 9), 10: (0, 6), 11: (0, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 5), 16: (0, 5), 17: (0, 1), 18: (0, 4), 19: (0, 7), 20: (0, 6), 21: (0, 2), 22: (0, 5), 23: (0, 4), 24: (0, 5), 25: (0, 4), 26: (0, 4), 27: (0, 1), 28: (0, 4), 29: (0, 3), 30: (0, 3), 31: (0, 3), 32: (0, 4), 33: (0, 2), 34: (0, 2), 35: (0, 4), 36: (0, 2), 37: (0, 3), 38: (0, 3), 39: (0, 2), 40: (0, 2), 41: (0, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 56: (0, 3), 57: (0, 1), 58: (0, 0), 60: (0, 1), 61: (0, 0), 65: (0, 0), 66: (0, 1), 67: (0, 0), 68: (0, 0), 69: (0, 0)},

+    (4, 0): {0: (0, 12), 1: (4, 17), 2: (1, 10), 3: (0, 13), 8: (2, 9), 9: (1, 9), 10: (0, 6), 11: (2, 9), 12: (0, 4), 13: (0, 5), 14: (0, 5), 15: (0, 6), 16: (0, 5), 17: (0, 1), 18: (0, 4), 19: (2, 7), 20: (1, 6), 21: (0, 2), 22: (0, 5), 23: (2, 4), 24: (0, 5), 25: (0, 4), 26: (0, 5), 27: (1, 1), 28: (0, 5), 29: (1, 3), 30: (1, 3), 31: (1, 3), 32: (1, 4), 33: (0, 2), 34: (1, 2), 35: (1, 4), 36: (0, 2), 37: (0, 3), 38: (1, 3), 39: (1, 2), 40: (1, 2), 41: (1, 3), 42: (0, 2), 43: (0, 2), 44: (0, 1), 45: (0, 0), 46: (0, 0), 47: (0, 0), 48: (0, 1), 49: (0, 1), 50: (0, 0), 51: (0, 0), 55: (0, 2), 57: (0, 2), 60: (0, 2), 61: (0, 0), 64: (0, 0), 65: (0, 0), 66: (0, 1), 68: (0, 1), 69: (0, 1), 74: (0, 0), 75: (0, 0), 80: (0, 0), 81: (0, 0)},
+
 }
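
Each entry in this table maps a broker release tuple to a dict of {api_key: (min_version, max_version)}. A small usage sketch, assuming the table is exposed as the module-level name BROKER_API_VERSIONS:

from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS  # assumed module-level name

# Look up the request versions a 4.0 broker advertises for the Metadata API (api_key 3).
min_v, max_v = BROKER_API_VERSIONS[(4, 0)][3]
print(min_v, max_v)  # 0 13, per the (4, 0) row added above

Note that the (4, 0) row no longer carries API keys 4-7 (LeaderAndIsr, StopReplica, UpdateMetadata, ControlledShutdown), which only existed for ZooKeeper-based controllers.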
New file: KRaft server properties template (path not shown in this view)

Lines changed: 161 additions & 0 deletions
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id={broker_id}
+
+# List of controller endpoints used to connect to the controller cluster
+controller.quorum.bootstrap.servers={controller_bootstrap_host}:{controller_port}
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
+# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
+# with PLAINTEXT listener name, and port 9092.
+# FORMAT:
+#   listeners = listener_name://host_name:port
+# EXAMPLE:
+#   listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
+listeners={transport}://{host}:{port},CONTROLLER://{host}:{controller_port}
+
+# Name of listener used for communication between brokers.
+inter.broker.listener.name={transport}
+
+{sasl_config}
+
+authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+allow.everyone.if.no.acl.found=true
+
+# Listener name, hostname and port the broker or the controller will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners={transport}://{host}:{port},CONTROLLER://{host}:{controller_port}
+
+# A comma-separated list of the names of the listeners used by the controller.
+# If no explicit mapping is set in `listener.security.protocol.map`, the default will be to use the PLAINTEXT protocol.
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={tmp_dir}/kraft-combined-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+## Short Replica Lag -- Drops failed brokers out of ISR
+replica.lag.time.max.ms=1000
+replica.socket.timeout.ms=1000
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets", "__share_group_state" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+share.coordinator.state.topic.replication.factor=1
+share.coordinator.state.topic.min.isr=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+# tune down offset topics to reduce setup time in tests
+offsets.commit.timeout.ms=500
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=1
+
+# Allow shorter session timeouts for tests
+group.min.session.timeout.ms=1000
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
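
The single-brace fields in this template ({broker_id}, {transport}, {host}, {port}, {controller_port}, {sasl_config}, {tmp_dir}, {partitions}, {replicas}) look like Python str.format placeholders that the test fixture fills in before starting a combined KRaft broker/controller. A minimal rendering sketch under that assumption, with purely illustrative values and paths:

# Sketch only: assumes str.format-style substitution; values and paths are illustrative.
with open('kraft_server.properties') as f:
    template = f.read()

rendered = template.format(
    broker_id=0,
    transport='PLAINTEXT',
    host='localhost',
    port=9092,
    controller_port=9093,
    controller_bootstrap_host='localhost',
    sasl_config='',                 # left empty unless a SASL transport is under test
    tmp_dir='/tmp/kafka-fixture-0',
    partitions=4,
    replicas=1,
)

with open('/tmp/kafka-fixture-0/server.properties', 'w') as f:
    f.write(rendered)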

test/conftest.py

Lines changed: 17 additions & 9 deletions
@@ -9,6 +9,7 @@
 from test.testutil import env_kafka_version, random_string
 from test.fixtures import KafkaFixture, ZookeeperFixture

+
 @pytest.fixture(scope="module")
 def zookeeper():
     """Return a Zookeeper fixture"""
@@ -23,35 +24,39 @@ def zookeeper():


 @pytest.fixture(scope="module")
-def kafka_broker(kafka_broker_factory, zookeeper):
+def kafka_broker(kafka_broker_factory):
     """Return a Kafka broker fixture"""
     if "KAFKA_URI" in os.environ:
         parse = urlparse(os.environ["KAFKA_URI"])
         (host, port) = (parse.hostname, parse.port)
-        return KafkaFixture.instance(0, zookeeper, host=host, port=port, external=True)
+        return KafkaFixture.instance(0, host=host, port=port, external=True)
     else:
-        return kafka_broker_factory()[0]
+        return kafka_broker_factory()


 @pytest.fixture(scope="module")
-def kafka_broker_factory(zookeeper):
+def kafka_broker_factory():
     """Return a Kafka broker fixture factory"""
     assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'

     _brokers = []
     def factory(**broker_params):
         params = {} if broker_params is None else broker_params.copy()
         params.setdefault('partitions', 4)
-        num_brokers = params.pop('num_brokers', 1)
-        brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
-                        for x in range(num_brokers))
-        _brokers.extend(brokers)
-        return brokers
+        node_id = params.pop('node_id', 0)
+        broker = KafkaFixture.instance(node_id, **params)
+        _brokers.append(broker)
+        return broker

     yield factory

+    zks = set()
     for broker in _brokers:
+        zks.add(broker.zookeeper)
         broker.close()
+    for zk in zks:
+        if zk:
+            zk.close()


 @pytest.fixture
@@ -108,11 +113,13 @@ def factory(**kafka_producer_params):
     if _producer[0]:
         _producer[0].close()

+
 @pytest.fixture
 def kafka_admin_client(kafka_admin_client_factory):
     """Return a KafkaAdminClient fixture"""
     yield kafka_admin_client_factory()

+
 @pytest.fixture
 def kafka_admin_client_factory(kafka_broker):
     """Return a KafkaAdminClient factory fixture"""
@@ -128,6 +135,7 @@ def factory(**kafka_admin_client_params):
     if _admin_client[0]:
         _admin_client[0].close()

+
 @pytest.fixture
 def topic(kafka_broker, request):
     """Return a topic fixture"""
