@mumrah I took the branch based on #35 and #36 and added support for something I'm calling drivers.
If you look at the code base, we use Queue(), Event(), sleep(), sockets and multiprocessing.Process() to run the show: workers, multiple consumers and coordination.
Queue, Event, Pool, sleep, socket, etc. are all available in multiprocessing-, threading- and gevent-compatible forms. So the code can be used almost as-is to run these pieces as separate processes, threads or gevent coroutines.
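To illustrate why the code can stay almost as-is, here is a minimal sketch, not code from the branch: a worker loop that only uses the Queue and Event objects handed to it, so it never imports a concurrency module itself. The names `double_items` and `run_with` are hypothetical, and the doubling task is a toy stand-in for real consumer work.

```python
import queue
import threading


def double_items(tasks, results, stop):
    """Toy worker loop that only touches the primitives it is handed.

    It never creates threads or processes itself, so the same function can
    be started as a thread or as a process.
    """
    while not stop.is_set():
        try:
            item = tasks.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued yet; re-check the stop flag
        results.put(item * 2)


def run_with(Queue, Event, Worker, items):
    """Run double_items under whichever Queue/Event/Worker trio is passed in."""
    tasks, results, stop = Queue(), Queue(), Event()
    worker = Worker(target=double_items, args=(tasks, results, stop))
    worker.start()
    for item in items:
        tasks.put(item)
    doubled = [results.get(timeout=5) for _ in items]
    stop.set()  # ask the worker to exit its loop
    worker.join()
    return sorted(doubled)


if __name__ == "__main__":
    print(run_with(queue.Queue, threading.Event, threading.Thread, [1, 2, 3]))
    # prints [2, 4, 6]
```

Passing `multiprocessing.Queue`, `multiprocessing.Event` and `multiprocessing.Process` instead should work without touching `double_items`, because it is a module-level function and `multiprocessing.Queue.get` also raises `queue.Empty` on timeout. On platforms that use the spawn start method, that call has to sit under the `if __name__ == "__main__":` guard.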
When a SimpleConsumer or MultiConsumer is initialized, we can specify driver = thread / process / gevent, and:
- in SimpleConsumer, the commit thread runs as a thread, process or gevent coroutine;
- in MultiConsumer, the fetches (among which the partitions are distributed) run as separate threads, processes or gevent coroutines.
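For the MultiConsumer case, one simple way to spread partitions across N fetchers is round-robin chunking. The sketch below assumes that strategy (the description does not say how the partitions are split). `chunk_partitions`, `fake_fetch` and `fetch_all` are hypothetical names, and `fake_fetch` only stands in for real fetch requests to the broker. It uses threads, but the fetcher only touches the queue it is given.

```python
import queue
import threading


def chunk_partitions(partitions, num_workers):
    """Deal partitions round-robin into at most num_workers non-empty chunks."""
    partitions = list(partitions)
    if not partitions:
        return []
    num_workers = max(1, min(num_workers, len(partitions)))
    chunks = [[] for _ in range(num_workers)]
    for i, partition in enumerate(partitions):
        chunks[i % num_workers].append(partition)
    return chunks


def fake_fetch(partitions, out):
    """Hypothetical stand-in fetcher: a real one would fetch from Kafka."""
    for partition in partitions:
        out.put((partition, "message-from-%d" % partition))


def fetch_all(partitions, num_workers):
    """Start one worker per chunk and collect everything they produce."""
    out = queue.Queue()
    workers = [threading.Thread(target=fake_fetch, args=(chunk, out))
               for chunk in chunk_partitions(partitions, num_workers)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # All workers have finished, so draining with get_nowait() is safe here.
    results = []
    while not out.empty():
        results.append(out.get_nowait())
    return sorted(results)
```

For example, `chunk_partitions(range(5), 2)` returns `[[0, 2, 4], [1, 3]]`. Round-robin keeps chunk sizes within one of each other, and capping the worker count at the number of partitions avoids starting idle fetchers.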
The changes are minimal. The only real complexity is in managing the duplicate connections. When a consumer is initialized, an instance of the appropriate driver (KafkaDriver) is created; it holds the modules that must be used.
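A driver in this sense could be as small as an object that picks the Queue, Event and sleep implementations and knows how to start a worker. The class below is a hypothetical sketch assuming the driver names thread / process / gevent from the description. It is not the KafkaDriver from the branch, and its attributes and `start_worker` method are made up for illustration. gevent is imported only when that driver is selected. Connection handling is left out; one plausible reading of "duplicate connections" is that each worker opens its own broker connection instead of sharing one socket, but that is an assumption.

```python
import multiprocessing
import queue
import threading
import time


class KafkaDriver(object):
    """Hypothetical sketch: bundles the primitives for one concurrency model."""

    def __init__(self, name):
        self.name = name
        if name == "thread":
            self.Queue, self.Event = queue.Queue, threading.Event
            self.sleep = time.sleep
            self._spawn = lambda target, args: threading.Thread(
                target=target, args=args)
        elif name == "process":
            self.Queue, self.Event = multiprocessing.Queue, multiprocessing.Event
            self.sleep = time.sleep
            self._spawn = lambda target, args: multiprocessing.Process(
                target=target, args=args)
        elif name == "gevent":
            # Imported lazily so gevent is only required when actually chosen.
            import gevent
            import gevent.event
            import gevent.queue
            self.Queue, self.Event = gevent.queue.Queue, gevent.event.Event
            self.sleep = gevent.sleep
            # Greenlet takes the callable and its arguments positionally.
            self._spawn = lambda target, args: gevent.Greenlet(target, *args)
        else:
            raise ValueError("unknown driver: %r" % (name,))

    def start_worker(self, target, *args):
        """Create and start a thread, process or greenlet running target(*args)."""
        worker = self._spawn(target, args)
        worker.start()
        return worker
```

A consumer would then create, say, `KafkaDriver("thread")` once and build every queue, event and worker through it, so switching to processes or greenlets is a one-word change.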
Most of the involved work was in the test cases :-)
You can see the diffs here (on top of #35 and #36).
mahendra/kafka-python@mpcommit...gevent
I will send a pull request after those are merged. Later, I will add driver support for the Producer too.