Skip to content

Commit ee4debb

Browse files
authored
KAFKA-19128: Kafka Streams should not get offsets when close dirty (#19450)
Kafka Streams calls `prepareCommit()` in `Taskmanager#closeTaskDirty()`. However, the dirty task must not get committed and therefore, prepare-commit tasks such as getting offsets should not be needed as well. The only thing needed before closing a task dirty is flushing. Therefore, separating `flush` and `prepareCommit` could be a good fix. Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>
1 parent 36d2498 commit ee4debb

File tree

11 files changed

+107
-81
lines changed

11 files changed

+107
-81
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public boolean maybePunctuateSystemTime() {
180180
}
181181

182182
@Override
183-
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
183+
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
184184
throw new UnsupportedOperationException("This task is read-only");
185185
}
186186

streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void resume() {
179179
* or flushing state store get IO errors; such error should cause the thread to die
180180
*/
181181
@Override
182-
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
182+
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
183183
switch (state()) {
184184
case CREATED:
185185
log.debug("Skipped preparing created task for commit");
@@ -189,15 +189,19 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
189189
case RUNNING:
190190
case SUSPENDED:
191191
// do not need to flush state store caches in pre-commit since nothing would be sent for standby tasks
192-
log.debug("Prepared {} task for committing", state());
192+
if (!clean) {
193+
log.debug("Skipped preparing {} standby task with id {} for commit since the task is getting closed dirty.", state(), id);
194+
} else {
195+
log.debug("Prepared {} task for committing", state());
196+
}
193197

194198
break;
195199

196200
default:
197201
throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
198202
}
199203

200-
return Collections.emptyMap();
204+
return clean ? Collections.emptyMap() : null;
201205
}
202206

203207
@Override

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,6 @@ public void resume() {
417417
timeCurrentIdlingStarted = Optional.empty();
418418
}
419419

420-
421420
public void flush() {
422421
stateMgr.flushCache();
423422
recordCollector.flush();
@@ -429,7 +428,7 @@ public void flush() {
429428
* @return offsets that should be committed for this task
430429
*/
431430
@Override
432-
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
431+
public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean) {
433432
switch (state()) {
434433
case CREATED:
435434
case RESTORING:
@@ -444,6 +443,10 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
444443
//
445444
// TODO: this should be removed after we decouple caching with emitting
446445
flush();
446+
if (!clean) {
447+
log.debug("Skipped preparing {} task with id {} for commit since the task is getting closed dirty.", state(), id);
448+
return null;
449+
}
447450
hasPendingTxCommit = eosEnabled;
448451

449452
log.debug("Prepared {} task for committing", state());

streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ default boolean maybePunctuateSystemTime() {
201201
/**
202202
* @throws StreamsException fatal error, should close the thread
203203
*/
204-
Map<TopicPartition, OffsetAndMetadata> prepareCommit();
204+
Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean);
205205

206206
void postCommit(boolean enforceCheckpoint);
207207

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
142142
for (final Task task : tasksToCommit) {
143143
// we need to call commitNeeded first since we need to update committable offsets
144144
if (task.commitNeeded()) {
145-
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
145+
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit(true);
146146
if (!offsetAndMetadata.isEmpty()) {
147147
consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
148148
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
278278
// we do not need to take the returned offsets since we are not going to commit anyways;
279279
// this call is only used for active tasks to flush the cache before suspending and
280280
// closing the topology
281-
task.prepareCommit();
281+
task.prepareCommit(false);
282282
} catch (final RuntimeException swallow) {
283283
log.warn("Error flushing cache for corrupted task {}. " +
284284
"Since the task is closing dirty, the following exception is swallowed: {}",
@@ -812,7 +812,7 @@ private Map<TaskId, RuntimeException> closeAndRecycleTasks(final Map<Task, Set<T
812812
// and their changelog positions should not change at all postCommit would not write the checkpoint again.
813813
// 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably
814814
// write the checkpoint file.
815-
final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
815+
final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit(true);
816816
if (!offsets.isEmpty()) {
817817
log.error("Task {} should have been committed when it was suspended, but it reports non-empty " +
818818
"offsets {} to commit; this means it failed during last commit and hence should be closed dirty",
@@ -1264,7 +1264,7 @@ private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
12641264
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
12651265
for (final Task task : tasksToPrepare) {
12661266
try {
1267-
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
1267+
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(true);
12681268
if (!committableOffsets.isEmpty()) {
12691269
consumedOffsetsPerTask.put(task, committableOffsets);
12701270
}
@@ -1479,7 +1479,7 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist
14791479
try {
14801480
// we call this function only to flush the case if necessary
14811481
// before suspending and closing the topology
1482-
task.prepareCommit();
1482+
task.prepareCommit(false);
14831483
} catch (final RuntimeException swallow) {
14841484
log.warn("Error flushing cache of dirty task {}. " +
14851485
"Since the task is closing dirty, the following exception is swallowed: {}",
@@ -1630,7 +1630,7 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
16301630
// first committing all tasks and then suspend and close them clean
16311631
for (final Task task : activeTasksToClose) {
16321632
try {
1633-
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
1633+
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(true);
16341634
tasksToCommit.add(task);
16351635
if (!committableOffsets.isEmpty()) {
16361636
consumedOffsetsAndMetadataPerTask.put(task, committableOffsets);
@@ -1719,7 +1719,7 @@ private Collection<Task> tryCloseCleanStandbyTasks(final Collection<Task> standb
17191719
// first committing and then suspend / close clean
17201720
for (final Task task : standbyTasksToClose) {
17211721
try {
1722-
task.prepareCommit();
1722+
task.prepareCommit(true);
17231723
task.postCommit(true);
17241724
task.suspend();
17251725
closeTaskClean(task);

streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public void shouldThrowIfCommittingOnIllegalState() {
213213
task.suspend();
214214
task.closeClean();
215215

216-
assertThrows(IllegalStateException.class, task::prepareCommit);
216+
assertThrows(IllegalStateException.class, () -> task.prepareCommit(true));
217217
}
218218

219219
@Test
@@ -261,13 +261,13 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() {
261261

262262
task = createStandbyTask();
263263
task.initializeIfNeeded();
264-
task.prepareCommit();
264+
task.prepareCommit(true);
265265
task.postCommit(false); // this should not checkpoint
266266

267-
task.prepareCommit();
267+
task.prepareCommit(true);
268268
task.postCommit(false); // this should checkpoint
269269

270-
task.prepareCommit();
270+
task.prepareCommit(true);
271271
task.postCommit(false); // this should not checkpoint
272272

273273
verify(stateManager).checkpoint();
@@ -322,7 +322,7 @@ public void shouldSuspendAndCommitBeforeCloseClean() {
322322
task = createStandbyTask();
323323
task.initializeIfNeeded();
324324
task.suspend();
325-
task.prepareCommit();
325+
task.prepareCommit(true);
326326
task.postCommit(true);
327327
task.closeClean();
328328

@@ -360,7 +360,7 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
360360
// could commit if the offset advanced beyond threshold
361361
assertTrue(task.commitNeeded());
362362

363-
task.prepareCommit();
363+
task.prepareCommit(true);
364364
task.postCommit(true);
365365
}
366366

@@ -389,7 +389,7 @@ public void shouldThrowOnCloseCleanCheckpointError() {
389389
task = createStandbyTask();
390390
task.initializeIfNeeded();
391391

392-
task.prepareCommit();
392+
task.prepareCommit(true);
393393
assertThrows(RuntimeException.class, () -> task.postCommit(true));
394394

395395
assertEquals(RUNNING, task.state());

0 commit comments

Comments
 (0)