kafka/coordinator/base.py: 123 changes (64 additions, 59 deletions)
@@ -231,20 +231,19 @@ def coordinator(self):
 
         Returns: the current coordinator id or None if it is unknown
         """
-        with self._lock:
-            if self.coordinator_id is None:
-                return None
-            elif self._client.is_disconnected(self.coordinator_id):
-                self.coordinator_dead('Node Disconnected')
-                return None
-            else:
-                return self.coordinator_id
+        if self.coordinator_id is None:
+            return None
+        elif self._client.is_disconnected(self.coordinator_id):
+            self.coordinator_dead('Node Disconnected')
+            return None
+        else:
+            return self.coordinator_id
 
     def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
-        with self._lock:
+        with self._client._lock, self._lock:
             while self.coordinator_unknown():
 
                 # Prior to 0.8.2 there was no group coordinator
@@ -274,17 +273,18 @@ def _reset_find_coordinator_future(self, result):
         self._find_coordinator_future = None
 
     def lookup_coordinator(self):
-        if self._find_coordinator_future is not None:
-            return self._find_coordinator_future
-
-        # If there is an error sending the group coordinator request
-        # then _reset_find_coordinator_future will immediately fire and
-        # set _find_coordinator_future = None
-        # To avoid returning None, we capture the future in a local variable
-        self._find_coordinator_future = self._send_group_coordinator_request()
-        future = self._find_coordinator_future
-        self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
-        return future
+        with self._client._lock, self._lock:
+            if self._find_coordinator_future is not None:
+                return self._find_coordinator_future
+
+            # If there is an error sending the group coordinator request
+            # then _reset_find_coordinator_future will immediately fire and
+            # set _find_coordinator_future = None
+            # To avoid returning None, we capture the future in a local variable
+            future = self._send_group_coordinator_request()
+            self._find_coordinator_future = future
+            self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+            return future
 
     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
@@ -487,7 +487,7 @@ def _handle_join_group_response(self, future, send_time, response):
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
             self.sensors.join_latency.record((time.time() - send_time) * 1000)
-            with self._lock:
+            with self._client._lock, self._lock:
                 if self.state is not MemberState.REBALANCING:
                     # if the consumer was woken up before a rebalance completes,
                     # we may have already left the group. In this case, we do
@@ -663,7 +663,7 @@ def _handle_group_coordinator_response(self, future, response):
 
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            with self._lock:
+            with self._client._lock, self._lock:
                 ok = self._client.cluster.add_group_coordinator(self.group_id, response)
                 if not ok:
                     # This could happen if coordinator metadata is different
@@ -693,11 +693,10 @@ def _handle_group_coordinator_response(self, future, response):
 
     def coordinator_dead(self, error):
         """Mark the current coordinator as dead."""
-        with self._lock:
-            if self.coordinator_id is not None:
-                log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
-                            self.coordinator_id, self.group_id, error)
-                self.coordinator_id = None
+        if self.coordinator_id is not None:
+            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+                        self.coordinator_id, self.group_id, error)
+            self.coordinator_id = None
 
     def generation(self):
         """Get the current generation state if the group is stable.
@@ -741,13 +740,13 @@ def __del__(self):
     def close(self):
         """Close the coordinator, leave the current group,
         and reset local generation / member_id"""
-        with self._lock:
+        with self._client._lock, self._lock:
             self._close_heartbeat_thread()
             self.maybe_leave_group()
 
     def maybe_leave_group(self):
         """Leave the current group and reset local generation/memberId."""
-        with self._lock:
+        with self._client._lock, self._lock:
             if (not self.coordinator_unknown()
                 and self.state is not MemberState.UNJOINED
                 and self._generation is not Generation.NO_GENERATION):
@@ -941,40 +940,46 @@ def _run_once(self):
                 self.disable()
                 return
 
-            # TODO: When consumer.wakeup() is implemented, we need to
-            # disable here to prevent propagating an exception to this
-            # heartbeat thread
-            self.coordinator._client.poll(timeout_ms=0)
-
-            if self.coordinator.coordinator_unknown():
-                future = self.coordinator.lookup_coordinator()
-                if not future.is_done or future.failed():
-                    # the immediate future check ensures that we backoff
-                    # properly in the case that no brokers are available
-                    # to connect to (and the future is automatically failed).
+        # TODO: When consumer.wakeup() is implemented, we need to
+        # disable here to prevent propagating an exception to this
+        # heartbeat thread
+        #
+        # Release coordinator lock during client poll to avoid deadlocks
+        # if/when connection errback needs coordinator lock
+        self.coordinator._client.poll(timeout_ms=0)
+
+        if self.coordinator.coordinator_unknown():
+            future = self.coordinator.lookup_coordinator()
+            if not future.is_done or future.failed():
+                # the immediate future check ensures that we backoff
+                # properly in the case that no brokers are available
+                # to connect to (and the future is automatically failed).
+                with self.coordinator._lock:
                     self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            elif self.coordinator.heartbeat.session_timeout_expired():
-                # the session timeout has expired without seeing a
-                # successful heartbeat, so we should probably make sure
-                # the coordinator is still healthy.
-                log.warning('Heartbeat session expired, marking coordinator dead')
-                self.coordinator.coordinator_dead('Heartbeat session expired')
-
-            elif self.coordinator.heartbeat.poll_timeout_expired():
-                # the poll timeout has expired, which means that the
-                # foreground thread has stalled in between calls to
-                # poll(), so we explicitly leave the group.
-                log.warning('Heartbeat poll expired, leaving group')
-                self.coordinator.maybe_leave_group()
-
-            elif not self.coordinator.heartbeat.should_heartbeat():
-                # poll again after waiting for the retry backoff in case
-                # the heartbeat failed or the coordinator disconnected
-                log.log(0, 'Not ready to heartbeat, waiting')
+        elif self.coordinator.heartbeat.session_timeout_expired():
+            # the session timeout has expired without seeing a
+            # successful heartbeat, so we should probably make sure
+            # the coordinator is still healthy.
+            log.warning('Heartbeat session expired, marking coordinator dead')
+            self.coordinator.coordinator_dead('Heartbeat session expired')
+
+        elif self.coordinator.heartbeat.poll_timeout_expired():
+            # the poll timeout has expired, which means that the
+            # foreground thread has stalled in between calls to
+            # poll(), so we explicitly leave the group.
+            log.warning('Heartbeat poll expired, leaving group')
+            self.coordinator.maybe_leave_group()
+
+        elif not self.coordinator.heartbeat.should_heartbeat():
+            # poll again after waiting for the retry backoff in case
+            # the heartbeat failed or the coordinator disconnected
+            log.log(0, 'Not ready to heartbeat, waiting')
+            with self.coordinator._lock:
                 self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
 
-            else:
+        else:
+            with self.coordinator._client._lock, self.coordinator._lock:
                 self.coordinator.heartbeat.sent_heartbeat()
                 future = self.coordinator._send_heartbeat_request()
                 future.add_callback(self._handle_heartbeat_success)
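Editor's note: the final hunk also moves the heartbeat thread's client poll and its backoff waits out from under the coordinator lock, matching the new comment about connection errbacks that need that lock. A hedged sketch of the pattern under those assumptions (illustrative names, not the real HeartbeatThread API): do the blocking network poll while holding neither lock, re-take the coordinator lock only around the short condition waits, and take both locks, client first, only to send the heartbeat.

import threading

client_lock = threading.RLock()           # stand-in for client._lock
coordinator_cond = threading.Condition()  # stand-in for coordinator._lock, used as a condition

def heartbeat_run_once(poll, coordinator_unknown, send_heartbeat, retry_backoff_s):
    # 1) Poll the network without holding the coordinator lock, so a
    #    connection errback that needs that lock cannot deadlock with us.
    poll(timeout_ms=0)

    if coordinator_unknown():
        # 2) Re-acquire the coordinator lock only for the backoff wait,
        #    as the diff does with `with self.coordinator._lock:`.
        with coordinator_cond:
            coordinator_cond.wait(retry_backoff_s)
    else:
        # 3) Sending a heartbeat touches client and coordinator state,
        #    so take both locks in the canonical order: client first.
        with client_lock, coordinator_cond:
            send_heartbeat()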