Skip to content

Commit 0869ba9

Browse files
committed
Add Kraft to Kafka containers
1 parent 090bd0d commit 0869ba9

File tree

3 files changed

+130
-9
lines changed

3 files changed

+130
-9
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from typing import Callable
2+
3+
from packaging.version import Version
4+
5+
6+
class ComparableVersion:
7+
def __init__(self, version):
8+
self.version = Version(version)
9+
10+
def __lt__(self, other: str):
11+
return self._apply_op(other, lambda x, y: x < y)
12+
13+
def __le__(self, other: str):
14+
return self._apply_op(other, lambda x, y: x <= y)
15+
16+
def __eq__(self, other: str):
17+
return self._apply_op(other, lambda x, y: x == y)
18+
19+
def __ne__(self, other: str):
20+
return self._apply_op(other, lambda x, y: x != y)
21+
22+
def __gt__(self, other: str):
23+
return self._apply_op(other, lambda x, y: x > y)
24+
25+
def __ge__(self, other: str):
26+
return self._apply_op(other, lambda x, y: x >= y)
27+
28+
def _apply_op(self, other: str, op: Callable[[Version, Version], bool]):
29+
other = Version(other)
30+
return op(self.version, other)

modules/kafka/testcontainers/kafka/__init__.py

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
import base64
12
import tarfile
23
import time
4+
import uuid
35
from io import BytesIO
46
from textwrap import dedent
57

8+
from typing_extensions import Self
9+
610
from testcontainers.core.container import DockerContainer
711
from testcontainers.core.utils import raise_for_deprecated_parameter
12+
from testcontainers.core.version import ComparableVersion
813
from testcontainers.core.waiting_utils import wait_for_logs
914
from testcontainers.kafka._redpanda import RedpandaContainer
1015

@@ -29,15 +34,22 @@ class KafkaContainer(DockerContainer):
2934
"""
3035

3136
TC_START_SCRIPT = "/tc-start.sh"
37+
MIN_KRAFT_TAG = "7.0.0"
3238

3339
def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093, **kwargs) -> None:
3440
raise_for_deprecated_parameter(kwargs, "port_to_expose", "port")
3541
super().__init__(image, **kwargs)
3642
self.port = port
43+
self.kraft_enabled = False
44+
self.wait_for = r".*\[KafkaServer id=\d+\] started.*"
45+
self.boot_command = ""
46+
self.cluster_id = self._random_uuid()
47+
self.listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
48+
self.security_protocol_map = "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
49+
3750
self.with_exposed_ports(self.port)
38-
listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
39-
self.with_env("KAFKA_LISTENERS", listeners)
40-
self.with_env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
51+
self.with_env("KAFKA_LISTENERS", self.listeners)
52+
self.with_env("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", self.security_protocol_map)
4153
self.with_env("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
4254

4355
self.with_env("KAFKA_BROKER_ID", "1")
@@ -46,6 +58,82 @@ def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093,
4658
self.with_env("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "10000000")
4759
self.with_env("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
4860

61+
def with_kraft(self) -> Self:
62+
self._verify_min_kraft_version()
63+
self.kraft_enabled = True
64+
return self
65+
66+
def _verify_min_kraft_version(self):
67+
actual_version = self.image.split(":")[-1]
68+
69+
if ComparableVersion(actual_version) < self.MIN_KRAFT_TAG:
70+
raise ValueError(
71+
f"Provided Confluent Platform's version {actual_version} "
72+
f"is not supported in Kraft mode"
73+
f" (must be {self.MIN_KRAFT_TAG} or above)"
74+
)
75+
76+
def with_cluster_id(self, cluster_id: str) -> Self:
77+
self.cluster_id = cluster_id
78+
return self
79+
80+
@classmethod
81+
def _random_uuid(cls):
82+
uuid_value = uuid.uuid4()
83+
uuid_bytes = uuid_value.bytes
84+
base64_encoded_uuid = base64.b64encode(uuid_bytes)
85+
86+
return base64_encoded_uuid.decode()
87+
88+
def configure(self):
89+
if self.kraft_enabled:
90+
self._configure_kraft()
91+
else:
92+
self._configure_zookeeper()
93+
94+
def _configure_kraft(self) -> None:
95+
self.wait_for = r".*Kafka Server started.*"
96+
97+
self.with_env("CLUSTER_ID", self.cluster_id)
98+
self.with_env("KAFKA_NODE_ID", 1)
99+
self.with_env(
100+
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
101+
f"{self.security_protocol_map},CONTROLLER:PLAINTEXT",
102+
)
103+
self.with_env(
104+
"KAFKA_LISTENERS",
105+
f"{self.listeners},CONTROLLER://0.0.0.0:9094",
106+
)
107+
self.with_env("KAFKA_PROCESS_ROLES", "broker,controller")
108+
109+
network_alias = self._get_network_alias()
110+
controller_quorum_voters = f"1@{network_alias}:9094"
111+
self.with_env("KAFKA_CONTROLLER_QUORUM_VOTERS", controller_quorum_voters)
112+
self.with_env("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
113+
114+
self.boot_command = f"""
115+
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
116+
echo 'kafka-storage format --ignore-formatted -t {self.cluster_id} -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
117+
"""
118+
119+
def _get_network_alias(self):
120+
if self._network:
121+
return next(
122+
iter(self._network_aliases or [self._network.name or self._kwargs.get("network", [])]),
123+
None,
124+
)
125+
126+
return "localhost"
127+
128+
def _configure_zookeeper(self) -> None:
129+
self.boot_command = """
130+
echo 'clientPort=2181' > zookeeper.properties
131+
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
132+
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
133+
zookeeper-server-start zookeeper.properties &
134+
export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
135+
"""
136+
49137
def get_bootstrap_server(self) -> str:
50138
host = self.get_container_host_ip()
51139
port = self.get_exposed_port(self.port)
@@ -59,11 +147,7 @@ def tc_start(self) -> None:
59147
dedent(
60148
f"""
61149
#!/bin/bash
62-
echo 'clientPort=2181' > zookeeper.properties
63-
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
64-
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
65-
zookeeper-server-start zookeeper.properties &
66-
export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
150+
{self.boot_command}
67151
export KAFKA_ADVERTISED_LISTENERS={listeners}
68152
. /etc/confluent/docker/bash-config
69153
/etc/confluent/docker/configure
@@ -78,10 +162,11 @@ def tc_start(self) -> None:
78162
def start(self, timeout=30) -> "KafkaContainer":
79163
script = KafkaContainer.TC_START_SCRIPT
80164
command = f'sh -c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
165+
self.configure()
81166
self.with_command(command)
82167
super().start()
83168
self.tc_start()
84-
wait_for_logs(self, r".*\[KafkaServer id=\d+\] started.*", timeout=timeout)
169+
wait_for_logs(self, self.wait_for, timeout=timeout)
85170
return self
86171

87172
def create_file(self, content: bytes, path: str) -> None:

modules/kafka/tests/test_kafka.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ def test_kafka_producer_consumer():
88
produce_and_consume_kafka_message(container)
99

1010

11+
def test_kafka_with_kraft_producer_consumer():
12+
with KafkaContainer().with_kraft() as container:
13+
assert container.kraft_enabled
14+
produce_and_consume_kafka_message(container)
15+
16+
1117
def test_kafka_producer_consumer_custom_port():
1218
with KafkaContainer(port=9888) as container:
1319
assert container.port == 9888

0 commit comments

Comments
 (0)