Due to their daemonized nature, producers are never destroyed #323

@chmduquesne

Hi,

I am writing a Flask app that pushes messages to, and pulls messages from, Kafka.

For those not familiar with Flask: it is threaded by default, and objects are usually only valid for the duration of a single request.

In my app:

  • a POST to /<topic>/[<key>/] pushes the posted content to the Kafka topic <topic> (with the key <key>, if provided)
  • a GET on /<topic>/[<group>/] retrieves the last 20 messages from the Kafka topic <topic>, consuming as the group <group> if provided, or as the default group flasfka otherwise.
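
The routing described in the list above boils down to a small decision: which Kafka operation a request maps to, and what the optional second path segment means. The sketch below is hypothetical and is not the code in flasfka/api.py: `resolve_request`, `KafkaAction`, `DEFAULT_GROUP` and `FETCH_LIMIT` are names made up for illustration, and the function only decides what to do without talking to Kafka.

```python
from typing import NamedTuple, Optional

DEFAULT_GROUP = "flasfka"  # default consumer group named in the issue
FETCH_LIMIT = 20           # number of messages a GET returns, per the issue


class KafkaAction(NamedTuple):
    """What an HTTP request maps to (hypothetical, for illustration)."""
    kind: str              # "produce" for POST, "consume" for GET
    topic: str
    key: Optional[str]     # message key, POST only
    group: Optional[str]   # consumer group, GET only
    limit: Optional[int]   # max messages to return, GET only


def resolve_request(method: str, path: str) -> KafkaAction:
    """Map e.g. ("POST", "/my-topic/k1/") to the Kafka operation to perform."""
    parts = [p for p in path.split("/") if p]
    if len(parts) not in (1, 2):
        raise ValueError("expected /<topic>/ or /<topic>/<extra>/, got %r" % path)
    topic = parts[0]
    extra = parts[1] if len(parts) == 2 else None
    if method == "POST":
        # optional second segment is the message key
        return KafkaAction("produce", topic, key=extra, group=None, limit=None)
    if method == "GET":
        # optional second segment is the consumer group
        return KafkaAction("consume", topic, key=None,
                           group=extra or DEFAULT_GROUP, limit=FETCH_LIMIT)
    raise ValueError("unsupported method: %s" % method)
```

For example, a GET on /my-topic/ resolves to a "consume" action on my-topic with the group flasfka and a limit of 20, while a POST to /my-topic/k1/ resolves to a "produce" action with the key k1.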

The relevant code is here: https://github.com/chmduquesne/kafka-http/blob/master/flasfka/api.py (this URL may change; I will update the issue if it does)

This code creates a new producer for each request, but the Kafka objects appear to keep running forever, even after the request is over. The problem becomes obvious in the application logs. I launched the app, then ran:

seq 1 20 | xargs -I % -n 1 curl -X POST --data-binary "test%" http://127.0.0.1:5000/my-topic/
watch -n 5 curl -s http://127.0.0.1:5000/my-topic/

Here is the full log, ending with what happens when I kill the application:

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
No handlers could be found for logger "kafka"
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:09] "POST /my-topic/ HTTP/1.1" 204 -
127.0.0.1 - - [16/Feb/2015 14:09:11] "GET /my-topic/ HTTP/1.1" 200 -
127.0.0.1 - - [16/Feb/2015 14:09:16] "GET /my-topic/ HTTP/1.1" 200 -
127.0.0.1 - - [16/Feb/2015 14:09:21] "GET /my-topic/ HTTP/1.1" 200 -
127.0.0.1 - - [16/Feb/2015 14:09:27] "GET /my-topic/ HTTP/1.1" 200 -
127.0.0.1 - - [16/Feb/2015 14:09:32] "GET /my-topic/ HTTP/1.1" 200 -
127.0.0.1 - - [16/Feb/2015 14:09:37] "GET /my-topic/ HTTP/1.1" 200 -
Process Process-20:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-19:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-18:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-17:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-16:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-15:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-14:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-13:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-12:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-11:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
Process Process-10:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-9:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-8:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-7:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-6:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
Process Process-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "build/bdist.linux-i686/egg/kafka/producer/base.py", line 52, in _send_upstream
    topic_partition, msg, key = queue.get(timeout=timeout)
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 131, in get
    if timeout < 0 or not self._poll(timeout):
KeyboardInterrupt
Exception in thread Thread-2 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 763, in run
  File "build/bdist.linux-i686/egg/kafka/util.py", line 133, in _timer
  File "/usr/lib/python2.7/threading.py", line 621, in wait
  File "/usr/lib/python2.7/threading.py", line 333, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
Exception in thread Thread-4 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 763, in run
  File "build/bdist.linux-i686/egg/kafka/util.py", line 133, in _timer
  File "/usr/lib/python2.7/threading.py", line 621, in wait
  File "/usr/lib/python2.7/threading.py", line 333, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
Exception in thread Thread-6 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 763, in run
  File "build/bdist.linux-i686/egg/kafka/util.py", line 133, in _timer
  File "/usr/lib/python2.7/threading.py", line 621, in wait
  File "/usr/lib/python2.7/threading.py", line 333, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
Exception in thread Thread-1 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 763, in run
  File "build/bdist.linux-i686/egg/kafka/util.py", line 133, in _timer
  File "/usr/lib/python2.7/threading.py", line 621, in wait
  File "/usr/lib/python2.7/threading.py", line 333, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
Exception in thread Thread-3 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 763, in run
  File "build/bdist.linux-i686/egg/kafka/util.py", line 133, in _timer
  File "/usr/lib/python2.7/threading.py", line 621, in wait
  File "/usr/lib/python2.7/threading.py", line 333, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
Exception in thread Thread-5 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 763, in run
  File "build/bdist.linux-i686/egg/kafka/util.py", line 133, in _timer
  File "/usr/lib/python2.7/threading.py", line 621, in wait
  File "/usr/lib/python2.7/threading.py", line 333, in wait
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
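
The log is consistent with every POST leaving a background worker behind: 20 requests, then 20 `Process-N` workers still blocked in `queue.get(timeout=timeout)` inside `_send_upstream` when the app is killed. The `Thread-N` errors at the end look like a related symptom, with the `_timer` threads from kafka/util.py still running during interpreter shutdown. Below is a minimal, Kafka-free sketch of that pattern, assuming the producer behaves like a queue drained by a daemon worker that polls with a timeout. `BackgroundSender`, `handle_post_leaky` and `handle_post_clean` are hypothetical names, not kafka-python APIs, and threads stand in for the `multiprocessing.Process` seen in the traceback so that the example runs anywhere.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the worker loop to exit


class BackgroundSender:
    """Hypothetical stand-in for an async producer (not the kafka-python API).

    A daemon worker drains a queue, polling with a timeout much like the
    queue.get(timeout=timeout) frame in the traceback above.
    """

    def __init__(self, poll_timeout=0.05):
        self._queue = queue.Queue()
        self.sent = []  # messages the worker has "sent"
        # The bound method self._run holds a reference to self, so the
        # running worker keeps this object alive after callers drop it.
        self._worker = threading.Thread(
            target=self._run, args=(poll_timeout,), daemon=True)
        self._worker.start()

    def _run(self, poll_timeout):
        while True:
            try:
                item = self._queue.get(timeout=poll_timeout)
            except queue.Empty:
                continue  # nothing queued: keep polling, forever
            if item is _STOP:
                return
            self.sent.append(item)  # a real producer would send upstream here

    def send(self, msg):
        self._queue.put(msg)

    def stop(self, timeout=1.0):
        """Queue the sentinel behind pending messages, then wait for the worker."""
        self._queue.put(_STOP)
        self._worker.join(timeout)

    def is_running(self):
        return self._worker.is_alive()


def handle_post_leaky(body):
    """One sender per request, never stopped: the worker outlives the request."""
    sender = BackgroundSender()
    sender.send(body)
    return sender  # returned only so the caller can inspect it


def handle_post_clean(body):
    """One sender per request, stopped before the request finishes."""
    sender = BackgroundSender()
    sender.send(body)
    sender.stop()
    return sender
```

Calling `handle_post_leaky` 20 times, like the `seq 1 20 | xargs ...` loop above, leaves 20 live workers, whereas `handle_post_clean` leaves none because the sentinel is queued behind the message and the worker exits after sending it. Another option is to create a single sender when the application starts, share it across requests, and stop it once at shutdown. Whether a given kafka-python producer exposes an explicit stop method, and whether it is safe to share between Flask's threads, depends on the installed version, so check its documentation before relying on either approach.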
