From ab72da3db32556b04d45954da8cf6c462dc53434 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Wed, 12 Jun 2013 14:56:58 +0530 Subject: [PATCH 01/18] Add support to consume messages from specific partitions Currently the kafka SimpleConsumer consumes messages from all partitions. This commit will ensure that data is consumed only from partitions specified during init --- kafka/consumer.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1a7..467bd76aa 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -24,6 +24,7 @@ class SimpleConsumer(object): client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique topic: the topic to consume + partitions: An optional list of partitions to consume the data from auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume @@ -38,7 +39,7 @@ class SimpleConsumer(object): these triggers """ - def __init__(self, client, group, topic, auto_commit=True, + def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client @@ -47,6 +48,9 @@ def __init__(self, client, group, topic, auto_commit=True, self.client._load_metadata_for_topics(topic) self.offsets = {} + if not partitions: + partitions = self.client.topic_partitions[topic] + # Variables for handling offset commits self.commit_lock = Lock() self.commit_timer = None @@ -73,14 +77,14 @@ def get_or_init_offset_callback(resp): # Uncomment for 0.8.1 # - #for partition in self.client.topic_partitions[topic]: + #for partition in partitions: # req = OffsetFetchRequest(topic, partition) # (offset,) = self.client.send_offset_fetch_request(group, [req], # callback=get_or_init_offset_callback, # fail_on_error=False) # self.offsets[partition] = offset - for partition 
in self.client.topic_partitions[topic]: + for partition in partitions: self.offsets[partition] = 0 def stop(self): @@ -126,13 +130,13 @@ def seek(self, offset, whence): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def pending(self, partitions=[]): + def pending(self, partitions=None): """ Gets the pending message count partitions: list of partitions to check for, default is to check all """ - if len(partitions) == 0: + if not partitions: partitions = self.offsets.keys() total = 0 @@ -159,7 +163,7 @@ def _timed_commit(self): # Once the commit is done, start the timer again self.commit_timer.start() - def commit(self, partitions=[]): + def commit(self, partitions=None): """ Commit offsets for this consumer @@ -173,7 +177,7 @@ def commit(self, partitions=[]): with self.commit_lock: reqs = [] - if len(partitions) == 0: # commit all partitions + if not partitions: # commit all partitions partitions = self.offsets.keys() for partition in partitions: From b578725fc338dc80cb82ad3471d488881f9dd785 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Mon, 24 Jun 2013 18:05:13 +0530 Subject: [PATCH 02/18] Add support for multi-process consumer --- README.md | 14 ++ kafka/consumer.py | 334 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 300 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 60cc74514..c6e3e6348 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,20 @@ producer.send("key2", "this methode") producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) ``` +# Multiprocess consumer +# This will split the number of partitions among two processes +consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) + +# This will spawn processes such that each handles 2 partitions max +consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", + partitions_per_proc=2) + +for message in consumer: + print(message) + +for message in consumer.get_messages(count=5, block=True, 
timeout=4): + print(message) + ## Low level ```python diff --git a/kafka/consumer.py b/kafka/consumer.py index 467bd76aa..e6d95d06b 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,6 +1,7 @@ from itertools import izip_longest, repeat import logging from threading import Lock +from multiprocessing import Process, Queue, Event, Value from kafka.common import ( ErrorMapping, FetchRequest, @@ -17,36 +18,17 @@ AUTO_COMMIT_INTERVAL = 5000 -class SimpleConsumer(object): - """ - A simple consumer implementation that consumes all partitions for a topic - - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. 
A manual call to commit will also reset - these triggers - - """ - def __init__(self, client, group, topic, auto_commit=True, partitions=None, +class Consumer(object): + def __init__(self, client, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): + self.client = client self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) self.offsets = {} + self.partition_info = False if not partitions: partitions = self.client.topic_partitions[topic] @@ -87,6 +69,123 @@ def get_or_init_offset_callback(resp): for partition in partitions: self.offsets[partition] = 0 + def provide_partition_info(self): + self.partition_info = True + + def _timed_commit(self): + """ + Commit offsets as part of timer + """ + self.commit() + + # Once the commit is done, start the timer again + self.commit_timer.start() + + def commit(self, partitions=None): + """ + Commit offsets for this consumer + + partitions: list of partitions to commit, default is to commit + all of them + """ + + # short circuit if nothing happened + if self.count_since_commit == 0: + return + + with self.commit_lock: + reqs = [] + if not partitions: # commit all partitions + partitions = self.offsets.keys() + + for partition in partitions: + offset = self.offsets[partition] + log.debug("Commit offset %d in SimpleConsumer: " + "group=%s, topic=%s, partition=%s" % + (offset, self.group, self.topic, partition)) + + reqs.append(OffsetCommitRequest(self.topic, partition, + offset, None)) + + resps = self.client.send_offset_commit_request(self.group, reqs) + for resp in resps: + assert resp.error == 0 + + self.count_since_commit = 0 + + def _auto_commit(self): + """ + Check if we have to commit based on number of messages and commit + """ + + # Check if we are supposed to do an auto-commit + if not self.auto_commit or self.auto_commit_every_n is None: + return + + if self.count_since_commit > self.auto_commit_every_n: + if 
self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + self.commit_timer.start() + else: + self.commit() + + def pending(self, partitions=None): + """ + Gets the pending message count + + partitions: list of partitions to check for, default is to check all + """ + if not partitions: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset - (1 if offset > 0 else 0) + + return total + + +class SimpleConsumer(Consumer): + """ + A simple consumer implementation that consumes all partitions for a topic + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. 
A manual call to commit will also reset + these triggers + + """ + def __init__(self, client, group, topic, auto_commit=True, partitions=None, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL): + + super(SimpleConsumer, self).__init__(client, group, topic, + auto_commit, partitions, + auto_commit_every_n, + auto_commit_every_t) + def stop(self): if self.commit_timer is not None: self.commit_timer.stop() @@ -130,30 +229,6 @@ def seek(self, offset, whence): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def pending(self, partitions=None): - """ - Gets the pending message count - - partitions: list of partitions to check for, default is to check all - """ - if not partitions: - partitions = self.offsets.keys() - - total = 0 - reqs = [] - - for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - resps = self.client.send_offset_request(reqs) - for resp in resps: - partition = resp.partition - pending = resp.offsets[0] - offset = self.offsets[partition] - total += pending - offset - (1 if offset > 0 else 0) - - return total - def _timed_commit(self): """ Commit offsets as part of timer @@ -230,7 +305,10 @@ def __iter__(self): for partition, it in iters.items(): try: - yield it.next() + if self.partition_info: + yield (partition, it.next()) + else: + yield it.next() except StopIteration: log.debug("Done iterating over partition %s" % partition) del iters[partition] @@ -250,6 +328,9 @@ def __iter_partition__(self, partition, offset): the end of this partition. 
""" + if offset != 0: + offset += 1 + while True: # TODO: configure fetch size req = FetchRequest(self.topic, partition, offset, 1024) @@ -268,3 +349,160 @@ def __iter_partition__(self, partition, offset): break else: offset = next_offset + 1 + + +class MultiProcessConsumer(Consumer): + """ + A consumer implementation that consumes partitions for a topic in + parallel from multiple partitions + + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) + + Auto commit details: + If both auto_commit_every_n and auto_commit_every_t are set, they will + reset one another when one is triggered. These triggers simply call the + commit method on this class. 
A manual call to commit will also reset + these triggers + + """ + def __init__(self, client, group, topic, auto_commit=True, + auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, + auto_commit_every_t=AUTO_COMMIT_INTERVAL, + num_procs=1, partitions_per_proc=0): + + # Initiate the base consumer class + super(MultiProcessConsumer, self).__init__(client, group, topic, + auto_commit, partitions, + auto_commit_every_n, + auto_commit_every_t) + + # Variables for managing and controlling the data flow from + # consumer child process to master + self.queue = Queue() # Child consumers dump messages into this + self.start = Event() # Indicates the consumers to start + self.exit = Event() # Requests the consumers to shutdown + self.pause = Event() # Requests the consumers to pause + self.size = Value('i', 0) # Indicator of number of messages to fetch + + partitions = self.offsets.keys() + + # If unspecified, start one consumer per partition + if not partitions_per_proc: + partitions_per_proc = round(len(partitions) * 1.0 / num_procs) + if partitions_per_proc < num_procs * 0.5: + partitions_per_proc += 1 + + self.procs = [] + + for slices in map(None, *[iter(partitions)] * int(partitions_per_proc)): + proc = Process(target=_self._consume, args=(slices,)) + proc.daemon = True + proc.start() + self.procs.append(proc) + + # We do not need a consumer instance anymore + consumer.stop() + + def _consume(self, slices): + + # We will start consumers without auto-commit. Auto-commit will be + # done by the master process. + consumer = SimpleConsumer(self.client, self.group, self.topic, + partitions=slices, + auto_commit=False, + auto_commit_every_n=0, + auto_commit_every_t=None) + + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() + + while True: + self.start.wait() + if self.exit.isSet(): + break + + count = 0 + for partition, message in consumer: + self.queue.put((partition, message)) + count += 1 + + # We have reached the required size. 
The master might have + # more than what he needs. Wait for a while + if count == self.size.value: + self.pause.wait() + break + + consumer.stop() + + def stop(self): + # Set exit and start off all waiting consumers + self.exit.set() + self.start.set() + + for proc in self.procs: + proc.join() + proc.terminate() + + def __iter__(self): + # Trigger the consumer procs to start off + self.size.value = 0 + self.start.set() + self.pause.set() + + while not self.queue.empty(): + partition, message = self.queue.get() + yield message + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + self.count_since_commit += 1 + self._auto_commit() + + self.start.clear() + + def get_messages(self, count=1, block=True, timeout=10): + messages = [] + + # Give a size hint to the consumers + self.size.value = count + self.pause.clear() + + while count > 0: + # Trigger consumption only if the queue is empty + # By doing this, we will ensure that consumers do not + # go into overdrive and keep consuming thousands of + # messages when the user might need only two + if self.queue.empty(): + self.start.set() + + try: + partition, message = self.queue.get(block, timeout) + except Queue.Empty: + break + + messages.append(message) + + # Count, check and commit messages if necessary + self.offsets[partition] = message.offset + self.count_since_commit += 1 + self._auto_commit() + count -= 1 + + self.size.value = 0 + + self.start.clear() + self.pause.set() + + return messages From e85190e6f6d8ea554df96b2b445924591bd561b6 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Mon, 24 Jun 2013 18:14:04 +0530 Subject: [PATCH 03/18] Minor bug fix --- kafka/consumer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index e6d95d06b..7f67cf20f 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -384,9 +384,10 @@ def __init__(self, client, group, topic, auto_commit=True, # Initiate the base consumer class 
super(MultiProcessConsumer, self).__init__(client, group, topic, - auto_commit, partitions, + auto_commit, auto_commit_every_n, - auto_commit_every_t) + auto_commit_every_t, + partitions=None) # Variables for managing and controlling the data flow from # consumer child process to master From c7dfebaa8d9ee6bc6af8f1d03557ed1e089a0f65 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 25 Jun 2013 11:36:34 +0530 Subject: [PATCH 04/18] Added more documentation and clean up duplicate code --- kafka/consumer.py | 162 ++++++++++++++++++++++------------------------ 1 file changed, 76 insertions(+), 86 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 7f67cf20f..2b77d005f 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -19,6 +19,14 @@ class Consumer(object): + """ + Base class to be used by other consumers. Not to be used directly + + This base class provides logic for + * initialization and fetching metadata of partitions + * Auto-commit logic + * APIs for fetching pending message count + """ def __init__(self, client, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): @@ -28,7 +36,6 @@ def __init__(self, client, topic, partitions=None, auto_commit=True, self.group = group self.client._load_metadata_for_topics(topic) self.offsets = {} - self.partition_info = False if not partitions: partitions = self.client.topic_partitions[topic] @@ -69,9 +76,6 @@ def get_or_init_offset_callback(resp): for partition in partitions: self.offsets[partition] = 0 - def provide_partition_info(self): - self.partition_info = True - def _timed_commit(self): """ Commit offsets as part of timer @@ -157,7 +161,8 @@ def pending(self, partitions=None): class SimpleConsumer(Consumer): """ - A simple consumer implementation that consumes all partitions for a topic + A simple consumer implementation that consumes all/specified partitions + for a topic client: a connected KafkaClient group: a name 
for this consumer, used for offset storage and must be unique @@ -175,17 +180,25 @@ class SimpleConsumer(Consumer): reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers - """ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): + # Indicates if partition info will be returned in messages + self.partition_info = False + super(SimpleConsumer, self).__init__(client, group, topic, auto_commit, partitions, auto_commit_every_n, auto_commit_every_t) + def provide_partition_info(self): + """ + Indicates that partition info must be returned by the consumer + """ + self.partition_info = True + def stop(self): if self.commit_timer is not None: self.commit_timer.stop() @@ -229,64 +242,6 @@ def seek(self, offset, whence): else: raise ValueError("Unexpected value for `whence`, %d" % whence) - def _timed_commit(self): - """ - Commit offsets as part of timer - """ - self.commit() - - # Once the commit is done, start the timer again - self.commit_timer.start() - - def commit(self, partitions=None): - """ - Commit offsets for this consumer - - partitions: list of partitions to commit, default is to commit - all of them - """ - - # short circuit if nothing happened - if self.count_since_commit == 0: - return - - with self.commit_lock: - reqs = [] - if not partitions: # commit all partitions - partitions = self.offsets.keys() - - for partition in partitions: - offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) - - reqs.append(OffsetCommitRequest(self.topic, partition, - offset, None)) - - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - assert resp.error == 0 - - self.count_since_commit = 0 - - def _auto_commit(self): - 
""" - Check if we have to commit based on number of messages and commit - """ - - # Check if we are supposed to do an auto-commit - if not self.auto_commit or self.auto_commit_every_n is None: - return - - if self.count_since_commit > self.auto_commit_every_n: - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - self.commit_timer.start() - else: - self.commit() - def __iter__(self): """ Create an iterate per partition. Iterate through them calling next() @@ -354,7 +309,7 @@ def __iter_partition__(self, partition, offset): class MultiProcessConsumer(Consumer): """ A consumer implementation that consumes partitions for a topic in - parallel from multiple partitions + parallel using multiple processes client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique @@ -375,7 +330,6 @@ class MultiProcessConsumer(Consumer): reset one another when one is triggered. These triggers simply call the commit method on this class. 
A manual call to commit will also reset these triggers - """ def __init__(self, client, group, topic, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, @@ -392,55 +346,70 @@ def __init__(self, client, group, topic, auto_commit=True, # Variables for managing and controlling the data flow from # consumer child process to master self.queue = Queue() # Child consumers dump messages into this - self.start = Event() # Indicates the consumers to start + self.start = Event() # Indicates the consumers to start fetch self.exit = Event() # Requests the consumers to shutdown - self.pause = Event() # Requests the consumers to pause + self.pause = Event() # Requests the consumers to pause fetch self.size = Value('i', 0) # Indicator of number of messages to fetch partitions = self.offsets.keys() # If unspecified, start one consumer per partition + # The logic below ensures that + # * we do not cross the num_procs limit + # * we have an even distribution of partitions among processes if not partitions_per_proc: partitions_per_proc = round(len(partitions) * 1.0 / num_procs) if partitions_per_proc < num_procs * 0.5: partitions_per_proc += 1 - self.procs = [] + # The final set of chunks + chunks = map(None, *[iter(partitions)] * int(partitions_per_proc)) - for slices in map(None, *[iter(partitions)] * int(partitions_per_proc)): - proc = Process(target=_self._consume, args=(slices,)) + self.procs = [] + for chunk in chunks: + proc = Process(target=_self._consume, args=(chunk,)) proc.daemon = True proc.start() self.procs.append(proc) - # We do not need a consumer instance anymore - consumer.stop() - - def _consume(self, slices): + def _consume(self, partitions): + """ + A child process worker which consumes messages based on the + notifications given by the controller process + """ # We will start consumers without auto-commit. Auto-commit will be - # done by the master process. + # done by the master controller process. 
consumer = SimpleConsumer(self.client, self.group, self.topic, - partitions=slices, + partitions=partitions, auto_commit=False, - auto_commit_every_n=0, + auto_commit_every_n=None, auto_commit_every_t=None) # Ensure that the consumer provides the partition information consumer.provide_partition_info() while True: + # Wait till the controller indicates us to start consumption self.start.wait() + + # If we are asked to quit, do so if self.exit.isSet(): break + # Consume messages and add them to the queue. If the controller + # indicates a specific number of messages, follow that advice count = 0 + for partition, message in consumer: self.queue.put((partition, message)) count += 1 - # We have reached the required size. The master might have - # more than what he needs. Wait for a while + # We have reached the required size. The controller might have + # more than what he needs. Wait for a while. + # Without this logic, it is possible that we run into a big + # loop consuming all available messages before the controller + # can reset the 'start' event if count == self.size.value: self.pause.wait() break @@ -450,6 +419,7 @@ def _consume(self, slices): def stop(self): # Set exit and start off all waiting consumers self.exit.set() + self.pause.set() self.start.set() for proc in self.procs: @@ -457,13 +427,23 @@ def stop(self): proc.terminate() def __iter__(self): - # Trigger the consumer procs to start off + """ + Iterator to consume the messages available on this consumer + """ + # Trigger the consumer procs to start off. 
+ # We will iterate till there are no more messages available self.size.value = 0 self.start.set() self.pause.set() - while not self.queue.empty(): - partition, message = self.queue.get() + while True: + try: + # We will block for a small while so that the consumers get + # a chance to run and put some messages in the queue + partition, message = self.queue.get(block=True, timeout=0.1) + except Queue.Empty: + break + yield message # Count, check and commit messages if necessary @@ -474,9 +454,20 @@ def __iter__(self): self.start.clear() def get_messages(self, count=1, block=True, timeout=10): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. + If >0, API will block for specified time (in seconds) + """ messages = [] - # Give a size hint to the consumers + # Give a size hint to the consumers. Each consumer process will fetch + # a maximum of "count" messages. This will fetch more messages than + # necessary, but these will not be committed to kafka. 
Also, the extra + # messages can be provided in subsequent runs self.size.value = count self.pause.clear() @@ -484,7 +475,7 @@ def get_messages(self, count=1, block=True, timeout=10): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not # go into overdrive and keep consuming thousands of - # messages when the user might need only two + # messages when the user might need only a few if self.queue.empty(): self.start.set() @@ -502,7 +493,6 @@ def get_messages(self, count=1, block=True, timeout=10): count -= 1 self.size.value = 0 - self.start.clear() self.pause.set() From 2e38a5273270df8959279973cbac69e5658ec9a9 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 25 Jun 2013 11:54:07 +0530 Subject: [PATCH 05/18] Added the modules in __init__.py --- kafka/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/__init__.py b/kafka/__init__.py index d2291692f..13af699d2 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -10,9 +10,10 @@ create_message, create_gzip_message, create_snappy_message ) from kafka.producer import SimpleProducer -from kafka.consumer import SimpleConsumer +from kafka.consumer import SimpleConsumer, MultiProcessConsumer __all__ = [ - 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer', + 'KafkaClient', 'KafkaConnection', 'SimpleProducer', + 'SimpleConsumer', 'MultiProcessConsumer', 'create_message', 'create_gzip_message', 'create_snappy_message' ] From b022be28de85634a642cbd4e0ca4ce89d46d21dd Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 25 Jun 2013 17:01:32 +0530 Subject: [PATCH 06/18] Implement blocking get_messages for SimpleConsumer The implementation is done by using simple options to Kafka Fetch Request Also in the SimpleConsumer iterator, update the offset before the message is yielded. This is so that the consumer state is not lost in certain cases. 
For eg: the message is yielded and consumed by the caller, but the caller does not come back into the generator again. The message will be consumed but the status is not updated in the consumer --- kafka/client.py | 10 +++-- kafka/consumer.py | 93 ++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 87 insertions(+), 16 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 11467980a..a1c21335d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -221,15 +221,19 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, return out def send_fetch_request(self, payloads=[], fail_on_error=True, - callback=None): + callback=None, max_wait_time=100, min_bytes=4096): """ Encode and send a FetchRequest Payloads are grouped by topic and partition so they can be pipelined to the same brokers. """ - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_fetch_request, + + encoder = partial(KafkaProtocol.encode_fetch_request, + max_wait_time=max_wait_time, + min_bytes=min_bytes) + + resps = self._send_broker_aware_request(payloads, encoder, KafkaProtocol.decode_fetch_response) out = [] diff --git a/kafka/consumer.py b/kafka/consumer.py index 2b77d005f..c0906ad9d 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -17,6 +17,37 @@ AUTO_COMMIT_MSG_COUNT = 100 AUTO_COMMIT_INTERVAL = 5000 +FETCH_DEFAULT_BLOCK_TIMEOUT = 1 +FETCH_MAX_WAIT_TIME = 100 +FETCH_MIN_BYTES = 4096 + + +class FetchContext(object): + """ + Class for managing the state of a consumer during fetch + """ + def __init__(self, consumer, block, timeout): + self.consumer = consumer + self.block = block + + if block and not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + + self.timeout = timeout * 1000 + + def __enter__(self): + """Set fetch values based on blocking status""" + if self.block: + self.consumer.fetch_max_wait_time = self.timeout + self.consumer.fetch_min_bytes = 1 + else: + self.consumer.fetch_min_bytes = 0 + + def __exit__(self, type, value, 
traceback): + """Reset values to default""" + self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + class Consumer(object): """ @@ -27,7 +58,7 @@ class Consumer(object): * Auto-commit logic * APIs for fetching pending message count """ - def __init__(self, client, topic, partitions=None, auto_commit=True, + def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): @@ -185,13 +216,15 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): - # Indicates if partition info will be returned in messages - self.partition_info = False + self.partition_info = False # Do not return partition info in msgs + self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.fetch_min_bytes = FETCH_MIN_BYTES super(SimpleConsumer, self).__init__(client, group, topic, - auto_commit, partitions, - auto_commit_every_n, - auto_commit_every_t) + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) def provide_partition_info(self): """ @@ -242,6 +275,31 @@ def seek(self, offset, whence): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. 
+ If >0, API will block for specified time (in seconds) + """ + messages = [] + iterator = self.__iter__() + + # HACK: This splits the timeout between available partitions + timeout = timeout * 1.0 / len(self.offsets) + + with FetchContext(self, block, timeout): + while count > 0: + try: + messages.append(next(iterator)) + except StopIteration as exp: + break + count -= 1 + + return messages + def __iter__(self): """ Create an iterate per partition. Iterate through them calling next() @@ -283,13 +341,23 @@ def __iter_partition__(self, partition, offset): the end of this partition. """ + # The offset that is stored in the consumer is the offset that + # we have consumed. In subsequent iterations, we are supposed to + # fetch the next message (that is from the next offset) + # However, for the 0th message, the offset should be as-is. + # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is + # problematic, since 0 is offset of a message which we have not yet + # consumed. if offset != 0: offset += 1 while True: # TODO: configure fetch size req = FetchRequest(self.topic, partition, offset, 1024) - (resp,) = self.client.send_fetch_request([req]) + + (resp,) = self.client.send_fetch_request([req], + max_wait_time=self.fetch_max_wait_time, + min_bytes=self.fetch_min_bytes) assert resp.topic == self.topic assert resp.partition == partition @@ -297,9 +365,8 @@ def __iter_partition__(self, partition, offset): next_offset = None for message in resp.messages: next_offset = message.offset - yield message - # update the internal state _after_ we yield the message self.offsets[partition] = message.offset + yield message if next_offset is None: break else: @@ -338,10 +405,10 @@ def __init__(self, client, group, topic, auto_commit=True, # Initiate the base consumer class super(MultiProcessConsumer, self).__init__(client, group, topic, - auto_commit, - auto_commit_every_n, - auto_commit_every_t, - partitions=None) + partitions=partitions, + auto_commit=auto_commit, + 
auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) # Variables for managing and controlling the data flow from # consumer child process to master From 99da57f98a65a457481dcf5c1edcca95dfd464a5 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 25 Jun 2013 17:06:16 +0530 Subject: [PATCH 07/18] Added some comments about message state --- kafka/consumer.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kafka/consumer.py b/kafka/consumer.py index c0906ad9d..ca66e87da 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -365,6 +365,13 @@ def __iter_partition__(self, partition, offset): next_offset = None for message in resp.messages: next_offset = message.offset + + # update the offset before the message is yielded. This is + # so that the consumer state is not lost in certain cases. + # For eg: the message is yielded and consumed by the caller, + # but the caller does not come back into the generator again. + # The message will be consumed but the status will not be + # updated in the consumer self.offsets[partition] = message.offset yield message if next_offset is None: From 65c8eb1f9f3a309f924cf469abb16af98bbe5d6d Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Tue, 25 Jun 2013 18:17:39 +0530 Subject: [PATCH 08/18] Got MultiProcessConsumer working Other changes * Put a message size restriction on the shared queue - to prevent message overload * Wait for a while after each process is started (in constructor) * Wait for a while in each child if the consumer does not return any messages Just to be nice to the CPU. * Control the start event more granularly - this prevents infinite loops if the control does not return to the generator. 
For eg: for msg in consumer: assert False * Update message status before yield --- kafka/consumer.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index ca66e87da..1fa51a200 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,7 +1,9 @@ from itertools import izip_longest, repeat import logging +import time from threading import Lock from multiprocessing import Process, Queue, Event, Value +from Queue import Empty from kafka.common import ( ErrorMapping, FetchRequest, @@ -412,14 +414,14 @@ def __init__(self, client, group, topic, auto_commit=True, # Initiate the base consumer class super(MultiProcessConsumer, self).__init__(client, group, topic, - partitions=partitions, + partitions=None, auto_commit=auto_commit, auto_commit_every_n=auto_commit_every_n, auto_commit_every_t=auto_commit_every_t) # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = Queue() # Child consumers dump messages into this + self.queue = Queue(1024) # Child consumers dump messages into this self.start = Event() # Indicates the consumers to start fetch self.exit = Event() # Requests the consumers to shutdown self.pause = Event() # Requests the consumers to pause fetch @@ -441,9 +443,11 @@ def __init__(self, client, group, topic, auto_commit=True, self.procs = [] for chunk in chunks: - proc = Process(target=_self._consume, args=(chunk,)) + chunk = filter(lambda x: x is not None, list(chunk)) + proc = Process(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() + time.sleep(0.2) self.procs.append(proc) def _consume(self, partitions): @@ -468,7 +472,7 @@ def _consume(self, partitions): self.start.wait() # If we are asked to quit, do so - if self.exit.isSet(): + if self.exit.is_set(): break # Consume messages and add them to the queue. 
If the controller @@ -488,6 +492,11 @@ def _consume(self, partitions): self.pause.wait() break + # In case we did not receive any message, give up the CPU for + # a while before we try again + if count == 0: + time.sleep(0.1) + consumer.stop() def stop(self): @@ -507,21 +516,22 @@ def __iter__(self): # Trigger the consumer procs to start off. # We will iterate till there are no more messages available self.size.value = 0 - self.start.set() self.pause.set() while True: + self.start.set() try: # We will block for a small while so that the consumers get # a chance to run and put some messages in the queue - partition, message = self.queue.get(block=True, timeout=0.1) - except Queue.Empty: + partition, message = self.queue.get(block=True, timeout=1) + except Empty: break - yield message - # Count, check and commit messages if necessary self.offsets[partition] = message.offset + self.start.clear() + yield message + self.count_since_commit += 1 self._auto_commit() @@ -555,7 +565,7 @@ def get_messages(self, count=1, block=True, timeout=10): try: partition, message = self.queue.get(block, timeout) - except Queue.Empty: + except Empty: break messages.append(message) From d6d7299b7dc00f852014e34df060de9268eddfae Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 27 Jun 2013 16:26:01 +0530 Subject: [PATCH 09/18] Fix a bug in seek. This was hidden because of another bug in offset management --- kafka/consumer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka/consumer.py b/kafka/consumer.py index b87ed1c21..67e1b10dd 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -259,6 +259,12 @@ def seek(self, offset, whence): reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + # The API returns back the next available offset + # For eg: if the current offset is 18, the API will return + # back 19. 
So, if we have to seek 5 points before, we will + # end up going back to 14, instead of 13. Adjust this + deltas[partition] -= 1 else: pass From b3fece508dff6a4fe8c31bc7c2282c114676646b Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 27 Jun 2013 17:47:26 +0530 Subject: [PATCH 10/18] Re-init the sockets in the new process --- kafka/client.py | 4 ++++ kafka/conn.py | 9 +++++++++ kafka/consumer.py | 4 +++- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index a1c21335d..525551ef3 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -176,6 +176,10 @@ def close(self): for conn in self.conns.values(): conn.close() + def reinit(self): + for conn in self.conns.values(): + conn.reinit() + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ diff --git a/kafka/conn.py b/kafka/conn.py index fce1fdcbf..01975e4d1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -86,3 +86,12 @@ def recv(self, requestId): def close(self): "Close this connection" self._sock.close() + + def reinit(self): + """ + Re-initialize the socket connection + """ + self._sock.close() + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((self.host, self.port)) + self._sock.settimeout(10) diff --git a/kafka/consumer.py b/kafka/consumer.py index 67e1b10dd..5ca90debb 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -445,7 +445,6 @@ def __init__(self, client, group, topic, auto_commit=True, proc = Process(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() - time.sleep(0.2) self.procs.append(proc) def _consume(self, partitions): @@ -454,6 +453,9 @@ def _consume(self, partitions): notifications given by the controller process """ + # Make the child processes open separate socket connections + self.client.reinit() + # We will start consumers without auto-commit. Auto-commit will be # done by the master controller process. 
consumer = SimpleConsumer(self.client, self.group, self.topic, From 36b5f8154304a7fef437795250885230dff835b1 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 27 Jun 2013 17:47:47 +0530 Subject: [PATCH 11/18] Test cases for multi-process consumer and blocking APIs --- test/test_integration.py | 129 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 120 insertions(+), 9 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index d607b7338..7908a34a3 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -1,5 +1,6 @@ import logging import unittest +from datetime import datetime from kafka import * # noqa from kafka.common import * # noqa @@ -273,7 +274,7 @@ def test_simple_producer(self): self.assertEquals(messages[0].message.value, "three") -class TestSimpleConsumer(unittest.TestCase): +class TestConsumer(unittest.TestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -288,9 +289,9 @@ def tearDownClass(cls): # noqa cls.server2.close() cls.zk.close() - def test_consumer(self): + def test_simple_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_consumer", 0, messages=[ + produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -299,7 +300,7 @@ def test_consumer(self): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_consumer", 1, messages=[ + produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -308,7 +309,7 @@ def test_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_consumer") + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") all_messages = [] for message in consumer: all_messages.append(message) @@ -331,29 +332,139 @@ def 
test_consumer(self): self.assertEquals(len(all_messages), 13) + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest("test_simple_consumer", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + consumer.stop() - def test_pending(self): + def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_pending", 0, messages=[ + produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") + self.assertEquals(consumer.pending(), 20) + self.assertEquals(consumer.pending(partitions=[0]), 10) + self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() + + def test_multi_process_consumer(self): + # Produce 100 messages to partition 0 + produce1 
= ProduceRequest("test_mpconsumer", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest("test_mpconsumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + + consumer.stop() + + def test_multi_proc_pending(self): + # Produce 10 messages to partition 0 and 1 + produce1 = ProduceRequest("test_mppending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in 
self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_pending", 1, messages=[ + produce2 = ProduceRequest("test_mppending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) + for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_pending") + consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() From c2a49c1370536e7155c34bb93fa14a23f367d5f4 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Thu, 27 Jun 2013 17:51:12 +0530 Subject: [PATCH 12/18] Add TODO comments --- kafka/consumer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/consumer.py b/kafka/consumer.py index 5ca90debb..9e6a0eb08 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -523,6 +523,8 @@ def __iter__(self): try: # We will block for a small while so that the consumers get # a chance to run and put some messages in the queue + # TODO: This is a hack and will make the consumer block for + # at least one second. 
Need to find a better way of doing this partition, message = self.queue.get(block=True, timeout=1) except Empty: break From 6b76d9d682b2b7c6280dda84dbad07e6d856de0e Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 28 Jun 2013 13:07:10 +0530 Subject: [PATCH 13/18] Fix markup in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c6e3e6348..544914854 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,6 @@ producer.send("key1", "some message") producer.send("key2", "this methode") producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) -``` # Multiprocess consumer # This will split the number of partitions among two processes @@ -70,6 +69,7 @@ for message in consumer: for message in consumer.get_messages(count=5, block=True, timeout=4): print(message) +``` ## Low level From 9a557ed0bcb5fa1374ac4314ceff5fa168ec2c71 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 28 Jun 2013 13:08:04 +0530 Subject: [PATCH 14/18] Fix markups --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 544914854..d42f2fc6b 100644 --- a/README.md +++ b/README.md @@ -55,8 +55,10 @@ producer.send("key1", "some message") producer.send("key2", "this methode") producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) +``` # Multiprocess consumer +```python # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) From 7b2a08faed84296221164bcaca15af08dbb88581 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 28 Jun 2013 13:09:02 +0530 Subject: [PATCH 15/18] Minor markup fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d42f2fc6b..03b8d3c4b 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ producer.send("key2", "this methode") producer = KeyedProducer(kafka, "my-topic", 
partitioner=RoundRobinPartitioner) ``` -# Multiprocess consumer +## Multiprocess consumer ```python # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) From c13ee1df6ab2900ffe0bd48e6376993c0d312a70 Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 28 Jun 2013 13:59:06 +0530 Subject: [PATCH 16/18] Fix cases of single partition --- README.md | 2 ++ kafka/consumer.py | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 03b8d3c4b..83ec44748 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,8 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner) ## Multiprocess consumer ```python +from kafka.consumer import MultiProcessConsumer + # This will split the number of partitions among two processes consumer = MultiProcessConsumer(kafka, "my-topic", "my-group", num_procs=2) diff --git a/kafka/consumer.py b/kafka/consumer.py index 9e6a0eb08..23a2f90bf 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -437,11 +437,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions_per_proc += 1 # The final set of chunks - chunks = map(None, *[iter(partitions)] * int(partitions_per_proc)) + chunker = lambda *x: [] + list(x) + chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) self.procs = [] for chunk in chunks: - chunk = filter(lambda x: x is not None, list(chunk)) + chunk = filter(lambda x: x is not None, chunk) proc = Process(target=self._consume, args=(chunk,)) proc.daemon = True proc.start() From c54a2edbaec1c4442cc63c8d3f0874b5882e90bb Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Fri, 28 Jun 2013 14:54:11 +0530 Subject: [PATCH 17/18] Add more cleanup in consumer.stop() --- kafka/consumer.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 23a2f90bf..6ceea72f8 100644 --- a/kafka/consumer.py +++ 
b/kafka/consumer.py @@ -159,6 +159,11 @@ def _auto_commit(self): if self.count_since_commit > self.auto_commit_every_n: self.commit() + def stop(self): + if self.commit_timer is not None: + self.commit_timer.stop() + self.commit() + def pending(self, partitions=None): """ Gets the pending message count @@ -226,11 +231,6 @@ def provide_partition_info(self): """ self.partition_info = True - def stop(self): - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - def seek(self, offset, whence): """ Alter the current offset in the consumer, similar to fseek @@ -510,6 +510,8 @@ def stop(self): proc.join() proc.terminate() + super(MultiProcessConsumer, self).stop() + def __iter__(self): """ Iterator to consume the messages available on this consumer From 1d278f0f60cb0a7b76fbc6b80c8e112a0deb2e0c Mon Sep 17 00:00:00 2001 From: Mahendra M Date: Mon, 1 Jul 2013 10:33:50 +0530 Subject: [PATCH 18/18] Fix minor bug in offset management In the current patch get_messages(count=1) would return zero messages the first time it is invoked after a consumer was initialized. --- kafka/consumer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 6ceea72f8..4c64cf2d5 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,3 +1,4 @@ +from collections import defaultdict from itertools import izip_longest, repeat import logging import time @@ -218,6 +219,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = FETCH_MIN_BYTES + self.fetch_started = defaultdict(bool) # defaults to false super(SimpleConsumer, self).__init__(client, group, topic, partitions=partitions, @@ -348,7 +350,7 @@ def __iter_partition__(self, partition, offset): # An OffsetFetchRequest to Kafka gives 0 for a new queue. 
This is # problematic, since 0 is offset of a message which we have not yet # consumed. - if offset != 0: + if self.fetch_started[partition]: offset += 1 while True: @@ -372,6 +374,7 @@ def __iter_partition__(self, partition, offset): # but the caller does not come back into the generator again. # The message will be consumed but the status will not be # updated in the consumer + self.fetch_started[partition] = True self.offsets[partition] = message.offset yield message if next_offset is None: