25
25
DEFAULT_CONSUMER_CONFIG = {
26
26
'client_id' : __name__ ,
27
27
'group_id' : None ,
28
- 'metadata_broker_list ' : None ,
28
+ 'bootstrap_servers ' : [] ,
29
29
'socket_timeout_ms' : 30 * 1000 ,
30
30
'fetch_message_max_bytes' : 1024 * 1024 ,
31
31
'auto_offset_reset' : 'largest' ,
47
47
'rebalance_backoff_ms' : 2000 ,
48
48
}
49
49
50
+ DEPRECATED_CONFIG_KEYS = {
51
+ 'metadata_broker_list' : 'bootstrap_servers' ,
52
+ }
50
53
51
54
class KafkaConsumer (object ):
52
55
"""
@@ -56,7 +59,7 @@ class KafkaConsumer(object):
56
59
57
60
# A very basic 'tail' consumer, with no stored offset management
58
61
kafka = KafkaConsumer('topic1',
59
- metadata_broker_list =['localhost:9092'])
62
+ bootstrap_servers =['localhost:9092'])
60
63
for m in kafka:
61
64
print m
62
65
@@ -75,7 +78,7 @@ class KafkaConsumer(object):
75
78
# more advanced consumer -- multiple topics w/ auto commit offset
76
79
# management
77
80
kafka = KafkaConsumer('topic1', 'topic2',
78
- metadata_broker_list =['localhost:9092'],
81
+ bootstrap_servers =['localhost:9092'],
79
82
group_id='my_consumer_group',
80
83
auto_commit_enable=True,
81
84
auto_commit_interval_ms=30 * 1000,
@@ -120,7 +123,7 @@ class KafkaConsumer(object):
120
123
fetch_min_bytes=1,
121
124
fetch_wait_max_ms=100,
122
125
refresh_leader_backoff_ms=200,
123
- metadata_broker_list=None ,
126
+ bootstrap_servers=[] ,
124
127
socket_timeout_ms=30*1000,
125
128
auto_offset_reset='largest',
126
129
deserializer_class=lambda msg: msg,
@@ -149,7 +152,7 @@ def configure(self, **configs):
149
152
fetch_min_bytes=1,
150
153
fetch_wait_max_ms=100,
151
154
refresh_leader_backoff_ms=200,
152
- metadata_broker_list=None ,
155
+ bootstrap_servers=[] ,
153
156
socket_timeout_ms=30*1000,
154
157
auto_offset_reset='largest',
155
158
deserializer_class=lambda msg: msg,
@@ -161,6 +164,7 @@ def configure(self, **configs):
161
164
Configuration parameters are described in more detail at
162
165
http://kafka.apache.org/documentation.html#highlevelconsumerapi
163
166
"""
167
+ configs = self ._deprecate_configs (** configs )
164
168
self ._config = {}
165
169
for key in DEFAULT_CONSUMER_CONFIG :
166
170
self ._config [key ] = configs .pop (key , DEFAULT_CONSUMER_CONFIG [key ])
@@ -178,11 +182,11 @@ def configure(self, **configs):
178
182
logger .info ("Configuring consumer to auto-commit offsets" )
179
183
self ._reset_auto_commit ()
180
184
181
- if self ._config ['metadata_broker_list' ] is None :
182
- raise KafkaConfigurationError ('metadata_broker_list required to '
185
+ if not self ._config ['bootstrap_servers' ] :
186
+ raise KafkaConfigurationError ('bootstrap_servers required to '
183
187
'configure KafkaConsumer' )
184
188
185
- self ._client = KafkaClient (self ._config ['metadata_broker_list ' ],
189
+ self ._client = KafkaClient (self ._config ['bootstrap_servers ' ],
186
190
client_id = self ._config ['client_id' ],
187
191
timeout = (self ._config ['socket_timeout_ms' ] / 1000.0 ))
188
192
@@ -751,3 +755,17 @@ def __repr__(self):
751
755
return '<KafkaConsumer topics=(%s)>' % ', ' .join (["%s-%d" % topic_partition
752
756
for topic_partition in
753
757
self ._topics ])
758
+
759
+ #
760
+ # other private methods
761
+ #
762
+
763
+ def _deprecate_configs (self , ** configs ):
764
+ for old , new in six .iteritems (DEPRECATED_CONFIG_KEYS ):
765
+ if old in configs :
766
+ logger .warning ('Deprecated Kafka Consumer configuration: %s. '
767
+ 'Please use %s instead.' , old , new )
768
+ old_value = configs .pop (old )
769
+ if new not in configs :
770
+ configs [new ] = old_value
771
+ return configs
0 commit comments