Kafka consumer failed payloads #398

Merged
merged 2 commits on Jun 10, 2015
98 changes: 61 additions & 37 deletions kafka/consumer/kafka.py
@@ -120,20 +120,26 @@ def configure(self, **configs):

if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
raise KafkaConfigurationError(
'KafkaConsumer configured to auto-commit '
'without required consumer group (group_id)'
)

# Check auto-commit configuration
if self._config['auto_commit_enable']:
logger.info("Configuring consumer to auto-commit offsets")
self._reset_auto_commit()

if not self._config['bootstrap_servers']:
raise KafkaConfigurationError('bootstrap_servers required to '
'configure KafkaConsumer')

self._client = KafkaClient(self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0))
raise KafkaConfigurationError(
'bootstrap_servers required to configure KafkaConsumer'
)

self._client = KafkaClient(
self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0)
)
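
For context, a hedged usage sketch of the configuration rules enforced above (topic, broker address, and group name are illustrative):

    from kafka import KafkaConsumer

    # bootstrap_servers is always required; group_id becomes required
    # as soon as auto_commit_enable is set, per the checks above.
    consumer = KafkaConsumer(
        'my-topic',                            # illustrative topic
        bootstrap_servers=['localhost:9092'],  # omit -> KafkaConfigurationError
        group_id='my-group',                   # omit -> KafkaConfigurationError
        auto_commit_enable=True,
    )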

def set_topic_partitions(self, *topics):
"""
@@ -163,12 +169,12 @@ def set_topic_partitions(self, *topics):
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })

"""
self._topics = []
@@ -216,8 +222,10 @@ def set_topic_partitions(self, *topics):
for partition in value:
self._consume_topic_partition(topic, partition)
else:
raise KafkaConfigurationError('Unknown topic type (dict key must be '
'int or list/tuple of ints)')
raise KafkaConfigurationError(
'Unknown topic type '
'(dict key must be int or list/tuple of ints)'
)

# (topic, partition): offset
elif isinstance(key, tuple):
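
To illustrate the dict handling above, a sketch of the accepted forms versus the one that triggers the raise (topic names are illustrative):

    # A topic key accepts an int partition or a list/tuple of ints.
    kafka.set_topic_partitions({'topic1': 0})
    kafka.set_topic_partitions({'topic1': [0, 1, 2]})

    # A (topic, partition) tuple key maps to a starting offset.
    kafka.set_topic_partitions({('topic1', 0): 12})

    # Any other value type raises KafkaConfigurationError.
    kafka.set_topic_partitions({'topic1': 'not-a-partition'})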
@@ -316,26 +324,30 @@ def fetch_messages(self):
raise KafkaConfigurationError('No topics or partitions configured')

if not self._offsets.fetch:
raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
raise KafkaConfigurationError(
'No fetch offsets found when calling fetch_messages'
)

fetches = [FetchRequest(topic, partition,
self._offsets.fetch[(topic, partition)],
max_bytes)
for (topic, partition) in self._topics]

# client.send_fetch_request will collect topic/partition requests by leader
# and send each group as a single FetchRequest to the correct broker
try:
responses = self._client.send_fetch_request(fetches,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
fail_on_error=False)
except FailedPayloadsError:
logger.warning('FailedPayloadsError attempting to fetch data from kafka')
self._refresh_metadata_on_error()
return
# send_fetch_request will batch topic/partition requests by leader
responses = self._client.send_fetch_request(
fetches,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
fail_on_error=False
)

for resp in responses:

if isinstance(resp, FailedPayloadsError):
logger.warning('FailedPayloadsError attempting to fetch data')
self._refresh_metadata_on_error()
continue

topic = kafka_bytestring(resp.topic)
partition = resp.partition
try:
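
The gist of this hunk, as a hedged sketch (the helper name is hypothetical; retry details are elided): previously a single FailedPayloadsError raised out of send_fetch_request and aborted the whole fetch round; with fail_on_error=False, failures come back in-line per payload, so healthy partitions still yield data.

    from kafka.common import FailedPayloadsError

    def fetch_surviving_responses(client, fetches, max_wait_time, min_bytes):
        # Errors are returned in-line rather than raised.
        responses = client.send_fetch_request(
            fetches, max_wait_time=max_wait_time,
            min_bytes=min_bytes, fail_on_error=False)
        # Keep the successful responses; the caller refreshes metadata
        # for the failed ones and retries them on the next fetch.
        return [r for r in responses
                if not isinstance(r, FailedPayloadsError)]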
@@ -381,7 +393,8 @@ def fetch_messages(self):
logger.debug('message offset less than fetched offset '
'skipping: %s', msg)
continue
# Only increment fetch offset if we safely got the message and deserialized
# Only increment fetch offset
# if we safely got the message and deserialized
self._offsets.fetch[(topic, partition)] = offset + 1

# Then yield to user
@@ -394,10 +407,12 @@ def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
topic (str): topic for offset request
partition (int): partition for offset request
request_time_ms (int): Used to ask for all messages before a
certain time (ms). There are two special values. Specify -1 to receive the latest
offset (i.e. the offset of the next coming message) and -2 to receive the earliest
available offset. Note that because offsets are pulled in descending order, asking for
the earliest offset will always return you a single element.
certain time (ms). There are two special values.
Specify -1 to receive the latest offset (i.e. the offset of the
next coming message) and -2 to receive the earliest available
offset. Note that because offsets are pulled in descending
order, asking for the earliest offset will always return you a
single element.
max_num_offsets (int): Maximum offsets to include in the OffsetResponse

Returns:
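
A hedged usage sketch of the special values described above (topic and partition are illustrative; assumes the method returns the offsets from the OffsetResponse):

    # -1: latest offset, i.e. the offset of the next incoming message.
    latest = consumer.get_partition_offsets('topic1', 0, -1, 1)

    # -2: earliest available offset; since offsets are pulled in
    # descending order, this always returns a single element.
    earliest = consumer.get_partition_offsets('topic1', 0, -2, 1)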
@@ -497,7 +512,10 @@ def commit(self):
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
raise KafkaConfigurationError(
'Attempted to commit offsets '
'without a configured consumer group (group_id)'
)

# API supports storing metadata with each commit
# but for now it is unused
@@ -521,13 +539,17 @@ def commit(self):
if commit_offset == self._offsets.commit[topic_partition]:
continue

commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
commits.append(
OffsetCommitRequest(topic_partition[0], topic_partition[1],
commit_offset, metadata)
)

if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
commits,
fail_on_error=False)
resps = self._client.send_offset_commit_request(
kafka_bytestring(self._config['group_id']), commits,
fail_on_error=False
)

for r in resps:
check_error(r)
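
For context, a minimal sketch of the commit round-trip restructured above (group name, topic, and offset are illustrative):

    from kafka.common import OffsetCommitRequest, check_error
    from kafka.util import kafka_bytestring

    # One request per (topic, partition) whose offset has advanced.
    commits = [OffsetCommitRequest(b'topic1', 0, 43, None)]
    resps = client.send_offset_commit_request(
        kafka_bytestring('my-group'), commits, fail_on_error=False)
    for r in resps:
        check_error(r)  # surfaces per-partition commit errors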
@@ -724,9 +746,11 @@ def _reset_message_iterator(self):
#

def __repr__(self):
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
for topic_partition in
self._topics])
return '<{0} topics=({1})>'.format(
self.__class__.__name__,
'|'.join(["%s-%d" % topic_partition
for topic_partition in self._topics])
)
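
For example, a consumer assigned topic1 partition 0 and topic2 partition 1 now renders as:

    <KafkaConsumer topics=(topic1-0|topic2-1)>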

#
# other private methods