-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Closed
Description
As we can see, future.py uses no locks, so when we call the Kafka producer's `send` method and then add a callback, the callback may never be called.
When the send succeeds, the success callbacks are invoked by the sender thread, whereas we add our callback from our own work thread.
If the two threads interleave as in steps 1-5 below, the callback will never be called.
# Work thread: send a message, then register `callback` on the returned future.
kafka_producer.send(topic, value).add_both(callback)
def success(self, value):
# Mark the future as completed with `value` and invoke the callbacks that
# are registered at that moment. Returns self.
# According to this report, this method is called by the sender thread.
assert not self.is_done, 'Future is already complete'
#
# Step 1: the sender thread reaches this point (is_done is still False),
# is suspended, and execution switches to the work thread.
#
self.value = value
self.is_done = True
if self._callbacks:
self._call_backs('callback', self._callbacks, self.value)
# Step 4: the sender thread has finished without seeing the callback that
# the work thread has not appended yet; execution switches back to the
# work thread.
return self
def failure(self, e):
# Mark the future as failed with exception `e` and invoke the errbacks.
# Returns self.
assert not self.is_done, 'Future is already complete'
# If `e` is an exception class rather than an instance, instantiate it.
self.exception = e if type(e) is not type else e()
assert isinstance(self.exception, BaseException), (
'future failed without an exception')
self.is_done = True
# Unlike success(), the errbacks are passed on without an emptiness check.
# NOTE(review): add_errback is not shown in this report; presumably the same
# interleaving as steps 1-5 can drop an errback too -- confirm.
self._call_backs('errback', self._errbacks, self.exception)
return self
def add_callback(self, f, *args, **kwargs):
# Register `f` to be called with the future's value. Any extra positional
# or keyword arguments are bound to `f` up front. Returns self.
# According to this report, this method is called from the work thread.
if args or kwargs:
f = functools.partial(f, *args, **kwargs)
#
# Step 2: the work thread evaluates the check below while is_done is still
# False, so it takes the else branch.
#
if self.is_done and not self.exception:
# The future has already succeeded: call `f` immediately.
self._call_backs('callback', [f], self.value)
else:
# Step 3: the work thread is suspended here, before the append, and
# execution switches to the sender thread, which completes success().
self._callbacks.append(f)
# Step 5: the work thread appends the callback only after success() has
# already gone through the callback list, so this callback is never called.
return self
Metadata
Metadata
Assignees
Labels
No labels