diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java index 1caef3c1a0..795cd42d21 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/flow/support/state/SplitState.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -119,7 +120,7 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor); Collection results = new ArrayList<>(); - + List exceptions = new ArrayList<>(); // Could use a CompletionService here? for (Future task : tasks) { try { @@ -129,14 +130,18 @@ public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception // Unwrap the expected exceptions Throwable cause = e.getCause(); if (cause instanceof Exception) { - throw (Exception) cause; + exceptions.add((Exception) cause); } else { - throw e; + exceptions.add(e); } } } + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor); if (parentSplitStatus != null) { return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus)); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java index dcff2e0eb3..8a9fc00233 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java @@ -16,11 +16,14 @@ package org.springframework.batch.core.job.builder; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.sql.DataSource; import static org.junit.jupiter.api.Assertions.assertEquals; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.batch.core.BatchStatus; @@ -45,6 +48,8 @@ import org.springframework.batch.core.step.StepSupport; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.support.ListItemReader; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.batch.support.transaction.ResourcelessTransactionManager; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -419,4 +424,38 @@ public JdbcTransactionManager transactionManager(DataSource dataSource) { } + @Test + public void testBuildSplitWithParallelFlow() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + Step longExecutingStep = new StepBuilder("longExecutingStep", jobRepository).tasklet((stepContribution, b) -> { + Thread.sleep(500L); + return RepeatStatus.FINISHED; + }, new ResourcelessTransactionManager()).build(); + + Step interruptedStep = new StepBuilder("interruptedStep", jobRepository).tasklet((stepContribution, b) -> { + stepContribution.getStepExecution().setTerminateOnly(); + return RepeatStatus.FINISHED; + }, new ResourcelessTransactionManager()).build(); + + Step nonExecutableStep = new StepBuilder("nonExecutableStep", jobRepository).tasklet((stepContribution, b) -> { + countDownLatch.countDown(); + return RepeatStatus.FINISHED; + }, new ResourcelessTransactionManager()).build(); + + Flow twoStepFlow = new FlowBuilder("twoStepFlow").start(longExecutingStep) + .next(nonExecutableStep) + .build(); + Flow interruptedFlow = new FlowBuilder("interruptedFlow").start(interruptedStep).build(); + + Flow splitFlow = new FlowBuilder("splitFlow").split(new SimpleAsyncTaskExecutor()) + .add(interruptedFlow, twoStepFlow) + .build(); + FlowJobBuilder jobBuilder = new JobBuilder("job", jobRepository).start(splitFlow).build(); + jobBuilder.preventRestart().build().execute(execution); + + boolean isExecutedNonExecutableStep = countDownLatch.await(1, TimeUnit.SECONDS); + assertEquals(BatchStatus.STOPPED, execution.getStatus()); + Assertions.assertFalse(isExecutedNonExecutableStep); + } + }