From 885d5d765002fadf8d903cd163c75f574d3af9ec Mon Sep 17 00:00:00 2001 From: David Ankin Date: Thu, 4 Jul 2024 13:23:21 -0400 Subject: [PATCH] fix(kafka): add a flag to limit to first hostname for use with networks --- .../kafka/testcontainers/kafka/__init__.py | 21 ++++++++++++++++- modules/kafka/tests/test_kafka.py | 23 +++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/modules/kafka/testcontainers/kafka/__init__.py b/modules/kafka/testcontainers/kafka/__init__.py index ea837be37..ccd7f5b77 100644 --- a/modules/kafka/testcontainers/kafka/__init__.py +++ b/modules/kafka/testcontainers/kafka/__init__.py @@ -1,6 +1,8 @@ import tarfile import time +from dataclasses import dataclass, field from io import BytesIO +from os import environ from textwrap import dedent from typing_extensions import Self @@ -14,7 +16,21 @@ __all__ = [ "KafkaContainer", "RedpandaContainer", + "kafka_config", ] +LIMIT_BROKER_ENV_VAR = "TC_KAFKA_LIMIT_BROKER_TO_FIRST_HOST" + + +@dataclass +class _KafkaConfig: + limit_broker_to_first_host: bool = field(default_factory=lambda: environ.get(LIMIT_BROKER_ENV_VAR) == "true") + """ + This option is useful for a setup with a network, + see testcontainers/testcontainers-python#637 for more details + """ + + +kafka_config = _KafkaConfig() class KafkaContainer(DockerContainer): @@ -136,7 +152,10 @@ def get_bootstrap_server(self) -> str: def tc_start(self) -> None: host = self.get_container_host_ip() port = self.get_exposed_port(self.port) - listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092" + if kafka_config.limit_broker_to_first_host: + listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i | cut -d' ' -f1):9092" + else: + listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092" data = ( dedent( f""" diff --git a/modules/kafka/tests/test_kafka.py b/modules/kafka/tests/test_kafka.py index eb1a48127..901f3f0c3 100644 --- a/modules/kafka/tests/test_kafka.py +++ b/modules/kafka/tests/test_kafka.py @@ -1,6 +1,8 @@ -from kafka import KafkaConsumer, KafkaProducer, TopicPartition +import pytest +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer, TopicPartition -from testcontainers.kafka import KafkaContainer +from testcontainers.core.network import Network +from testcontainers.kafka import KafkaContainer, kafka_config def test_kafka_producer_consumer(): @@ -20,6 +22,23 @@ def test_kafka_producer_consumer_custom_port(): produce_and_consume_kafka_message(container) +def test_kafka_on_networks(monkeypatch: pytest.MonkeyPatch): + """ + this test case comes from testcontainers/testcontainers-python#637 + """ + monkeypatch.setattr(kafka_config, "limit_broker_to_first_host", True) + + with Network() as network: + kafka_ctr = KafkaContainer() + kafka_ctr.with_network(network) + kafka_ctr.with_network_aliases("kafka") + + with kafka_ctr: + print("started") # Will not reach here and timeout + admin_client = KafkaAdminClient(bootstrap_servers=[kafka_ctr.get_bootstrap_server()]) + print(admin_client.describe_cluster()) + + def produce_and_consume_kafka_message(container): topic = "test-topic" bootstrap_server = container.get_bootstrap_server()