Skip to content

Commit b8a1ba8

Browse files
committed
Fix unfinished step in parallel flow
1 parent 9a2b1bb commit b8a1ba8

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Arrays;
2020
import java.util.Collection;
2121
import java.util.Collections;
22+
import java.util.List;
2223
import java.util.concurrent.ExecutionException;
2324
import java.util.concurrent.Future;
2425
import java.util.concurrent.FutureTask;
@@ -119,7 +120,7 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception
119120
FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor);
120121

121122
Collection<FlowExecution> results = new ArrayList<>();
122-
123+
List<Exception> exceptions = new ArrayList<>();
123124
// Could use a CompletionService here?
124125
for (Future<FlowExecution> task : tasks) {
125126
try {
@@ -129,14 +130,18 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception
129130
// Unwrap the expected exceptions
130131
Throwable cause = e.getCause();
131132
if (cause instanceof Exception) {
132-
throw (Exception) cause;
133+
exceptions.add((Exception) cause);
133134
}
134135
else {
135-
throw e;
136+
exceptions.add(e);
136137
}
137138
}
138139
}
139140

141+
if (!exceptions.isEmpty()) {
142+
throw exceptions.get(0);
143+
}
144+
140145
FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor);
141146
if (parentSplitStatus != null) {
142147
return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus));

spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
package org.springframework.batch.core.job.builder;
1717

1818
import java.util.Arrays;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
1921

2022
import javax.sql.DataSource;
2123

2224
import static org.junit.jupiter.api.Assertions.assertEquals;
2325

26+
import org.junit.jupiter.api.Assertions;
2427
import org.junit.jupiter.api.BeforeEach;
2528
import org.junit.jupiter.api.Test;
2629
import org.springframework.batch.core.BatchStatus;
@@ -45,6 +48,8 @@
4548
import org.springframework.batch.core.step.StepSupport;
4649
import org.springframework.batch.core.step.builder.StepBuilder;
4750
import org.springframework.batch.item.support.ListItemReader;
51+
import org.springframework.batch.repeat.RepeatStatus;
52+
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
4853
import org.springframework.beans.factory.annotation.Value;
4954
import org.springframework.context.ApplicationContext;
5055
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -419,4 +424,38 @@ public JdbcTransactionManager transactionManager(DataSource dataSource) {
419424

420425
}
421426

427+
@Test
428+
public void testBuildSplitWithParallelFlow() throws InterruptedException {
429+
CountDownLatch countDownLatch = new CountDownLatch(1);
430+
Step longExecutingStep = new StepBuilder("longExecutingStep", jobRepository).tasklet((stepContribution, b) -> {
431+
Thread.sleep(500L);
432+
return RepeatStatus.FINISHED;
433+
}, new ResourcelessTransactionManager()).build();
434+
435+
Step interruptedStep = new StepBuilder("interruptedStep", jobRepository).tasklet((stepContribution, b) -> {
436+
stepContribution.getStepExecution().setTerminateOnly();
437+
return RepeatStatus.FINISHED;
438+
}, new ResourcelessTransactionManager()).build();
439+
440+
Step nonExecutableStep = new StepBuilder("nonExecutableStep", jobRepository).tasklet((stepContribution, b) -> {
441+
countDownLatch.countDown();
442+
return RepeatStatus.FINISHED;
443+
}, new ResourcelessTransactionManager()).build();
444+
445+
Flow twoStepFlow = new FlowBuilder<SimpleFlow>("twoStepFlow").start(longExecutingStep)
446+
.next(nonExecutableStep)
447+
.build();
448+
Flow interruptedFlow = new FlowBuilder<SimpleFlow>("interruptedFlow").start(interruptedStep).build();
449+
450+
Flow splitFlow = new FlowBuilder<Flow>("splitFlow").split(new SimpleAsyncTaskExecutor())
451+
.add(interruptedFlow, twoStepFlow)
452+
.build();
453+
FlowJobBuilder jobBuilder = new JobBuilder("job", jobRepository).start(splitFlow).build();
454+
jobBuilder.preventRestart().build().execute(execution);
455+
456+
boolean isExecutedNonExecutableStep = countDownLatch.await(1, TimeUnit.SECONDS);
457+
assertEquals(BatchStatus.STOPPED, execution.getStatus());
458+
Assertions.assertFalse(isExecutedNonExecutableStep);
459+
}
460+
422461
}

0 commit comments

Comments
 (0)