1
1
/*
2
- * Copyright 2018 the original author or authors.
2
+ * Copyright 2021 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
21
21
import org .springframework .batch .core .JobExecution ;
22
22
import org .springframework .batch .core .JobParameters ;
23
23
import org .springframework .batch .core .StepExecution ;
24
+ import org .springframework .batch .core .configuration .annotation .BatchConfigurer ;
25
+ import org .springframework .batch .core .configuration .annotation .DefaultBatchConfigurer ;
24
26
import org .springframework .batch .core .configuration .annotation .EnableBatchProcessing ;
25
27
import org .springframework .batch .core .configuration .annotation .JobBuilderFactory ;
26
28
import org .springframework .batch .core .configuration .annotation .StepBuilderFactory ;
27
29
import org .springframework .batch .core .launch .JobLauncher ;
28
30
import org .springframework .batch .core .listener .JobExecutionListenerSupport ;
29
31
import org .springframework .batch .core .step .tasklet .TaskletStep ;
30
32
import org .springframework .batch .item .ItemReader ;
31
- import org .springframework .batch .item .ItemWriter ;
32
33
import org .springframework .beans .factory .annotation .Autowired ;
33
34
import org .springframework .context .ApplicationContext ;
34
35
import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
@@ -58,13 +59,14 @@ public class MultiThreadedTaskletStepIntegrationTests {
58
59
@ Test
59
60
public void testMultiThreadedTaskletExecutionWhenNoErrors () throws Exception {
60
61
// given
61
- Class [] configurationClasses = {JobConfiguration .class , TransactionManagerConfiguration .class };
62
+ Class <?> [] configurationClasses = {JobConfiguration .class , TransactionManagerConfiguration .class };
62
63
ApplicationContext context = new AnnotationConfigApplicationContext (configurationClasses );
63
64
JobLauncher jobLauncher = context .getBean (JobLauncher .class );
64
65
Job job = context .getBean (Job .class );
66
+ JobParameters jobParameters = new JobParameters ();
65
67
66
68
// when
67
- JobExecution jobExecution = jobLauncher .run (job , new JobParameters () );
69
+ JobExecution jobExecution = jobLauncher .run (job , jobParameters );
68
70
69
71
// then
70
72
assertNotNull (jobExecution );
@@ -77,13 +79,14 @@ public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception {
77
79
@ Test
78
80
public void testMultiThreadedTaskletExecutionWhenCommitFails () throws Exception {
79
81
// given
80
- Class [] configurationClasses = {JobConfiguration .class , CommitFailingTransactionManagerConfiguration .class };
82
+ Class <?> [] configurationClasses = {JobConfiguration .class , CommitFailingTransactionManagerConfiguration .class };
81
83
ApplicationContext context = new AnnotationConfigApplicationContext (configurationClasses );
82
84
JobLauncher jobLauncher = context .getBean (JobLauncher .class );
83
85
Job job = context .getBean (Job .class );
86
+ JobParameters jobParameters = new JobParameters ();
84
87
85
88
// when
86
- JobExecution jobExecution = jobLauncher .run (job , new JobParameters () );
89
+ JobExecution jobExecution = jobLauncher .run (job , jobParameters );
87
90
88
91
// then
89
92
assertNotNull (jobExecution );
@@ -98,13 +101,14 @@ public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception
98
101
@ Test
99
102
public void testMultiThreadedTaskletExecutionWhenRollbackFails () throws Exception {
100
103
// given
101
- Class [] configurationClasses = {JobConfiguration .class , RollbackFailingTransactionManagerConfiguration .class };
104
+ Class <?> [] configurationClasses = {JobConfiguration .class , RollbackFailingTransactionManagerConfiguration .class };
102
105
ApplicationContext context = new AnnotationConfigApplicationContext (configurationClasses );
103
106
JobLauncher jobLauncher = context .getBean (JobLauncher .class );
104
107
Job job = context .getBean (Job .class );
108
+ JobParameters jobParameters = new JobParameters ();
105
109
106
110
// when
107
- JobExecution jobExecution = jobLauncher .run (job , new JobParameters () );
111
+ JobExecution jobExecution = jobLauncher .run (job , jobParameters );
108
112
109
113
// then
110
114
assertNotNull (jobExecution );
@@ -130,7 +134,7 @@ public TaskletStep step() {
130
134
return stepBuilderFactory .get ("step" )
131
135
.<Integer , Integer >chunk (3 )
132
136
.reader (itemReader ())
133
- .writer (itemWriter () )
137
+ .writer (items -> {} )
134
138
.taskExecutor (taskExecutor ())
135
139
.build ();
136
140
}
@@ -160,8 +164,7 @@ public ThreadPoolTaskExecutor taskExecutor() {
160
164
@ Bean
161
165
public ItemReader <Integer > itemReader () {
162
166
return new ItemReader <Integer >() {
163
- private AtomicInteger atomicInteger = new AtomicInteger ();
164
-
167
+ private final AtomicInteger atomicInteger = new AtomicInteger ();
165
168
@ Override
166
169
public synchronized Integer read () {
167
170
int value = atomicInteger .incrementAndGet ();
@@ -170,11 +173,6 @@ public synchronized Integer read() {
170
173
};
171
174
}
172
175
173
- @ Bean
174
- public ItemWriter <Integer > itemWriter () {
175
- return items -> {
176
- };
177
- }
178
176
}
179
177
180
178
@ Configuration
@@ -196,8 +194,13 @@ public DataSource dataSource() {
196
194
public static class TransactionManagerConfiguration {
197
195
198
196
@ Bean
199
- public PlatformTransactionManager transactionManager (DataSource dataSource ) {
200
- return new DataSourceTransactionManager (dataSource );
197
+ public BatchConfigurer batchConfigurer (DataSource dataSource ) {
198
+ return new DefaultBatchConfigurer (dataSource ) {
199
+ @ Override
200
+ public PlatformTransactionManager getTransactionManager () {
201
+ return new DataSourceTransactionManager (dataSource );
202
+ }
203
+ };
201
204
}
202
205
203
206
}
@@ -207,14 +210,19 @@ public PlatformTransactionManager transactionManager(DataSource dataSource) {
207
210
public static class CommitFailingTransactionManagerConfiguration {
208
211
209
212
@ Bean
210
- public PlatformTransactionManager transactionManager (DataSource dataSource ) {
211
- return new DataSourceTransactionManager (dataSource ) {
213
+ public BatchConfigurer batchConfigurer (DataSource dataSource ) {
214
+ return new DefaultBatchConfigurer (dataSource ) {
212
215
@ Override
213
- protected void doCommit (DefaultTransactionStatus status ) {
214
- super .doCommit (status );
215
- if (Thread .currentThread ().getName ().equals ("spring-batch-worker-thread-2" )) {
216
- throw new RuntimeException ("Planned commit exception!" );
217
- }
216
+ public PlatformTransactionManager getTransactionManager () {
217
+ return new DataSourceTransactionManager (dataSource ) {
218
+ @ Override
219
+ protected void doCommit (DefaultTransactionStatus status ) {
220
+ super .doCommit (status );
221
+ if (Thread .currentThread ().getName ().equals ("spring-batch-worker-thread-2" )) {
222
+ throw new RuntimeException ("Planned commit exception!" );
223
+ }
224
+ }
225
+ };
218
226
}
219
227
};
220
228
}
@@ -226,22 +234,27 @@ protected void doCommit(DefaultTransactionStatus status) {
226
234
public static class RollbackFailingTransactionManagerConfiguration {
227
235
228
236
@ Bean
229
- public PlatformTransactionManager transactionManager (DataSource dataSource ) {
230
- return new DataSourceTransactionManager (dataSource ) {
237
+ public BatchConfigurer batchConfigurer (DataSource dataSource ) {
238
+ return new DefaultBatchConfigurer (dataSource ) {
231
239
@ Override
232
- protected void doCommit (DefaultTransactionStatus status ) {
233
- super .doCommit (status );
234
- if (Thread .currentThread ().getName ().equals ("spring-batch-worker-thread-2" )) {
235
- throw new RuntimeException ("Planned commit exception!" );
236
- }
237
- }
240
+ public PlatformTransactionManager getTransactionManager () {
241
+ return new DataSourceTransactionManager (dataSource ) {
242
+ @ Override
243
+ protected void doCommit (DefaultTransactionStatus status ) {
244
+ super .doCommit (status );
245
+ if (Thread .currentThread ().getName ().equals ("spring-batch-worker-thread-2" )) {
246
+ throw new RuntimeException ("Planned commit exception!" );
247
+ }
248
+ }
238
249
239
- @ Override
240
- protected void doRollback (DefaultTransactionStatus status ) {
241
- super .doRollback (status );
242
- if (Thread .currentThread ().getName ().equals ("spring-batch-worker-thread-2" )) {
243
- throw new RuntimeException ("Planned rollback exception!" );
244
- }
250
+ @ Override
251
+ protected void doRollback (DefaultTransactionStatus status ) {
252
+ super .doRollback (status );
253
+ if (Thread .currentThread ().getName ().equals ("spring-batch-worker-thread-2" )) {
254
+ throw new RuntimeException ("Planned rollback exception!" );
255
+ }
256
+ }
257
+ };
245
258
}
246
259
};
247
260
}
0 commit comments