diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index ebc6f6ba5..2600d7f69 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import collections
 import copy
@@ -246,7 +246,7 @@ def _reset_offset(self, partition):
         else:
             log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))
 
-    def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+    def _retrieve_offsets(self, timestamps, timeout_ms=None):
         """Fetch offset for each partition passed in ``timestamps`` map.
 
         Blocks until offsets are obtained, a non-retriable exception is raised
@@ -266,29 +266,38 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
         if not timestamps:
             return {}
 
-        start_time = time.time()
-        remaining_ms = timeout_ms
+        elapsed = 0.0  # noqa: F841
+        begin = time.time()
+        def inner_timeout_ms(fallback=None):
+            if timeout_ms is None:
+                return fallback
+            elapsed = (time.time() - begin) * 1000
+            if elapsed >= timeout_ms:
+                raise Errors.KafkaTimeoutError('Timeout fetching offsets')
+            ret = max(0, timeout_ms - elapsed)
+            if fallback is not None:
+                return min(ret, fallback)
+            return ret
+
         timestamps = copy.copy(timestamps)
-        while remaining_ms > 0:
+        while True:
             if not timestamps:
                 return {}
 
             future = self._send_list_offsets_requests(timestamps)
-            self._client.poll(future=future, timeout_ms=remaining_ms)
+            self._client.poll(future=future, timeout_ms=inner_timeout_ms())
 
             if future.succeeded():
                 return future.value
             if not future.retriable():
                 raise future.exception # pylint: disable-msg=raising-bad-type
 
-            elapsed_ms = (time.time() - start_time) * 1000
-            remaining_ms = timeout_ms - elapsed_ms
-            if remaining_ms < 0:
-                break
-
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
-                self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
+                self._client.poll(future=refresh_future, timeout_ms=inner_timeout_ms())
+
+                if not future.is_done:
+                    break
 
                 # Issue #1780
                 # Recheck partition existence after after a successful metadata refresh
@@ -299,10 +308,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
                         log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, ))
                         timestamps.pop(unknown_partition)
             else:
-                time.sleep(self.config['retry_backoff_ms'] / 1000.0)
-
-                elapsed_ms = (time.time() - start_time) * 1000
-                remaining_ms = timeout_ms - elapsed_ms
+                time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
 
         raise Errors.KafkaTimeoutError(
             "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 0d4aedb88..fa67b4ddd 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -245,13 +245,16 @@ def ensure_coordinator_ready(self, timeout_ms=None):
         """
         elapsed = 0.0  # noqa: F841
         begin = time.time()
-        def inner_timeout_ms():
+        def inner_timeout_ms(fallback=None):
             if timeout_ms is None:
-                return None
+                return fallback
             elapsed = (time.time() - begin) * 1000
             if elapsed >= timeout_ms:
                 raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
-            return max(0, timeout_ms - elapsed)
+            ret = max(0, timeout_ms - elapsed)
+            if fallback is not None:
+                return min(ret, fallback)
+            return ret
 
         with self._client._lock, self._lock:
             while self.coordinator_unknown():
@@ -275,7 +278,7 @@ def inner_timeout_ms():
                             metadata_update = self._client.cluster.request_update()
                             self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
                         else:
-                            time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
+                            time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
                     else:
                         raise future.exception # pylint: disable-msg=raising-bad-type
 
@@ -369,13 +372,16 @@ def ensure_active_group(self, timeout_ms=None):
 
         elapsed = 0.0  # noqa: F841
         begin = time.time()
-        def inner_timeout_ms():
+        def inner_timeout_ms(fallback=None):
             if timeout_ms is None:
-                return None
+                return fallback
             elapsed = (time.time() - begin) * 1000
             if elapsed >= timeout_ms:
-                raise Errors.KafkaTimeoutError()
-            return max(0, timeout_ms - elapsed)
+                raise Errors.KafkaTimeoutError('Timeout attempting to join consumer group')
+            ret = max(0, timeout_ms - elapsed)
+            if fallback is not None:
+                return min(ret, fallback)
+            return ret
 
         while self.need_rejoin() or self._rejoin_incomplete():
             self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
@@ -399,7 +405,7 @@ def inner_timeout_ms():
                 while not self.coordinator_unknown():
                     if not self._client.in_flight_request_count(self.coordinator_id):
                         break
-                    self._client.poll(timeout_ms=min(200, inner_timeout_ms()))
+                    self._client.poll(timeout_ms=inner_timeout_ms(200))
                 else:
                     continue
 
@@ -451,7 +457,7 @@ def inner_timeout_ms():
                 continue
             elif not future.retriable():
                 raise exception # pylint: disable-msg=raising-bad-type
-            time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
+            time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
 
     def _rejoin_incomplete(self):
         return self.join_future is not None