Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import tarfile
import time
from dataclasses import dataclass, field
from io import BytesIO
from os import environ
from textwrap import dedent

from typing_extensions import Self
Expand All @@ -14,7 +16,21 @@
__all__ = [
"KafkaContainer",
"RedpandaContainer",
"kafka_config",
]
LIMIT_BROKER_ENV_VAR = "TC_KAFKA_LIMIT_BROKER_TO_FIRST_HOST"


@dataclass
class _KafkaConfig:
limit_broker_to_first_host: bool = field(default_factory=lambda: environ.get(LIMIT_BROKER_ENV_VAR) == "true")
"""
This option is useful for a setup with a network,
see testcontainers/testcontainers-python#637 for more details
"""


kafka_config = _KafkaConfig()


class KafkaContainer(DockerContainer):
Expand Down Expand Up @@ -136,7 +152,10 @@ def get_bootstrap_server(self) -> str:
def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.port)
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092"
if kafka_config.limit_broker_to_first_host:
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i | cut -d' ' -f1):9092"
else:
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092"
data = (
dedent(
f"""
Expand Down
23 changes: 21 additions & 2 deletions modules/kafka/tests/test_kafka.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
import pytest
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer, TopicPartition

from testcontainers.kafka import KafkaContainer
from testcontainers.core.network import Network
from testcontainers.kafka import KafkaContainer, kafka_config


def test_kafka_producer_consumer():
Expand All @@ -20,6 +22,23 @@ def test_kafka_producer_consumer_custom_port():
produce_and_consume_kafka_message(container)


def test_kafka_on_networks(monkeypatch: pytest.MonkeyPatch):
"""
this test case comes from testcontainers/testcontainers-python#637
"""
monkeypatch.setattr(kafka_config, "limit_broker_to_first_host", True)

with Network() as network:
kafka_ctr = KafkaContainer()
kafka_ctr.with_network(network)
kafka_ctr.with_network_aliases("kafka")

with kafka_ctr:
print("started") # Will not reach here and timeout
admin_client = KafkaAdminClient(bootstrap_servers=[kafka_ctr.get_bootstrap_server()])
print(admin_client.describe_cluster())


def produce_and_consume_kafka_message(container):
topic = "test-topic"
bootstrap_server = container.get_bootstrap_server()
Expand Down