Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,14 @@ def _unpack_message_set(self, tp, records):
try:
batch = records.next_batch()
while batch is not None:

# LegacyRecordBatch does not expose base_offset / last_offset_delta, so the
# AttributeError raised below is expected for it and is deliberately ignored
try:
self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
batch.last_offset_delta
except AttributeError:
pass

for record in batch:
key_size = len(record.key) if record.key is not None else -1
value_size = len(record.value) if record.value is not None else -1
Expand Down Expand Up @@ -643,6 +651,17 @@ def _create_fetch_requests(self):

for partition in self._fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)

# advance position for any deleted compacted messages if required
if self._subscriptions.assignment[partition].last_offset_from_message_batch:
next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
log.debug(
"Advance position for partition %s from %s to %s (last message batch location plus one)"
" to correct for deleted compacted messages",
partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
self._subscriptions.assignment[partition].position = next_offset_from_batch_header

position = self._subscriptions.assignment[partition].position

# fetch if there is a leader and no in-flight requests
Expand Down
5 changes: 5 additions & 0 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ def __init__(self):
self._position = None # offset exposed to the user
self.highwater = None
self.drop_pending_message_set = False
# Offset hint for the last message of the most recent message batch with
# magic=2; the hint still covers messages that were deleted by compaction
self.last_offset_from_message_batch = None

def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
Expand All @@ -396,6 +399,7 @@ def await_reset(self, strategy):
self.awaiting_reset = True
self.reset_strategy = strategy
self._position = None
self.last_offset_from_message_batch = None
self.has_valid_position = False

def seek(self, offset):
Expand All @@ -404,6 +408,7 @@ def seek(self, offset):
self.reset_strategy = None
self.has_valid_position = True
self.drop_pending_message_set = True
self.last_offset_from_message_batch = None

def pause(self):
self.paused = True
Expand Down
4 changes: 4 additions & 0 deletions kafka/record/default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ def crc(self):
def attributes(self):
return self._header_data[5]

@property
def last_offset_delta(self):
    """Return the entry stored at index 6 of ``self._header_data``.

    NOTE(review): presumably this is the batch header's last-offset-delta
    field (offset of the final record relative to the batch base offset);
    confirm against the header struct layout.
    """
    header_fields = self._header_data
    return header_fields[6]

@property
def compression_type(self):
return self.attributes & self.CODEC_MASK
Expand Down