diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 852157811..a77beaf6e 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -5,6 +5,7 @@ import logging import threading import time +import warnings import weakref from kafka.vendor import six @@ -792,7 +793,7 @@ def coordinator_dead(self, error): self.coordinator_id, self.group_id, error) self.coordinator_id = None - def generation(self): + def generation_if_stable(self): """Get the current generation state if the group is stable. Returns: the current generation or None if the group is unjoined/rebalancing @@ -802,6 +803,15 @@ def generation(self): return None return self._generation + # deprecated + def generation(self): + warnings.warn("Function coordinator.generation() has been renamed to generation_if_stable()", + DeprecationWarning, stacklevel=2) + return self.generation_if_stable() + + def rebalance_in_progress(self): + return self.state is MemberState.REBALANCING + def reset_generation(self, member_id=UNKNOWN_MEMBER_ID): """Reset the generation and member_id because we have fallen out of the group.""" with self._lock: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3db00d72c..ddd413b82 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -608,6 +608,11 @@ def _send_offset_commit_request(self, offsets): if node_id is None: return Future().failure(Errors.CoordinatorNotAvailableError) + # Verify node is ready + if not self._client.ready(node_id, metadata_priority=False): + log.debug("Node %s not ready -- failing offset commit request", + node_id) + return Future().failure(Errors.NodeNotReadyError) # create the offset commit request offset_data = collections.defaultdict(dict) @@ -616,7 +621,7 @@ def _send_offset_commit_request(self, offsets): version = self._client.api_version(OffsetCommitRequest, max_version=6) if version > 1 and self._subscription.partitions_auto_assigned(): - generation = self.generation() + generation = self.generation_if_stable() else: generation = Generation.NO_GENERATION @@ -625,7 +630,18 @@ def _send_offset_commit_request(self, offsets): # and let the user rejoin the group in poll() if generation is None: log.info("Failing OffsetCommit request since the consumer is not part of an active group") - return Future().failure(Errors.CommitFailedError('Group rebalance in progress')) + if self.rebalance_in_progress(): + # if the client knows it is already rebalancing, we can use RebalanceInProgressError instead of + # CommitFailedError to indicate this is not a fatal error + return Future().failure(Errors.RebalanceInProgressError( + "Offset commit cannot be completed since the" + " consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance" + " by calling poll() and then retry the operation.")) + else: + return Future().failure(Errors.CommitFailedError( + "Offset commit cannot be completed since the" + " consumer is not part of an active group for auto partition assignment; it is likely that the consumer" + " was kicked out of the group.")) if version == 0: request = OffsetCommitRequest[version]( @@ -756,7 +772,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): # However, we do not need to reset generations and just request re-join, such that # if the caller decides to proceed and poll, it would still try to proceed and re-join normally. self.request_rejoin() - future.failure(Errors.CommitFailedError('Group rebalance in progress')) + future.failure(Errors.CommitFailedError(error_type())) return elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): @@ -765,7 +781,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): log.warning("OffsetCommit for group %s failed: %s", self.group_id, error) self.reset_generation() - future.failure(Errors.CommitFailedError()) + future.failure(Errors.CommitFailedError(error_type())) return else: log.error("Group %s failed to commit partition %s at offset" @@ -804,7 +820,7 @@ def _send_offset_fetch_request(self, partitions): return Future().failure(Errors.CoordinatorNotAvailableError) # Verify node is ready - if not self._client.ready(node_id): + if not self._client.ready(node_id, metadata_priority=False): log.debug("Node %s not ready -- failing offset fetch request", node_id) return Future().failure(Errors.NodeNotReadyError) diff --git a/kafka/errors.py b/kafka/errors.py index 898582615..ac4eadfec 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -24,14 +24,7 @@ class CommitFailedError(KafkaError): def __init__(self, *args): if not args: args = ("Commit cannot be completed since the group has already" - " rebalanced and assigned the partitions to another member." - " This means that the time between subsequent calls to poll()" - " was longer than the configured max_poll_interval_ms, which" - " typically implies that the poll loop is spending too much" - " time message processing. You can address this either by" - " increasing the rebalance timeout with max_poll_interval_ms," - " or by reducing the maximum size of batches returned in poll()" - " with max_poll_records.",) + " rebalanced and assigned the partitions to another member.",) super(CommitFailedError, self).__init__(*args)