From f7dc19d11c89e3950f3c5ce0b3755d9f6f543da0 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 28 Jun 2013 15:04:33 +0530 Subject: [PATCH 1/5] Remove multi-threaded auto-commit. Use process instead This is done using shared memory objects provided by multiprocessing. Also fix a bug in auto-commit for - auto_commit_every_n --- kafka/conn.py | 3 +- kafka/consumer.py | 159 ++++++++++++++++++++++++++++++++-------------- kafka/util.py | 48 -------------- 3 files changed, 114 insertions(+), 96 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 01975e4d1..4c2f240d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,11 +1,10 @@ import logging import socket import struct -from threading import local log = logging.getLogger("kafka") -class KafkaConnection(local): +class KafkaConnection(object): """ A socket connection to a single Kafka broker diff --git a/kafka/consumer.py b/kafka/consumer.py index 6ceea72f8..6f08d52d9 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,19 +1,15 @@ from itertools import izip_longest, repeat import logging import time -from threading import Lock -from multiprocessing import Process, Queue, Event, Value from Queue import Empty +from multiprocessing import Process, Queue, Event, Value, Array, \ + current_process from kafka.common import ( ErrorMapping, FetchRequest, OffsetRequest, OffsetFetchRequest, OffsetCommitRequest ) -from kafka.util import ( - ReentrantTimer -) - log = logging.getLogger("kafka") AUTO_COMMIT_MSG_COUNT = 100 @@ -51,6 +47,40 @@ def __exit__(self, type, value, traceback): self.consumer.fetch_min_bytes = FETCH_MIN_BYTES +class Offsets(dict): + """ + A dictionary of partitions=>offsets. The dict is such that the entries + are shared over multiprocessing + """ + def __init__(self, *args, **kwargs): + super(Offsets, self).__init__(*args, **kwargs) + self.length = len(self) * 2 + self.array = Array('i', self.length) + self.__syncup() + + def __syncup(self): + i = 0 + for k, v in self.items(): + self.array[i] = k + self.array[i+1] = v + i += 2 + + def __setitem__(self, key, value): + super(Offsets, self).__setitem__(key, value) + self.__syncup() + + def shareditems(self, keys=None): + if keys is None: + keys = self.keys() + + for i in range(self.length): + if i % 2 == 0: + k = self.array[i] + else: + if k in keys: + yield k, self.array[i] + + class Consumer(object): """ Base class to be used by other consumers. Not to be used directly @@ -68,25 +98,20 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) - self.offsets = {} + offsets = {} if not partitions: partitions = self.client.topic_partitions[topic] # Variables for handling offset commits - self.commit_lock = Lock() + self.commit_queue = Queue() + self.commit_event = Event() self.commit_timer = None - self.count_since_commit = 0 + self.count_since_commit = Value('i', 0) self.auto_commit = auto_commit self.auto_commit_every_n = auto_commit_every_n self.auto_commit_every_t = auto_commit_every_t - # Set up the auto-commit timer - if auto_commit is True and auto_commit_every_t is not None: - self.commit_timer = ReentrantTimer(auto_commit_every_t, - self.commit) - self.commit_timer.start() - def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: return resp.offset @@ -104,48 +129,89 @@ def get_or_init_offset_callback(resp): # (offset,) = self.client.send_offset_fetch_request(group, [req], # callback=get_or_init_offset_callback, # fail_on_error=False) - # self.offsets[partition] = offset + # offsets[partition] = offset for partition in partitions: - self.offsets[partition] = 0 + offsets[partition] = 0 + + # Set this as a shared object + self.offsets = Offsets(offsets) + + # Start committer only in the master/controller + if not current_process().daemon: + self.commit_timer = Process(target=self._committer, + args=(self.offsets,)) + self.commit_timer.daemon = True + self.commit_timer.start() + + def _committer(self, offsets): + """ + The process thread which takes care of committing + """ + self.client.reinit() + self.offsets = offsets + timeout = self.auto_commit_every_t + + if timeout is not None: + timeout /= 1000.0 + + while True: + try: + partitions = self.commit_queue.get(timeout=timeout) + if partitions == -1: + break + notify = True + except Empty: + # A timeout has happened. Do a commit + partitions = None + notify = False + + self._commit(partitions) - def commit(self, partitions=None): + if notify: + self.commit_event.set() + + def commit(self, partitions=None, block=True, timeout=None): """ Commit offsets for this consumer partitions: list of partitions to commit, default is to commit all of them + block: If set, the API will block for commit to happen + timeout: The time in seconds for the API to block """ + self.commit_event.clear() + self.commit_queue.put(partitions) - # short circuit if nothing happened. This check is kept outside - # to prevent un-necessarily acquiring a lock for checking the state - if self.count_since_commit == 0: - return + if block: + self.commit_event.wait(timeout) + + def _commit(self, partitions=None): + """ + Commit offsets for this consumer - with self.commit_lock: - # Do this check again, just in case the state has changed - # during the lock acquiring timeout - if self.count_since_commit == 0: - return + partitions: list of partitions to commit, default is to commit + all of them + """ - reqs = [] - if not partitions: # commit all partitions - partitions = self.offsets.keys() + # short circuit if nothing happened. + if self.count_since_commit.value == 0: + return - for partition in partitions: - offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) + reqs = [] + for partition, offset in self.offsets.shareditems(keys=partitions): + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) - reqs.append(OffsetCommitRequest(self.topic, partition, - offset, None)) + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - assert resp.error == 0 + resps = self.client.send_offset_commit_request(self.group, reqs) + for resp in resps: + assert resp.error == 0 - self.count_since_commit = 0 + self.count_since_commit.value = 0 def _auto_commit(self): """ @@ -156,13 +222,14 @@ def _auto_commit(self): if not self.auto_commit or self.auto_commit_every_n is None: return - if self.count_since_commit > self.auto_commit_every_n: + if self.count_since_commit.value >= self.auto_commit_every_n: self.commit() def stop(self): if self.commit_timer is not None: - self.commit_timer.stop() self.commit() + self.commit_queue.put(-1) + self.commit_timer.join() def pending(self, partitions=None): """ @@ -330,7 +397,7 @@ def __iter__(self): continue # Count, check and commit messages if necessary - self.count_since_commit += 1 + self.count_since_commit.value += 1 self._auto_commit() def __iter_partition__(self, partition, offset): @@ -537,7 +604,7 @@ def __iter__(self): self.start.clear() yield message - self.count_since_commit += 1 + self.count_since_commit.value += 1 self._auto_commit() self.start.clear() @@ -577,7 +644,7 @@ def get_messages(self, count=1, block=True, timeout=10): # Count, check and commit messages if necessary self.offsets[partition] = message.offset - self.count_since_commit += 1 + self.count_since_commit.value += 1 self._auto_commit() count -= 1 diff --git a/kafka/util.py b/kafka/util.py index 11178f556..8318ac5c3 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,7 +1,6 @@ from collections import defaultdict from itertools import groupby import struct -from threading import Thread, Event def write_int_string(s): @@ -72,50 +71,3 @@ class BufferUnderflowError(Exception): class ChecksumError(Exception): pass - - -class ReentrantTimer(object): - """ - A timer that can be restarted, unlike threading.Timer - (although this uses threading.Timer) - - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function - """ - def __init__(self, t, fn, *args, **kwargs): - - if t <= 0: - raise ValueError('Invalid timeout value') - - if not callable(fn): - raise ValueError('fn must be callable') - - self.thread = None - self.t = t / 1000.0 - self.fn = fn - self.args = args - self.kwargs = kwargs - self.active = None - - def _timer(self, active): - while not active.wait(self.t): - self.fn(*self.args, **self.kwargs) - - def start(self): - if self.thread is not None: - self.stop() - - self.active = Event() - self.thread = Thread(target=self._timer, args=(self.active,)) - self.thread.daemon = True # So the app exits when main thread exits - self.thread.start() - - def stop(self): - if self.thread is None: - return - - self.active.set() - self.thread.join(self.t + 1) - self.timer = None From 82fbba253fc9f01c616980635b4f9eb620010b9c Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 4 Oct 2013 13:00:58 +0530 Subject: [PATCH 2/5] Avoid doing an un-necessary commit on stop() --- kafka/consumer.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 62a672962..990b887eb 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -229,7 +229,12 @@ def _auto_commit(self): def stop(self): if self.commit_timer is not None: - self.commit() + # We will do an auto commit only if configured to do so + # Else, it is the responsibility of the caller to commit before + # stopping + if self.auto_commit: + self.commit() + self.commit_queue.put(-1) self.commit_timer.join() From 6e8ad21b0ce3583433bec5e61eb155da4e83fa89 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 8 Oct 2013 11:10:43 +0530 Subject: [PATCH 3/5] Ensure that the multi-process committer will work in windows --- kafka/consumer.py | 122 +++++++++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 56 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 0968b5465..0dbb9331d 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -83,6 +83,64 @@ def shareditems(self, keys=None): yield k, self.array[i] +def _commit(client, group, topic, count, offsets, partitions=None): + """ + Commit offsets for this consumer + + partitions: list of partitions to commit, default is to commit + all of them + """ + + # short circuit if nothing happened. + if count.value == 0: + return + + reqs = [] + for partition, offset in offsets.shareditems(keys=partitions): + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, group, topic, partition)) + + reqs.append(OffsetCommitRequest(topic, partition, offset, None)) + + resps = client.send_offset_commit_request(group, reqs) + for resp in resps: + assert resp.error == 0 + + count.value = 0 + + +def _committer(client, group, topic, timeout, queue, event, count, offsets): + """ + The process thread which takes care of committing + + NOTE: Ideally, this should have been a method inside the Consumer + class. However, multiprocessing module has issues in windows. The + functionality breaks unless this function is kept outside of a class + """ + client.reinit() + + if timeout is not None: + timeout /= 1000.0 + + while True: + try: + partitions = queue.get(timeout=timeout) + if partitions == -1: + break + notify = True + except Empty: + # A timeout has happened. Do a commit + partitions = None + notify = False + + # Try and commit the offsets + _commit(client, group, topic, count, offsets, partitions) + + if notify: + event.set() + + class Consumer(object): """ Base class to be used by other consumers. Not to be used directly @@ -141,38 +199,17 @@ def get_or_init_offset_callback(resp): # Start committer only in the master/controller if not current_process().daemon: - self.commit_timer = Process(target=self._committer, - args=(self.offsets,)) + args = (client.copy(), group, topic, + auto_commit_every_t, + self.commit_queue, + self.commit_event, + self.count_since_commit, + self.offsets) + + self.commit_timer = Process(target=_committer, args=args) self.commit_timer.daemon = True self.commit_timer.start() - def _committer(self, offsets): - """ - The process thread which takes care of committing - """ - self.client.reinit() - self.offsets = offsets - timeout = self.auto_commit_every_t - - if timeout is not None: - timeout /= 1000.0 - - while True: - try: - partitions = self.commit_queue.get(timeout=timeout) - if partitions == -1: - break - notify = True - except Empty: - # A timeout has happened. Do a commit - partitions = None - notify = False - - self._commit(partitions) - - if notify: - self.commit_event.set() - def commit(self, partitions=None, block=True, timeout=None): """ Commit offsets for this consumer @@ -188,33 +225,6 @@ def commit(self, partitions=None, block=True, timeout=None): if block: self.commit_event.wait(timeout) - def _commit(self, partitions=None): - """ - Commit offsets for this consumer - - partitions: list of partitions to commit, default is to commit - all of them - """ - - # short circuit if nothing happened. - if self.count_since_commit.value == 0: - return - - reqs = [] - for partition, offset in self.offsets.shareditems(keys=partitions): - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) - - reqs.append(OffsetCommitRequest(self.topic, partition, - offset, None)) - - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - assert resp.error == 0 - - self.count_since_commit.value = 0 - def _auto_commit(self): """ Check if we have to commit based on number of messages and commit From cf3469d5688f4eeadc90912e9cdc235fdbb7e620 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 8 Oct 2013 11:32:20 +0530 Subject: [PATCH 4/5] prevent committer from periodic wakeup if auto_commit is false --- kafka/consumer.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 0dbb9331d..bc0ebeab7 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -169,8 +169,13 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.commit_timer = None self.count_since_commit = Value('i', 0) self.auto_commit = auto_commit - self.auto_commit_every_n = auto_commit_every_n - self.auto_commit_every_t = auto_commit_every_t + + if auto_commit: + self.auto_commit_every_n = auto_commit_every_n + self.auto_commit_every_t = auto_commit_every_t + else: + self.auto_commit_every_n = None + self.auto_commit_every_t = None def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: From bdd106d255883a4ce5bde054d209aad6e84fce94 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 8 Oct 2013 11:36:34 +0530 Subject: [PATCH 5/5] prevent committer from periodic wakeup if auto_commit is false --- kafka/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index bc0ebeab7..091c78970 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -205,7 +205,7 @@ def get_or_init_offset_callback(resp): # Start committer only in the master/controller if not current_process().daemon: args = (client.copy(), group, topic, - auto_commit_every_t, + self.auto_commit_every_t, self.commit_queue, self.commit_event, self.count_since_commit,