Skip to content

Add support for configuring SimpleAsyncTaskExecutor#rejectTasksWhenLimitReached #45155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public static class Simple {
*/
private Integer concurrencyLimit;

/**
* Specify whether to reject tasks when the concurrency limit has been reached.
*/
private boolean rejectTasksWhenLimitReached;

public Integer getConcurrencyLimit() {
return this.concurrencyLimit;
}
Expand All @@ -91,6 +96,14 @@ public void setConcurrencyLimit(Integer concurrencyLimit) {
this.concurrencyLimit = concurrencyLimit;
}

public boolean isRejectTasksWhenLimitReached() {
return this.rejectTasksWhenLimitReached;
}

public void setRejectTasksWhenLimitReached(boolean rejectTasksWhenLimitReached) {
this.rejectTasksWhenLimitReached = rejectTasksWhenLimitReached;
}

}

public static class Pool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private SimpleAsyncTaskExecutorBuilder builder() {
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
TaskExecutionProperties.Simple simple = this.properties.getSimple();
builder = builder.concurrencyLimit(simple.getConcurrencyLimit());
builder = builder.rejectTasksWhenLimitReached(simple.isRejectTasksWhenLimitReached());
TaskExecutionProperties.Shutdown shutdown = this.properties.getShutdown();
if (shutdown.isAwaitTermination()) {
builder = builder.taskTerminationTimeout(shutdown.getAwaitTerminationPeriod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ void simpleAsyncTaskExecutorBuilderShouldReadProperties() {
this.contextRunner
.withPropertyValues("spring.task.execution.thread-name-prefix=mytest-",
"spring.task.execution.simple.concurrency-limit=1",
"spring.task.execution.simple.reject-tasks-when-limit-reached=true",
"spring.task.execution.shutdown.await-termination=true",
"spring.task.execution.shutdown.await-termination-period=30s")
.run(assertSimpleAsyncTaskExecutor((taskExecutor) -> {
assertThat(taskExecutor.getConcurrencyLimit()).isEqualTo(1);
assertThat(taskExecutor).hasFieldOrPropertyWithValue("rejectTasksWhenLimitReached", true);
assertThat(taskExecutor.getThreadNamePrefix()).isEqualTo("mytest-");
assertThat(taskExecutor).hasFieldOrPropertyWithValue("taskTerminationTimeout", 30000L);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* @author Stephane Nicoll
* @author Filip Hrisafov
* @author Moritz Halbritter
* @author Yanming Zhou
* @since 3.2.0
*/
public class SimpleAsyncTaskExecutorBuilder {
Expand All @@ -51,22 +52,25 @@ public class SimpleAsyncTaskExecutorBuilder {

private final Integer concurrencyLimit;

private final boolean rejectTasksWhenLimitReached;

private final TaskDecorator taskDecorator;

private final Set<SimpleAsyncTaskExecutorCustomizer> customizers;

private final Duration taskTerminationTimeout;

public SimpleAsyncTaskExecutorBuilder() {
this(null, null, null, null, null, null);
this(null, null, null, false, null, null, null);
}

private SimpleAsyncTaskExecutorBuilder(Boolean virtualThreads, String threadNamePrefix, Integer concurrencyLimit,
TaskDecorator taskDecorator, Set<SimpleAsyncTaskExecutorCustomizer> customizers,
Duration taskTerminationTimeout) {
boolean rejectTasksWhenLimitReached, TaskDecorator taskDecorator,
Set<SimpleAsyncTaskExecutorCustomizer> customizers, Duration taskTerminationTimeout) {
this.virtualThreads = virtualThreads;
this.threadNamePrefix = threadNamePrefix;
this.concurrencyLimit = concurrencyLimit;
this.rejectTasksWhenLimitReached = rejectTasksWhenLimitReached;
this.taskDecorator = taskDecorator;
this.customizers = customizers;
this.taskTerminationTimeout = taskTerminationTimeout;
Expand All @@ -79,7 +83,7 @@ private SimpleAsyncTaskExecutorBuilder(Boolean virtualThreads, String threadName
*/
public SimpleAsyncTaskExecutorBuilder threadNamePrefix(String threadNamePrefix) {
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, threadNamePrefix, this.concurrencyLimit,
this.taskDecorator, this.customizers, this.taskTerminationTimeout);
this.rejectTasksWhenLimitReached, this.taskDecorator, this.customizers, this.taskTerminationTimeout);
}

/**
Expand All @@ -89,7 +93,7 @@ public SimpleAsyncTaskExecutorBuilder threadNamePrefix(String threadNamePrefix)
*/
public SimpleAsyncTaskExecutorBuilder virtualThreads(Boolean virtualThreads) {
return new SimpleAsyncTaskExecutorBuilder(virtualThreads, this.threadNamePrefix, this.concurrencyLimit,
this.taskDecorator, this.customizers, this.taskTerminationTimeout);
this.rejectTasksWhenLimitReached, this.taskDecorator, this.customizers, this.taskTerminationTimeout);
}

/**
Expand All @@ -99,7 +103,19 @@ public SimpleAsyncTaskExecutorBuilder virtualThreads(Boolean virtualThreads) {
*/
public SimpleAsyncTaskExecutorBuilder concurrencyLimit(Integer concurrencyLimit) {
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, this.threadNamePrefix, concurrencyLimit,
this.taskDecorator, this.customizers, this.taskTerminationTimeout);
this.rejectTasksWhenLimitReached, this.taskDecorator, this.customizers, this.taskTerminationTimeout);
}

/**
* Specify whether to reject tasks when the concurrency limit has been reached.
* @param rejectTasksWhenLimitReached whether to reject tasks when the concurrency
* limit has been reached
* @return a new builder instance
* @since 3.5.0
*/
public SimpleAsyncTaskExecutorBuilder rejectTasksWhenLimitReached(boolean rejectTasksWhenLimitReached) {
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, this.threadNamePrefix, this.concurrencyLimit,
rejectTasksWhenLimitReached, this.taskDecorator, this.customizers, this.taskTerminationTimeout);
}

/**
Expand All @@ -109,7 +125,7 @@ public SimpleAsyncTaskExecutorBuilder concurrencyLimit(Integer concurrencyLimit)
*/
public SimpleAsyncTaskExecutorBuilder taskDecorator(TaskDecorator taskDecorator) {
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, this.threadNamePrefix, this.concurrencyLimit,
taskDecorator, this.customizers, this.taskTerminationTimeout);
this.rejectTasksWhenLimitReached, taskDecorator, this.customizers, this.taskTerminationTimeout);
}

/**
Expand All @@ -120,7 +136,7 @@ public SimpleAsyncTaskExecutorBuilder taskDecorator(TaskDecorator taskDecorator)
*/
public SimpleAsyncTaskExecutorBuilder taskTerminationTimeout(Duration taskTerminationTimeout) {
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, this.threadNamePrefix, this.concurrencyLimit,
this.taskDecorator, this.customizers, taskTerminationTimeout);
this.rejectTasksWhenLimitReached, this.taskDecorator, this.customizers, taskTerminationTimeout);
}

/**
Expand Down Expand Up @@ -150,7 +166,8 @@ public SimpleAsyncTaskExecutorBuilder customizers(
Iterable<? extends SimpleAsyncTaskExecutorCustomizer> customizers) {
Assert.notNull(customizers, "'customizers' must not be null");
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, this.threadNamePrefix, this.concurrencyLimit,
this.taskDecorator, append(null, customizers), this.taskTerminationTimeout);
this.rejectTasksWhenLimitReached, this.taskDecorator, append(null, customizers),
this.taskTerminationTimeout);
}

/**
Expand Down Expand Up @@ -178,7 +195,8 @@ public SimpleAsyncTaskExecutorBuilder additionalCustomizers(
Iterable<? extends SimpleAsyncTaskExecutorCustomizer> customizers) {
Assert.notNull(customizers, "'customizers' must not be null");
return new SimpleAsyncTaskExecutorBuilder(this.virtualThreads, this.threadNamePrefix, this.concurrencyLimit,
this.taskDecorator, append(this.customizers, customizers), this.taskTerminationTimeout);
this.rejectTasksWhenLimitReached, this.taskDecorator, append(this.customizers, customizers),
this.taskTerminationTimeout);
}

/**
Expand Down Expand Up @@ -218,6 +236,7 @@ public <T extends SimpleAsyncTaskExecutor> T configure(T taskExecutor) {
map.from(this.virtualThreads).to(taskExecutor::setVirtualThreads);
map.from(this.threadNamePrefix).whenHasText().to(taskExecutor::setThreadNamePrefix);
map.from(this.concurrencyLimit).to(taskExecutor::setConcurrencyLimit);
map.from(this.rejectTasksWhenLimitReached).to(taskExecutor::setRejectTasksWhenLimitReached);
map.from(this.taskDecorator).to(taskExecutor::setTaskDecorator);
map.from(this.taskTerminationTimeout).as(Duration::toMillis).to(taskExecutor::setTaskTerminationTimeout);
if (!CollectionUtils.isEmpty(this.customizers)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* @author Stephane Nicoll
* @author Filip Hrisafov
* @author Moritz Halbritter
* @author Yanming Zhou
*/
class SimpleAsyncTaskExecutorBuilderTests {

Expand All @@ -64,6 +65,12 @@ void concurrencyLimitShouldApply() {
assertThat(executor.getConcurrencyLimit()).isEqualTo(1);
}

@Test
void rejectTasksWhenLimitReachedShouldApply() {
SimpleAsyncTaskExecutor executor = this.builder.rejectTasksWhenLimitReached(true).build();
assertThat(executor).extracting("rejectTasksWhenLimitReached").isEqualTo(true);
}

@Test
void taskDecoratorShouldApply() {
TaskDecorator taskDecorator = mock(TaskDecorator.class);
Expand Down