From ac66fe9159723c6707258bbb4c6727cf66a78219 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 3 Mar 2015 15:32:32 +0300 Subject: [PATCH 1/3] Fixing distribution for MP Consumer --- kafka/consumer/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 4dc04dcde..40aecf89c 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -131,7 +131,7 @@ def __init__(self, client, group, topic, auto_commit=True, # * we have an even distribution of partitions among processes if not partitions_per_proc: partitions_per_proc = round(len(partitions) * 1.0 / num_procs) - if partitions_per_proc < num_procs * 0.5: + if partitions_per_proc * num_procs < len(partitions): partitions_per_proc += 1 # The final set of chunks From 4bab2fa5d1bc67e18b2f7791ff5fbb8e73143a5e Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 11 Mar 2015 13:51:07 +0300 Subject: [PATCH 2/3] Cleaned code for MP consumer chunking --- kafka/consumer/multiprocess.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 40aecf89c..db59f7b90 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -123,26 +123,25 @@ def __init__(self, client, group, topic, auto_commit=True, self.pause = Event() # Requests the consumers to pause fetch self.size = Value('i', 0) # Indicator of number of messages to fetch - partitions = self.offsets.keys() + partitions = list(self.offsets.keys()) - # If unspecified, start one consumer per partition + # By default, start one consumer process for all partitions # The logic below ensures that # * we do not cross the num_procs limit # * we have an even distribution of partitions among processes - if not partitions_per_proc: - partitions_per_proc = round(len(partitions) * 1.0 / num_procs) - if partitions_per_proc * num_procs < len(partitions): - partitions_per_proc += 1 + + if partitions_per_proc: + num_procs = len(partitions) / partitions_per_proc + if num_procs * partitions_per_proc < len(partitions): + num_procs += 1 # The final set of chunks - chunker = lambda *x: [] + list(x) - chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc)) + chunks = [partitions[proc::num_procs] for proc in range(num_procs)] self.procs = [] for chunk in chunks: - chunk = filter(lambda x: x is not None, chunk) args = (client.copy(), - group, topic, list(chunk), + group, topic, chunk, self.queue, self.start, self.exit, self.pause, self.size) From 01ea3bf968c76a5f7a1999cfca36766d9bbff5e7 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 12 Mar 2015 11:33:07 +0300 Subject: [PATCH 3/3] Used thread-safe dict.copy().keys() for MP consumer partitions --- kafka/consumer/multiprocess.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index db59f7b90..bec3100ae 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -123,7 +123,10 @@ def __init__(self, client, group, topic, auto_commit=True, self.pause = Event() # Requests the consumers to pause fetch self.size = Value('i', 0) # Indicator of number of messages to fetch - partitions = list(self.offsets.keys()) + # dict.keys() returns a view in py3 + it's not a thread-safe operation + # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3 + # It's safer to copy dict as it only runs during the init. + partitions = list(self.offsets.copy().keys()) # By default, start one consumer process for all partitions # The logic below ensures that