@@ -267,15 +267,11 @@ def reinit(self):
267
267
268
268
def reset_topic_metadata (self , * topics ):
269
269
for topic in topics :
270
- try :
271
- partitions = self .topic_partitions [topic ]
272
- except KeyError :
273
- continue
274
-
275
- for partition in partitions :
276
- self .topics_to_brokers .pop (TopicAndPartition (topic , partition ), None )
277
-
278
- del self .topic_partitions [topic ]
270
+ for topic_partition in list (self .topics_to_brokers .keys ()):
271
+ if topic_partition .topic == topic :
272
+ del self .topics_to_brokers [topic_partition ]
273
+ if topic in self .topic_partitions :
274
+ del self .topic_partitions [topic ]
279
275
280
276
def reset_all_metadata (self ):
281
277
self .topics_to_brokers .clear ()
@@ -339,10 +335,17 @@ def load_metadata_for_topics(self, *topics):
339
335
(a single partition w/o a leader, for example)
340
336
"""
341
337
topics = [kafka_bytestring (t ) for t in topics ]
338
+
339
+ if topics :
340
+ for topic in topics :
341
+ self .reset_topic_metadata (topic )
342
+ else :
343
+ self .reset_all_metadata ()
344
+
342
345
resp = self .send_metadata_request (topics )
343
346
344
- log .debug ("Broker metadata: %s" , resp .brokers )
345
- log .debug ("Topic metadata: %s" , resp .topics )
347
+ log .debug ("Received new broker metadata: %s" , resp .brokers )
348
+ log .debug ("Received new topic metadata: %s" , resp .topics )
346
349
347
350
self .brokers = dict ([(broker .nodeId , broker )
348
351
for broker in resp .brokers ])
@@ -351,8 +354,6 @@ def load_metadata_for_topics(self, *topics):
351
354
topic = topic_metadata .topic
352
355
partitions = topic_metadata .partitions
353
356
354
- self .reset_topic_metadata (topic )
355
-
356
357
# Errors expected for new topics
357
358
try :
358
359
kafka .common .check_error (topic_metadata )
0 commit comments