1
- import tarfile
2
- import time
3
- from io import BytesIO
4
- from textwrap import dedent
5
-
6
1
from kafka import KafkaConsumer
7
2
from kafka .errors import KafkaError , UnrecognizedBrokerVersion , NoBrokersAvailable
8
3
12
7
13
8
class KafkaContainer (DockerContainer ):
14
9
KAFKA_PORT = 9093
15
- TC_START_SCRIPT = '/tc-start.sh'
16
10
17
11
def __init__ (self , image = "confluentinc/cp-kafka:5.4.3" , port_to_expose = KAFKA_PORT ):
18
12
super (KafkaContainer , self ).__init__ (image )
19
13
self .port_to_expose = port_to_expose
20
14
self .with_exposed_ports (self .port_to_expose )
21
- listeners = 'PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9092' .format (port_to_expose )
22
- self .with_env ('KAFKA_LISTENERS' , listeners )
23
- self .with_env ('KAFKA_LISTENER_SECURITY_PROTOCOL_MAP' ,
24
- 'BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT' )
25
- self .with_env ('KAFKA_INTER_BROKER_LISTENER_NAME' , 'BROKER' )
26
15
27
- self .with_env ('KAFKA_BROKER_ID' , '1' )
28
- self .with_env ('KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR' , '1' )
29
- self .with_env ('KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS' , '1' )
30
- self .with_env ('KAFKA_LOG_FLUSH_INTERVAL_MESSAGES' , '10000000' )
31
- self .with_env ('KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS' , '0' )
16
+ env = {
17
+ 'KAFKA_LISTENERS' : f'PLAINTEXT://0.0.0.0:{ port_to_expose } ,BROKER://0.0.0.0:9092' ,
18
+ 'KAFKA_LISTENER_SECURITY_PROTOCOL_MAP' : 'BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT' ,
19
+ 'KAFKA_INTER_BROKER_LISTENER_NAME' : 'BROKER' ,
20
+ 'KAFKA_BROKER_ID' : '1' ,
21
+ 'KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR' : '1' ,
22
+ 'KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS' : '1' ,
23
+ 'KAFKA_LOG_FLUSH_INTERVAL_MESSAGES' : '10000000' ,
24
+ 'KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS' : '0' ,
25
+ 'KAFKA_ZOOKEEPER_CONNECT' : 'localhost:2181' ,
26
+ }
27
+ for key , value in env .items ():
28
+ self .with_env (key , value )
29
+
30
+ # Start zookeeper first because it doesn't need any port mapping information.
31
+ self .with_command ("zookeeper-server-start /etc/kafka/zookeeper.properties" )
32
32
33
33
def get_bootstrap_server (self ):
34
34
host = self .get_container_host_ip ()
@@ -42,43 +42,20 @@ def _connect(self):
42
42
if not consumer .topics ():
43
43
raise KafkaError ("Unable to connect with kafka container!" )
44
44
45
- def tc_start (self ):
46
- port = self .get_exposed_port (self .port_to_expose )
47
- listeners = 'PLAINTEXT://localhost:{},BROKER://$(hostname -i):9092' .format (port )
48
- data = (
49
- dedent (
50
- """
51
- #!/bin/bash
52
- echo 'clientPort=2181' > zookeeper.properties
53
- echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
54
- echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
55
- zookeeper-server-start zookeeper.properties &
56
- export KAFKA_ZOOKEEPER_CONNECT='localhost:2181'
57
- export KAFKA_ADVERTISED_LISTENERS={}
58
- . /etc/confluent/docker/bash-config
59
- /etc/confluent/docker/configure
60
- /etc/confluent/docker/launch
61
- """ .format (listeners )
62
- )
63
- .strip ()
64
- .encode ('utf-8' )
65
- )
66
- self .create_file (data , KafkaContainer .TC_START_SCRIPT )
67
-
68
45
def start (self ):
69
- script = KafkaContainer .TC_START_SCRIPT
70
- command = f'bash -c "while [ ! -f { script } ]; do sleep 0.1; done; bash { script } "'
71
- self .with_command (command )
72
46
super ().start ()
73
- self .tc_start ()
47
+ # Set the environment variables for which we need to know the port mappings and configure
48
+ # kafka.
49
+ exposed_port = self .get_exposed_port (self .port_to_expose )
50
+ host = self .get_container_host_ip ()
51
+ advertised_listeners = f'PLAINTEXT://localhost:{ exposed_port } ,BROKER://{ host } :9092'
52
+ code , output = self ._container .exec_run (
53
+ f'sh -c \' KAFKA_ADVERTISED_LISTENERS="{ advertised_listeners } " '
54
+ '/etc/confluent/docker/configure\' '
55
+ )
56
+ assert code == 0 , output
57
+
58
+ # Start Kafka.
59
+ self ._container .exec_run ('/etc/confluent/docker/launch' , detach = True )
74
60
self ._connect ()
75
61
return self
76
-
77
- def create_file (self , content : bytes , path : str ):
78
- with BytesIO () as archive , tarfile .TarFile (fileobj = archive , mode = "w" ) as tar :
79
- tarinfo = tarfile .TarInfo (name = path )
80
- tarinfo .size = len (content )
81
- tarinfo .mtime = time .time ()
82
- tar .addfile (tarinfo , BytesIO (content ))
83
- archive .seek (0 )
84
- self .get_wrapped_container ().put_archive ("/" , archive )
0 commit comments