3
3
from io import BytesIO
4
4
from textwrap import dedent
5
5
6
- from kafka import KafkaConsumer
7
- from kafka .errors import KafkaError , NoBrokersAvailable , UnrecognizedBrokerVersion
8
6
from testcontainers .core .container import DockerContainer
9
7
from testcontainers .core .utils import raise_for_deprecated_parameter
10
- from testcontainers .core .waiting_utils import wait_container_is_ready
8
+ from testcontainers .core .waiting_utils import wait_for_logs
11
9
12
10
13
11
class KafkaContainer (DockerContainer ):
@@ -47,13 +45,6 @@ def get_bootstrap_server(self) -> str:
47
45
port = self .get_exposed_port (self .port )
48
46
return f"{ host } :{ port } "
49
47
50
- @wait_container_is_ready (UnrecognizedBrokerVersion , NoBrokersAvailable , KafkaError , ValueError )
51
- def _connect (self ) -> None :
52
- bootstrap_server = self .get_bootstrap_server ()
53
- consumer = KafkaConsumer (group_id = "test" , bootstrap_servers = [bootstrap_server ])
54
- if not consumer .bootstrap_connected ():
55
- raise KafkaError ("Unable to connect with kafka container!" )
56
-
57
48
def tc_start (self ) -> None :
58
49
host = self .get_container_host_ip ()
59
50
port = self .get_exposed_port (self .port )
@@ -78,13 +69,13 @@ def tc_start(self) -> None:
78
69
)
79
70
self .create_file (data , KafkaContainer .TC_START_SCRIPT )
80
71
81
- def start (self ) -> "KafkaContainer" :
72
+ def start (self , timeout = 30 ) -> "KafkaContainer" :
82
73
script = KafkaContainer .TC_START_SCRIPT
83
74
command = f'sh -c "while [ ! -f { script } ]; do sleep 0.1; done; sh { script } "'
84
75
self .with_command (command )
85
76
super ().start ()
86
77
self .tc_start ()
87
- self . _connect ( )
78
+ wait_for_logs ( self , r".*\[KafkaServer id=\d+\] started.*" , timeout = timeout )
88
79
return self
89
80
90
81
def create_file (self , content : bytes , path : str ) -> None :
0 commit comments