Kafka python graceful shutdown of consumer #2042

@savitha-suresh

I am trying to gracefully shut down a Kafka consumer, but the script blocks at "Stopping heartbeat thread". How can I gracefully close the consumer on SIGTERM with kafka-python? This is what I have done:

import logger as logging
import time
import sys
from kafka import KafkaConsumer
import numpy as np
import signal


log = logging.getLogger(__name__)


class Cons:
    def __init__(self):

        signal.signal(signal.SIGINT, self.sigterm_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        self.consumer = KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1'])

    def sigterm_handler(self, signum, frame):
        log.info("Sigterm handler")
        self.consumer.close(autocommit=False)
        sys.exit(0)



    def consume(self):
        try:
            while True:
                records = self.consumer.poll(timeout_ms=500, max_records=500)
                for topic_partition, consumer_records in records.items():
                    for record in consumer_records:
                        log.info("Got Record - {}".format(record))
                    # code to manually commit


        except ValueError as e:
            log.exception("exception")


if __name__ == '__main__':
    c = Cons()
    c.consume()

With debug logs enabled, this is the output I get, and the code blocks at this point:

^C2020-04-28 07:18:33,050 - MainThread - __main__ - INFO - Sigterm handler
2020-04-28 07:18:33,050 - MainThread - kafka.consumer.group - DEBUG - Closing the KafkaConsumer.
2020-04-28 07:18:33,051 - MainThread - kafka.coordinator - INFO - Stopping heartbeat thread

What is the reason for this, and what is the right way to close a consumer on SIGTERM or SIGINT?
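One possible explanation, not confirmed anywhere in this thread, is that the signal handler runs in the main thread while poll() is still in progress, so calling close() from inside the handler may end up waiting on the heartbeat thread while poll() has not yet released what that thread needs. A common pattern that avoids this is to have the handler only set a flag, and to call close() from the main loop once the current poll() returns. The sketch below illustrates that pattern; the class name GracefulConsumer and the methods install_signal_handlers and request_stop are hypothetical, and the consumer is passed in so the loop can be exercised without a broker.

```python
import logging
import signal

log = logging.getLogger(__name__)


class GracefulConsumer:
    """Hypothetical wrapper: polls in a loop and closes the consumer
    from the main loop rather than from inside the signal handler."""

    def __init__(self, consumer):
        # `consumer` is any object exposing poll() and close(),
        # for example a kafka.KafkaConsumer instance.
        self.consumer = consumer
        self.running = True

    def install_signal_handlers(self):
        # Must be called from the main thread.
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def request_stop(self, signum, frame):
        # Only flip a flag here; no consumer calls inside the handler.
        self.running = False

    def consume(self):
        try:
            while self.running:
                records = self.consumer.poll(timeout_ms=500, max_records=500)
                for topic_partition, consumer_records in records.items():
                    for record in consumer_records:
                        log.info("Got Record - %s", record)
                # code to manually commit
        finally:
            # Runs in the normal control flow, after poll() has returned.
            log.info("Closing consumer")
            self.consumer.close(autocommit=False)
```

With kafka-python this would be used roughly as `runner = GracefulConsumer(KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1']))`, followed by `runner.install_signal_handlers()` and `runner.consume()`. Shutdown can then lag the signal by up to one poll timeout (500 ms here), which is the trade-off for never touching the consumer from the handler.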
