@@ -117,34 +117,34 @@ def test_load_metadata(self, protocol, conn):
117
117
]
118
118
119
119
topics = [
120
- TopicMetadata ('topic_1' , NO_ERROR , [
121
- PartitionMetadata ('topic_1' , 0 , 1 , [1 , 2 ], [1 , 2 ], NO_ERROR )
120
+ TopicMetadata (b 'topic_1' , NO_ERROR , [
121
+ PartitionMetadata (b 'topic_1' , 0 , 1 , [1 , 2 ], [1 , 2 ], NO_ERROR )
122
122
]),
123
- TopicMetadata ('topic_noleader' , NO_ERROR , [
124
- PartitionMetadata ('topic_noleader' , 0 , - 1 , [], [],
123
+ TopicMetadata (b 'topic_noleader' , NO_ERROR , [
124
+ PartitionMetadata (b 'topic_noleader' , 0 , - 1 , [], [],
125
125
NO_LEADER ),
126
- PartitionMetadata ('topic_noleader' , 1 , - 1 , [], [],
126
+ PartitionMetadata (b 'topic_noleader' , 1 , - 1 , [], [],
127
127
NO_LEADER ),
128
128
]),
129
- TopicMetadata ('topic_no_partitions' , NO_LEADER , []),
130
- TopicMetadata ('topic_unknown' , UNKNOWN_TOPIC_OR_PARTITION , []),
131
- TopicMetadata ('topic_3' , NO_ERROR , [
132
- PartitionMetadata ('topic_3' , 0 , 0 , [0 , 1 ], [0 , 1 ], NO_ERROR ),
133
- PartitionMetadata ('topic_3' , 1 , 1 , [1 , 0 ], [1 , 0 ], NO_ERROR ),
134
- PartitionMetadata ('topic_3' , 2 , 0 , [0 , 1 ], [0 , 1 ], NO_ERROR )
129
+ TopicMetadata (b 'topic_no_partitions' , NO_LEADER , []),
130
+ TopicMetadata (b 'topic_unknown' , UNKNOWN_TOPIC_OR_PARTITION , []),
131
+ TopicMetadata (b 'topic_3' , NO_ERROR , [
132
+ PartitionMetadata (b 'topic_3' , 0 , 0 , [0 , 1 ], [0 , 1 ], NO_ERROR ),
133
+ PartitionMetadata (b 'topic_3' , 1 , 1 , [1 , 0 ], [1 , 0 ], NO_ERROR ),
134
+ PartitionMetadata (b 'topic_3' , 2 , 0 , [0 , 1 ], [0 , 1 ], NO_ERROR )
135
135
])
136
136
]
137
137
protocol .decode_metadata_response .return_value = MetadataResponse (brokers , topics )
138
138
139
139
# client loads metadata at init
140
140
client = KafkaClient (hosts = ['broker_1:4567' ])
141
141
self .assertDictEqual ({
142
- TopicAndPartition ('topic_1' , 0 ): brokers [1 ],
143
- TopicAndPartition ('topic_noleader' , 0 ): None ,
144
- TopicAndPartition ('topic_noleader' , 1 ): None ,
145
- TopicAndPartition ('topic_3' , 0 ): brokers [0 ],
146
- TopicAndPartition ('topic_3' , 1 ): brokers [1 ],
147
- TopicAndPartition ('topic_3' , 2 ): brokers [0 ]},
142
+ TopicAndPartition (b 'topic_1' , 0 ): brokers [1 ],
143
+ TopicAndPartition (b 'topic_noleader' , 0 ): None ,
144
+ TopicAndPartition (b 'topic_noleader' , 1 ): None ,
145
+ TopicAndPartition (b 'topic_3' , 0 ): brokers [0 ],
146
+ TopicAndPartition (b 'topic_3' , 1 ): brokers [1 ],
147
+ TopicAndPartition (b 'topic_3' , 2 ): brokers [0 ]},
148
148
client .topics_to_brokers )
149
149
150
150
# if we ask for metadata explicitly, it should raise errors
@@ -156,6 +156,7 @@ def test_load_metadata(self, protocol, conn):
156
156
157
157
# This should not raise
158
158
client .load_metadata_for_topics ('topic_no_leader' )
159
+ client .load_metadata_for_topics (b'topic_no_leader' )
159
160
160
161
@patch ('kafka.client.KafkaConnection' )
161
162
@patch ('kafka.client.KafkaProtocol' )
@@ -169,11 +170,11 @@ def test_has_metadata_for_topic(self, protocol, conn):
169
170
]
170
171
171
172
topics = [
172
- TopicMetadata ('topic_still_creating' , NO_LEADER , []),
173
- TopicMetadata ('topic_doesnt_exist' , UNKNOWN_TOPIC_OR_PARTITION , []),
174
- TopicMetadata ('topic_noleaders' , NO_ERROR , [
175
- PartitionMetadata ('topic_noleaders' , 0 , - 1 , [], [], NO_LEADER ),
176
- PartitionMetadata ('topic_noleaders' , 1 , - 1 , [], [], NO_LEADER ),
173
+ TopicMetadata (b 'topic_still_creating' , NO_LEADER , []),
174
+ TopicMetadata (b 'topic_doesnt_exist' , UNKNOWN_TOPIC_OR_PARTITION , []),
175
+ TopicMetadata (b 'topic_noleaders' , NO_ERROR , [
176
+ PartitionMetadata (b 'topic_noleaders' , 0 , - 1 , [], [], NO_LEADER ),
177
+ PartitionMetadata (b 'topic_noleaders' , 1 , - 1 , [], [], NO_LEADER ),
177
178
]),
178
179
]
179
180
protocol .decode_metadata_response .return_value = MetadataResponse (brokers , topics )
@@ -188,8 +189,8 @@ def test_has_metadata_for_topic(self, protocol, conn):
188
189
self .assertTrue (client .has_metadata_for_topic ('topic_noleaders' ))
189
190
190
191
@patch ('kafka.client.KafkaConnection' )
191
- @patch ('kafka.client.KafkaProtocol' )
192
- def test_ensure_topic_exists (self , protocol , conn ):
192
+ @patch ('kafka.client.KafkaProtocol.decode_metadata_response ' )
193
+ def test_ensure_topic_exists (self , decode_metadata_response , conn ):
193
194
194
195
conn .recv .return_value = 'response' # anything but None
195
196
@@ -199,14 +200,14 @@ def test_ensure_topic_exists(self, protocol, conn):
199
200
]
200
201
201
202
topics = [
202
- TopicMetadata ('topic_still_creating' , NO_LEADER , []),
203
- TopicMetadata ('topic_doesnt_exist' , UNKNOWN_TOPIC_OR_PARTITION , []),
204
- TopicMetadata ('topic_noleaders' , NO_ERROR , [
205
- PartitionMetadata ('topic_noleaders' , 0 , - 1 , [], [], NO_LEADER ),
206
- PartitionMetadata ('topic_noleaders' , 1 , - 1 , [], [], NO_LEADER ),
203
+ TopicMetadata (b 'topic_still_creating' , NO_LEADER , []),
204
+ TopicMetadata (b 'topic_doesnt_exist' , UNKNOWN_TOPIC_OR_PARTITION , []),
205
+ TopicMetadata (b 'topic_noleaders' , NO_ERROR , [
206
+ PartitionMetadata (b 'topic_noleaders' , 0 , - 1 , [], [], NO_LEADER ),
207
+ PartitionMetadata (b 'topic_noleaders' , 1 , - 1 , [], [], NO_LEADER ),
207
208
]),
208
209
]
209
- protocol . decode_metadata_response .return_value = MetadataResponse (brokers , topics )
210
+ decode_metadata_response .return_value = MetadataResponse (brokers , topics )
210
211
211
212
client = KafkaClient (hosts = ['broker_1:4567' ])
212
213
@@ -218,6 +219,7 @@ def test_ensure_topic_exists(self, protocol, conn):
218
219
219
220
# This should not raise
220
221
client .ensure_topic_exists ('topic_noleaders' , timeout = 1 )
222
+ client .ensure_topic_exists (b'topic_noleaders' , timeout = 1 )
221
223
222
224
@patch ('kafka.client.KafkaConnection' )
223
225
@patch ('kafka.client.KafkaProtocol' )
@@ -269,8 +271,8 @@ def test_get_leader_for_unassigned_partitions(self, protocol, conn):
269
271
]
270
272
271
273
topics = [
272
- TopicMetadata ('topic_no_partitions' , NO_LEADER , []),
273
- TopicMetadata ('topic_unknown' , UNKNOWN_TOPIC_OR_PARTITION , []),
274
+ TopicMetadata (b 'topic_no_partitions' , NO_LEADER , []),
275
+ TopicMetadata (b 'topic_unknown' , UNKNOWN_TOPIC_OR_PARTITION , []),
274
276
]
275
277
protocol .decode_metadata_response .return_value = MetadataResponse (brokers , topics )
276
278
@@ -279,10 +281,10 @@ def test_get_leader_for_unassigned_partitions(self, protocol, conn):
279
281
self .assertDictEqual ({}, client .topics_to_brokers )
280
282
281
283
with self .assertRaises (LeaderNotAvailableError ):
282
- client ._get_leader_for_partition ('topic_no_partitions' , 0 )
284
+ client ._get_leader_for_partition (b 'topic_no_partitions' , 0 )
283
285
284
286
with self .assertRaises (UnknownTopicOrPartitionError ):
285
- client ._get_leader_for_partition ('topic_unknown' , 0 )
287
+ client ._get_leader_for_partition (b 'topic_unknown' , 0 )
286
288
287
289
@patch ('kafka.client.KafkaConnection' )
288
290
@patch ('kafka.client.KafkaProtocol' )
0 commit comments