3
3
from io import BytesIO
4
4
from textwrap import dedent
5
5
6
- from kafka import KafkaConsumer
7
- from kafka .errors import KafkaError , UnrecognizedBrokerVersion , NoBrokersAvailable
8
-
9
6
from testcontainers .core .container import DockerContainer
10
7
from testcontainers .core .utils import raise_for_deprecated_parameter
11
- from testcontainers .core .waiting_utils import wait_container_is_ready
8
+ from testcontainers .core .waiting_utils import wait_for_logs
12
9
13
10
14
11
class KafkaContainer (DockerContainer ):
@@ -49,13 +46,6 @@ def get_bootstrap_server(self) -> str:
49
46
port = self .get_exposed_port (self .port )
50
47
return f'{ host } :{ port } '
51
48
52
- @wait_container_is_ready (UnrecognizedBrokerVersion , NoBrokersAvailable , KafkaError , ValueError )
53
- def _connect (self ) -> None :
54
- bootstrap_server = self .get_bootstrap_server ()
55
- consumer = KafkaConsumer (group_id = 'test' , bootstrap_servers = [bootstrap_server ])
56
- if not consumer .bootstrap_connected ():
57
- raise KafkaError ("Unable to connect with kafka container!" )
58
-
59
49
def tc_start (self ) -> None :
60
50
host = self .get_container_host_ip ()
61
51
port = self .get_exposed_port (self .port )
@@ -80,13 +70,13 @@ def tc_start(self) -> None:
80
70
)
81
71
self .create_file (data , KafkaContainer .TC_START_SCRIPT )
82
72
83
- def start (self ) -> "KafkaContainer" :
73
+ def start (self , timeout = 30 ) -> "KafkaContainer" :
84
74
script = KafkaContainer .TC_START_SCRIPT
85
75
command = f'sh -c "while [ ! -f { script } ]; do sleep 0.1; done; sh { script } "'
86
76
self .with_command (command )
87
77
super ().start ()
88
78
self .tc_start ()
89
- self . _connect ( )
79
+ wait_for_logs ( self , r'.*\[KafkaServer id=\d+\] started.*' , timeout = timeout )
90
80
return self
91
81
92
82
def create_file (self , content : bytes , path : str ) -> None :
0 commit comments