Skip to content

GH-4708 : in multi-threaded step, enforce the first chunk to be written by first thread #4710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.batch.repeat.support;

import java.util.concurrent.CountDownLatch;

import org.springframework.batch.repeat.RepeatCallback;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatException;
Expand Down Expand Up @@ -59,6 +61,12 @@ public class TaskExecutorRepeatTemplate extends RepeatTemplate {

private TaskExecutor taskExecutor = new SyncTaskExecutor();

/**
* A latch ensuring that the first chunk is handled by the first thread. This is
* specifically required to handle data with record separators, like JSON.
*/
private final CountDownLatch latch = new CountDownLatch(1);

/**
* Public setter for the throttle limit. The throttle limit is the largest number of
* concurrent tasks that can be executing at one time - if a new task arrives and the
Expand Down Expand Up @@ -110,7 +118,7 @@ protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callb
* Wrap the callback in a runnable that will add its result to the queue when
* it is ready.
*/
runnable = new ExecutingRunnable(callback, context, queue);
runnable = new ExecutingRunnable(callback, context, queue, latch);

/*
* Tell the runnable that it can expect a result. This could have been
Expand All @@ -130,6 +138,13 @@ protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callb
*/
update(context);

/*
* Wait for the first chunk to be handled before creating other threads. This
* ensures that the first data chunk is written correctly for formats with
* record separators, like JSON.
*/
latch.await();

/*
* Keep going until we get a result that is finished, or early termination...
*/
Expand Down Expand Up @@ -216,14 +231,17 @@ private class ExecutingRunnable implements Runnable, ResultHolder {

private volatile Throwable error;

public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
private CountDownLatch latch;

/**
 * Create a runnable holding the collaborators it needs to execute one callback.
 * @param callback the callback stored for this runnable
 * @param context the repeat context stored for this runnable
 * @param queue the queue this runnable puts itself on from {@code run()}
 * @param latch the latch this runnable counts down from {@code run()}
 */
public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue,
        CountDownLatch latch) {
    // Plain field capture only; the implicit super() call is left to the compiler.
    this.latch = latch;
    this.queue = queue;
    this.context = context;
    this.callback = callback;
}

/**
Expand Down Expand Up @@ -272,6 +290,11 @@ public void run() {

queue.put(this);

/*
* If this is the first chunk, then release the latch so that other
* threads can be created.
*/
this.latch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.batch.repeat.support;

import java.util.ArrayList;

import org.junit.jupiter.api.BeforeEach;

import org.springframework.batch.item.Chunk;
Expand All @@ -42,12 +44,16 @@ abstract class AbstractTradeBatchTests {

Resource resource = new ClassPathResource("trades.csv", getClass());

protected TradeWriter processor = new TradeWriter();
protected TradeWriter processor;

protected TradeItemReader provider;

protected ArrayList<Trade> output;

/**
 * Prepare a fresh output list, a writer appending to it, and an opened reader on
 * the test resource before each test.
 * @throws Exception declared by the method signature
 */
@BeforeEach
void setUp() throws Exception {
    // Writer side: a new list per test, shared with the writer.
    this.output = new ArrayList<>();
    this.processor = new TradeWriter(this.output);

    // Reader side: publish the reader first, then open it with an empty context.
    final TradeItemReader reader = new TradeItemReader(this.resource);
    this.provider = reader;
    reader.open(new ExecutionContext());
}
Expand Down Expand Up @@ -79,10 +85,17 @@ protected static class TradeWriter implements ItemWriter<Trade> {

int count = 0;

private ArrayList<Trade> out;

/**
 * Create a writer that appends the items it is asked to write to the given list.
 * @param out the list receiving the written items
 */
public TradeWriter(ArrayList<Trade> out) {
    super();
    this.out = out;
}

/**
 * Append the items of the given chunk to the output list and record one more
 * write call.
 *
 * Synchronized because the state (count) is inspected at the end of a concurrent
 * batch run.
 * @param data the chunk whose items are appended
 */
@Override
public synchronized void write(Chunk<? extends Trade> data) {
    final var items = data.getItems();
    this.out.addAll(items);
    this.count += 1;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.repeat.support;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Tests for concurrent behaviour in repeat template, dedicated to the first chunk, that
* must be managed first when output format has separator between items, like JSON.
*
* @author Gerald Lelarge
*
*/
class TaskExecutorRepeatTemplateFirstChunkTests extends AbstractTradeBatchTests {

private TaskExecutorRepeatTemplate template;

private int chunkSize = 5;

private final ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();

@BeforeEach
void setUp() throws Exception {

super.setUp();

threadPool.setMaxPoolSize(10);
threadPool.setCorePoolSize(10);
threadPool.setQueueCapacity(0);
threadPool.afterPropertiesSet();

template = new TaskExecutorRepeatTemplate();
template.setTaskExecutor(threadPool);
// Limit the number of threads to 2
template.setThrottleLimit(2);
// Limit the number of items to read to be able to test the second item from the
// output. If the chunkSize is greater than 2, the test could fail.
template.setCompletionPolicy(new SimpleCompletionPolicy(chunkSize));
}

@AfterEach
void tearDown() {
threadPool.destroy();
}

/**
* Test method for {@link TaskExecutorRepeatTemplate#iterate(RepeatCallback)}. Repeat
* the tests 20 times to increase the probability of detecting a concurrency.
*/
@Test
@RepeatedTest(value = 20)
void testExecute() {

// given
template.iterate(new ItemReaderRepeatCallback<>(provider, processor));

// then
// The first element is the first item of the input trades.csv.
assertEquals("UK21341EAH45", output.get(0).getIsin());
// The others can have different orders.
for (int i = 1; i < output.size(); i++) {
assertNotEquals("UK21341EAH45", output.get(i).getIsin());
}
assertEquals(chunkSize, processor.count);
}

}