62 changes: 33 additions & 29 deletions kafka/consumer/simple.py
@@ -126,8 +126,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_t=auto_commit_every_t)

if max_buffer_size is not None and buffer_size > max_buffer_size:
raise ValueError("buffer_size (%d) is greater than "
"max_buffer_size (%d)" %
raise ValueError('buffer_size (%d) is greater than '
'max_buffer_size (%d)' %
(buffer_size, max_buffer_size))
self.buffer_size = buffer_size
self.max_buffer_size = max_buffer_size
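
The guard touched by this hunk rejects a buffer_size larger than max_buffer_size at construction time; restated standalone below with illustrative values (the sizes are not defaults from the library):

    # Illustrative values only; the real check runs inside the consumer's __init__.
    buffer_size = 4096 * 8
    max_buffer_size = 4096

    if max_buffer_size is not None and buffer_size > max_buffer_size:
        raise ValueError('buffer_size (%d) is greater than '
                         'max_buffer_size (%d)' %
                         (buffer_size, max_buffer_size))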
@@ -227,7 +227,7 @@ def seek(self, offset, whence):
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
raise ValueError('Unexpected value for `whence`, %d' % whence)

# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
@@ -250,35 +250,32 @@ def get_messages(self, count=1, block=True, timeout=0.1):
"""
messages = []
if timeout is not None:
max_time = time.time() + timeout
timeout += time.time()

new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
result = self._get_message(block, timeout, get_partition_info=True,
log.debug('getting %d messages', count)
while len(messages) < count:
block_time = timeout - time.time()
log.debug('calling _get_message block=%s timeout=%s', block, block_time)
result = self._get_message(block, block_time,
get_partition_info=True,
update_offset=False)
if result:
partition, message = result
if self.partition_info:
messages.append(result)
else:
messages.append(message)
new_offsets[partition] = message.offset + 1
count -= 1
else:
# Ran out of messages for the last request.
if not block:
# If we're not blocking, break.
break
log.debug('got %s from _get_messages', result)
if not result:
if block and (timeout is None or time.time() <= timeout):
continue
break

# If we have a timeout, reduce it to the
# appropriate value
if timeout is not None:
timeout = max_time - time.time()
partition, message = result
_msg = (partition, message) if self.partition_info else message
messages.append(_msg)
new_offsets[partition] = message.offset + 1

# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
log.debug('got %d messages: %s', len(messages), messages)
return messages

def get_message(self, block=True, timeout=0.1, get_partition_info=None):
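
The reworked loop in this hunk drops the count-down and max_time bookkeeping in favor of converting the timeout into an absolute deadline and retrying until either enough messages arrive or the deadline passes. A minimal standalone sketch of that pattern (fetch_one stands in for the consumer's internal _get_message call; it is not part of the library's API):

    import time

    def get_messages(fetch_one, count=1, block=True, timeout=0.1):
        """Collect up to `count` results from fetch_one(), honoring a deadline.

        fetch_one(block, block_time) returns a message or None.
        """
        messages = []
        if timeout is not None:
            timeout += time.time()                  # interval -> absolute deadline

        while len(messages) < count:
            block_time = None if timeout is None else timeout - time.time()
            result = fetch_one(block, block_time)
            if not result:
                if block and (timeout is None or time.time() <= timeout):
                    continue                        # deadline not reached, retry
                break                               # non-blocking or out of time
            messages.append(result)
        return messages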
@@ -292,10 +289,16 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
If get_partition_info is True, returns (partition, message)
If get_partition_info is False, returns message
"""
if self.queue.empty():
start_at = time.time()
while self.queue.empty():
# We're out of messages, go grab some more.
log.debug('internal queue empty, fetching more messages')
with FetchContext(self, block, timeout):
self._fetch()

if not block or time.time() > (start_at + timeout):
break

try:
partition, message = self.queue.get_nowait()

@@ -314,6 +317,7 @@ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
else:
return message
except Empty:
log.debug('internal queue empty after fetch - returning None')
return None

def __iter__(self):
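
These hunks turn the single `if self.queue.empty()` check into a retry loop: keep fetching until a message lands in the internal queue or the deadline passes, then drain one item. A standalone sketch of that loop (the `fetch` callable stands in for the FetchContext/_fetch machinery and is not the library's API):

    import time
    from queue import Queue, Empty

    def get_message(queue, fetch, block=True, timeout=0.1):
        """Return one message from `queue`, fetching more until the deadline."""
        start_at = time.time()
        while queue.empty():
            fetch()                                  # stand-in for self._fetch()
            if not block or time.time() > (start_at + timeout):
                break                                # one try only, or out of time
        try:
            return queue.get_nowait()
        except Empty:
            return None

    # e.g. q = Queue(); get_message(q, fetch=lambda: q.put('payload')) -> 'payload'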
@@ -396,20 +400,20 @@ def _fetch(self):
except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
log.error('Max fetch size %d too small',
self.max_buffer_size)
raise
if self.max_buffer_size is None:
buffer_size *= 2
else:
buffer_size = min(buffer_size * 2,
self.max_buffer_size)
log.warn("Fetch size too small, increase to %d (2x) "
"and retry", buffer_size)
log.warning('Fetch size too small, increase to %d (2x) '
'and retry', buffer_size)
retry_partitions[partition] = buffer_size
except ConsumerNoMoreData as e:
log.debug("Iteration was ended by %r", e)
log.debug('Iteration was ended by %r', e)
except StopIteration:
# Stop iterating through this partition
log.debug("Done iterating over partition %s" % partition)
log.debug('Done iterating over partition %s', partition)
partitions = retry_partitions
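
The retry branch in this last hunk doubles the fetch buffer whenever the response does not fit, capping growth at max_buffer_size and failing once the cap itself is too small. A standalone sketch of that growth policy (FetchSizeTooSmall is a placeholder for the library's ConsumerFetchSizeTooSmall; the helper name is illustrative):

    import logging

    log = logging.getLogger(__name__)

    class FetchSizeTooSmall(Exception):
        """Placeholder for kafka's ConsumerFetchSizeTooSmall."""

    def next_buffer_size(buffer_size, max_buffer_size):
        """Double the fetch buffer, capped at max_buffer_size; raise at the cap."""
        if max_buffer_size is not None and buffer_size == max_buffer_size:
            log.error('Max fetch size %d too small', max_buffer_size)
            raise FetchSizeTooSmall(max_buffer_size)
        if max_buffer_size is None:
            buffer_size *= 2
        else:
            buffer_size = min(buffer_size * 2, max_buffer_size)
        log.warning('Fetch size too small, increase to %d (2x) and retry', buffer_size)
        return buffer_size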