Skip to content

Commit 274cad0

Browse files
committed
Fix metrics collection in FaultTolerantChunkProcessor
Before this commit, metrics were not collected in a fault-tolerant step. This commit updates the FaultTolerantChunkProcessor to collect metrics. For the record, chunk scanning is not covered for two reasons: 1. When scanning a chunk, there is a single item in each write operation, so it would be incorrect to report a metric called "chunk.write" for a single item. One could argue that it is a singleton chunk, but it would still be misleading. If we want to time scanned (i.e. individual) items, we need a more fine-grained timer, called "scanned.item.write" for example. 2. The end result can be confusing and might distort the overall metrics view in case of errors (because of the noisy metrics produced by the additional transactions for individual items). As a reminder, the goal of the "chunk.write" metric is to give an overview of the write operation time of the whole chunk, not to time each item individually (this could be done using an `ItemWriteListener` if needed). Resolves #3664
1 parent 0a0a2ec commit 274cad0

File tree

4 files changed

+93
-23
lines changed

4 files changed

+93
-23
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2019 the original author or authors.
2+
* Copyright 2006-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,11 +23,15 @@
2323
import java.util.concurrent.atomic.AtomicInteger;
2424
import java.util.concurrent.atomic.AtomicReference;
2525

26+
import io.micrometer.core.instrument.Tag;
27+
import io.micrometer.core.instrument.Timer;
2628
import org.apache.commons.logging.Log;
2729
import org.apache.commons.logging.LogFactory;
2830

2931
import org.springframework.batch.core.StepContribution;
32+
import org.springframework.batch.core.StepExecution;
3033
import org.springframework.batch.core.listener.StepListenerFailedException;
34+
import org.springframework.batch.core.metrics.BatchMetrics;
3135
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
3236
import org.springframework.batch.core.step.skip.NonSkippableProcessException;
3337
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
@@ -222,6 +226,8 @@ protected Chunk<O> transform(final StepContribution contribution, Chunk<I> input
222226

223227
@Override
224228
public O doWithRetry(RetryContext context) throws Exception {
229+
Timer.Sample sample = BatchMetrics.createTimerSample();
230+
String status = BatchMetrics.STATUS_SUCCESS;
225231
O output = null;
226232
try {
227233
count.incrementAndGet();
@@ -239,6 +245,7 @@ public O doWithRetry(RetryContext context) throws Exception {
239245
}
240246
}
241247
catch (Exception e) {
248+
status = BatchMetrics.STATUS_FAILURE;
242249
if (rollbackClassifier.classify(e)) {
243250
// Default is to rollback unless the classifier
244251
// allows us to continue
@@ -262,6 +269,9 @@ else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount()))
262269
e);
263270
}
264271
}
272+
finally {
273+
stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
274+
}
265275
if (output == null) {
266276
// No need to re-process filtered items
267277
iterator.remove();
@@ -332,10 +342,13 @@ public Object doWithRetry(RetryContext context) throws Exception {
332342

333343
if (!data.scanning()) {
334344
chunkMonitor.setChunkSize(inputs.size());
345+
Timer.Sample sample = BatchMetrics.createTimerSample();
346+
String status = BatchMetrics.STATUS_SUCCESS;
335347
try {
336348
doWrite(outputs.getItems());
337349
}
338350
catch (Exception e) {
351+
status = BatchMetrics.STATUS_FAILURE;
339352
if (rollbackClassifier.classify(e)) {
340353
throw e;
341354
}
@@ -348,6 +361,9 @@ public Object doWithRetry(RetryContext context) throws Exception {
348361
throw new ForceRollbackForWriteSkipException(
349362
"Force rollback on skippable exception so that skipped item can be located.", e);
350363
}
364+
finally {
365+
stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
366+
}
351367
contribution.incrementWriteCount(outputs.size());
352368
}
353369
else {

spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) thr
340340
return outputs;
341341
}
342342

343-
private void stopTimer(Timer.Sample sample, StepExecution stepExecution, String metricName, String status, String description) {
343+
protected void stopTimer(Timer.Sample sample, StepExecution stepExecution, String metricName, String status, String description) {
344344
sample.stop(BatchMetrics.createTimer(metricName, description + " duration",
345345
Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()),
346346
Tag.of("step.name", stepExecution.getStepName()),

spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.junit.Before;
2828
import org.junit.Test;
2929
import org.springframework.batch.core.JobExecution;
30+
import org.springframework.batch.core.JobInstance;
31+
import org.springframework.batch.core.JobParameters;
3032
import org.springframework.batch.core.StepContribution;
3133
import org.springframework.batch.core.StepExecution;
3234
import org.springframework.batch.core.listener.ItemListenerSupport;
@@ -55,7 +57,7 @@ public class FaultTolerantChunkProcessorTests {
5557
private FaultTolerantChunkProcessor<String, String> processor;
5658

5759
private StepContribution contribution = new StepExecution("foo",
58-
new JobExecution(0L)).createStepContribution();
60+
new JobExecution(new JobInstance(0L, "job"), new JobParameters())).createStepContribution();
5961

6062
@Before
6163
public void setUp() {

spring-batch-samples/src/test/java/org/springframework/batch/sample/metrics/BatchMetricsTests.java

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,8 +37,6 @@
3737
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
3838
import org.springframework.batch.core.launch.JobLauncher;
3939
import org.springframework.batch.core.metrics.BatchMetrics;
40-
import org.springframework.batch.item.ItemReader;
41-
import org.springframework.batch.item.ItemWriter;
4240
import org.springframework.batch.item.support.ListItemReader;
4341
import org.springframework.batch.repeat.RepeatStatus;
4442
import org.springframework.context.ApplicationContext;
@@ -51,9 +49,12 @@
5149
import static org.junit.Assert.assertTrue;
5250
import static org.junit.Assert.fail;
5351

52+
/**
53+
* @author Mahmoud Ben Hassine
54+
*/
5455
public class BatchMetricsTests {
5556

56-
private static final int EXPECTED_SPRING_BATCH_METRICS = 6;
57+
private static final int EXPECTED_SPRING_BATCH_METRICS = 10;
5758

5859
@Test
5960
public void testCalculateDuration() {
@@ -147,6 +148,8 @@ public void testBatchMetrics() throws Exception {
147148
List<Meter> meters = Metrics.globalRegistry.getMeters();
148149
assertTrue(meters.size() >= EXPECTED_SPRING_BATCH_METRICS);
149150

151+
// Job metrics
152+
150153
try {
151154
Metrics.globalRegistry.get("spring.batch.job")
152155
.tag("name", "job")
@@ -164,6 +167,8 @@ public void testBatchMetrics() throws Exception {
164167
fail("There should be a meter of type LONG_TASK_TIMER named spring.batch.job.active" +
165168
" registered in the global registry: " + e.getMessage());
166169
}
170+
171+
// Step 1 (tasklet) metrics
167172

168173
try {
169174
Metrics.globalRegistry.get("spring.batch.step")
@@ -175,6 +180,8 @@ public void testBatchMetrics() throws Exception {
175180
fail("There should be a meter of type TIMER named spring.batch.step" +
176181
" registered in the global registry: " + e.getMessage());
177182
}
183+
184+
// Step 2 (simple chunk-oriented) metrics
178185

179186
try {
180187
Metrics.globalRegistry.get("spring.batch.step")
@@ -219,6 +226,52 @@ public void testBatchMetrics() throws Exception {
219226
fail("There should be a meter of type TIMER named spring.batch.chunk.write" +
220227
" registered in the global registry: " + e.getMessage());
221228
}
229+
230+
// Step 3 (fault-tolerant chunk-oriented) metrics
231+
232+
try {
233+
Metrics.globalRegistry.get("spring.batch.step")
234+
.tag("name", "step3")
235+
.tag("job.name", "job")
236+
.tag("status", "COMPLETED")
237+
.timer();
238+
} catch (Exception e) {
239+
fail("There should be a meter of type TIMER named spring.batch.step" +
240+
" registered in the global registry: " + e.getMessage());
241+
}
242+
243+
try {
244+
Metrics.globalRegistry.get("spring.batch.item.read")
245+
.tag("job.name", "job")
246+
.tag("step.name", "step3")
247+
.tag("status", "SUCCESS")
248+
.timer();
249+
} catch (Exception e) {
250+
fail("There should be a meter of type TIMER named spring.batch.item.read" +
251+
" registered in the global registry: " + e.getMessage());
252+
}
253+
254+
try {
255+
Metrics.globalRegistry.get("spring.batch.item.process")
256+
.tag("job.name", "job")
257+
.tag("step.name", "step3")
258+
.tag("status", "SUCCESS")
259+
.timer();
260+
} catch (Exception e) {
261+
fail("There should be a meter of type TIMER named spring.batch.item.process" +
262+
" registered in the global registry: " + e.getMessage());
263+
}
264+
265+
try {
266+
Metrics.globalRegistry.get("spring.batch.chunk.write")
267+
.tag("job.name", "job")
268+
.tag("step.name", "step3")
269+
.tag("status", "SUCCESS")
270+
.timer();
271+
} catch (Exception e) {
272+
fail("There should be a meter of type TIMER named spring.batch.chunk.write" +
273+
" registered in the global registry: " + e.getMessage());
274+
}
222275
}
223276

224277
@Configuration
@@ -241,25 +294,23 @@ public Step step1() {
241294
}
242295

243296
@Bean
244-
public ItemReader<Integer> itemReader() {
245-
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
246-
}
247-
248-
@Bean
249-
public ItemWriter<Integer> itemWriter() {
250-
return items -> {
251-
for (Integer item : items) {
252-
System.out.println("item = " + item);
253-
}
254-
};
297+
public Step step2() {
298+
return stepBuilderFactory.get("step2")
299+
.<Integer, Integer>chunk(2)
300+
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5)))
301+
.writer(items -> items.forEach(System.out::println))
302+
.build();
255303
}
256304

257305
@Bean
258-
public Step step2() {
259-
return stepBuilderFactory.get("step2")
260-
.<Integer, Integer>chunk(5)
261-
.reader(itemReader())
262-
.writer(itemWriter())
306+
public Step step3() {
307+
return stepBuilderFactory.get("step3")
308+
.<Integer, Integer>chunk(2)
309+
.reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10)))
310+
.writer(items -> items.forEach(System.out::println))
311+
.faultTolerant()
312+
.skip(Exception.class)
313+
.skipLimit(3)
263314
.build();
264315
}
265316

@@ -268,6 +319,7 @@ public Job job() {
268319
return jobBuilderFactory.get("job")
269320
.start(step1())
270321
.next(step2())
322+
.next(step3())
271323
.build();
272324
}
273325
}

0 commit comments

Comments
 (0)