Description
I have a consumer reading a single partition. The code is:
for message in self.consumer:
    result = make_love(message)  # up to 10 seconds; it's a long time, believe me!
    make_baby(result)
    try:
        self.consumer.commit()
    except IllegalGenerationError:
        logging.error('Illegal generation caught! Skipping it, huh')
As far as I know, it goes as follows:
- Consumer picks the next message from the queue.
- Heavy processing routine is executed.
- GroupCoordinator performs rebalancing, thus incrementing the current generation number.
- A new object is created using result as an argument.
- Consumer commits the offset of the successfully processed message.
- GroupCoordinator responds with IllegalGenerationError because, while the consumer was processing the task, the generation number was incremented, so the consumer's generation is no longer valid.
- Consumer reconnects and fetches the very message it has already processed (but failed to commit), which spawns twin objects (undesired behaviour).
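To make the sequence above concrete, here is a toy simulation of how I understand it. It does not touch Kafka at all: FakeCoordinator, run() and the local IllegalGenerationError class are made-up stand-ins, and the real coordinator surely does much more than compare generation numbers. It only shows how a rejected commit leads to the same message being processed twice.

```python
class IllegalGenerationError(Exception):
    """Local stand-in for the Kafka error; not imported from any library."""


class FakeCoordinator:
    """Toy coordinator: tracks one generation number and one committed offset."""

    def __init__(self):
        self.generation = 1
        self.committed = 0  # offset of the next message to consume

    def rebalance(self):
        self.generation += 1

    def commit(self, offset, generation):
        # Reject commits sent with a stale generation number.
        if generation != self.generation:
            raise IllegalGenerationError(generation)
        self.committed = offset


def run(messages, rebalance_during):
    """Process all messages; a rebalance happens while processing the given offsets."""
    coordinator = FakeCoordinator()
    babies = []  # side effects produced by the hypothetical make_baby() step
    pending = set(rebalance_during)
    while coordinator.committed < len(messages):
        # (Re)join: pick up the current generation and the last committed offset.
        generation = coordinator.generation
        offset = coordinator.committed
        while offset < len(messages):
            if offset in pending:
                pending.discard(offset)
                coordinator.rebalance()  # happens during the long processing step
            babies.append(messages[offset])  # the side effect is already done
            try:
                coordinator.commit(offset + 1, generation)
            except IllegalGenerationError:
                break  # commit rejected; rejoin and refetch from the old offset
            offset += 1
    return babies


babies = run(["a", "b", "c"], rebalance_during={1})
print(babies)  # ['a', 'b', 'b', 'c']
```

The message at offset 1 shows up twice: once before the rejected commit and once after rejoining.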
Meanwhile, the docs say the following:
The consumer connects to the co-ordinator and sends a HeartbeatRequest. If an IllegalGeneration error code is returned in the HeartbeatResponse, it indicates that the co-ordinator has initiated a rebalance. The consumer then stops fetching data, commits offsets and sends a JoinGroupRequest to it's co-ordinator broker. In the JoinGroupResponse, it receives the list of topic partitions that it should own and the new generation id for it's group. At this time, group management is done and the consumer starts fetching data and (optionally) committing offsets for the list of partitions it owns.
Of course, I can call commit() first and make_baby() afterwards, but if make_baby() fails, that gives at-most-once processing, which is undesired behaviour, too.
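A minimal sketch of that commit-first ordering, again without Kafka: consume_commit_first() and its failing parameter are hypothetical names, and the commit is modelled as a plain integer assignment. It just shows that a message whose side effect fails is never seen again, because its offset was already committed.

```python
def consume_commit_first(messages, failing):
    """Commit each offset before the side effect; report what was lost."""
    committed = 0
    babies = []  # successful side effects
    lost = []    # messages whose side effect failed after the commit
    for offset, message in enumerate(messages):
        committed = offset + 1  # stands in for consumer.commit()
        try:
            if message in failing:
                raise RuntimeError("make_baby failed for %r" % message)
            babies.append(message)
        except RuntimeError:
            # The offset is already committed, so this message is not refetched.
            lost.append(message)
    return babies, lost, committed


babies, lost, committed = consume_commit_first(["a", "b", "c"], failing={"b"})
print(babies, lost, committed)  # ['a', 'c'] ['b'] 3
```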
The question is: should the consumer be able to successfully commit its offsets if it gets an IllegalGenerationError? If not, how can I achieve exactly-once semantics?