-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Closed
Description
When using KafkaConsumer there is no way to get a callback when auto commit offset is enabled.
The existing code shows that _maybe_auto_commit_offset_async set's it's own callback that dead-ends and only logs information.
def _commit_offsets_async_on_complete(self, offsets, exception):
if exception is not None:
log.warning("Auto offset commit failed for group %s: %s",
self.group_id, exception)
if getattr(exception, 'retriable', False):
self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline)
else:
log.debug("Completed autocommit of offsets %s for group %s",
offsets, self.group_id)
def _maybe_auto_commit_offsets_async(self):
if self.config['enable_auto_commit']:
if self.coordinator_unknown():
self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
elif time.time() > self.next_auto_commit_deadline:
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
self.commit_offsets_async(self._subscription.all_consumed_offsets(),
self._commit_offsets_async_on_complete)
ilons, Kim-Hyung-Jin and bar-shiratech
Metadata
Metadata
Assignees
Labels
No labels