diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractCursorItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractCursorItemReader.java index e0660fd258..e502c6ca0c 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractCursorItemReader.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractCursorItemReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2013 the original author or authors. + * Copyright 2006-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -134,6 +134,9 @@ public abstract class AbstractCursorItemReader extends AbstractItemCountingIt private boolean useSharedExtendedConnection = false; + private Boolean connectionAutoCommit; + + private Boolean initialConnectionAutoCommit; public AbstractCursorItemReader() { super(); @@ -356,6 +359,25 @@ public boolean isUseSharedExtendedConnection() { return useSharedExtendedConnection; } + /** + * Set whether "autoCommit" should be overridden for the connection used by the cursor. If not set, defaults to + * Connection / Datasource default configuration. + * + * @param autoCommit value used for {@link Connection#setAutoCommit(boolean)}. + */ + public void setConnectionAutoCommit(boolean autoCommit) { + this.connectionAutoCommit = autoCommit; + } + + /** + * Return whether "autoCommit" should be overridden for the connection used by the cursor. + * + * @return the "autoCommit" value, or {@code null} if none to be applied. + */ + public Boolean getConnectionAutoCommit() { + return this.connectionAutoCommit; + } + public abstract String getSql(); /** @@ -380,6 +402,9 @@ protected void doClose() throws Exception { JdbcUtils.closeResultSet(this.rs); rs = null; cleanupOnClose(); + if (this.initialConnectionAutoCommit != null) { + this.con.setAutoCommit(initialConnectionAutoCommit); + } if (useSharedExtendedConnection && dataSource instanceof ExtendedConnectionDataSourceProxy) { ((ExtendedConnectionDataSourceProxy)dataSource).stopCloseSuppression(this.con); if (!TransactionSynchronizationManager.isActualTransactionActive()) { @@ -424,6 +449,10 @@ protected void initializeConnection() { else { this.con = dataSource.getConnection(); } + if (this.connectionAutoCommit != null && this.con.getAutoCommit() != this.connectionAutoCommit) { + this.initialConnectionAutoCommit = this.con.getAutoCommit(); + this.con.setAutoCommit(this.connectionAutoCommit); + } } catch (SQLException se) { close(); diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/JdbcCursorItemReaderConfigTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/JdbcCursorItemReaderConfigTests.java index a0dfc7bf28..91e352eb97 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/JdbcCursorItemReaderConfigTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/JdbcCursorItemReaderConfigTests.java @@ -15,7 +15,11 @@ */ package org.springframework.batch.item.database; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.sql.Connection; @@ -25,8 +29,9 @@ import javax.sql.DataSource; import org.junit.Test; -import org.junit.runners.JUnit4; import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; import org.springframework.batch.item.ExecutionContext; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; @@ -100,4 +105,39 @@ public Void doInTransaction(TransactionStatus status) { }); } + @Test + public void testOverrideConnectionAutoCommit() throws Exception { + boolean initialAutoCommit= false; + boolean neededAutoCommit = true; + + DataSource ds = mock(DataSource.class); + Connection con = mock(Connection.class); + when(con.getAutoCommit()).thenReturn(initialAutoCommit); + PreparedStatement ps = mock(PreparedStatement.class); + when(con.prepareStatement("select foo from bar", ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY)).thenReturn(ps); + when(ds.getConnection()).thenReturn(con); + + final JdbcCursorItemReader reader = new JdbcCursorItemReader(); + reader.setDataSource(ds); + reader.setSql("select foo from bar"); + reader.setConnectionAutoCommit(neededAutoCommit); + + // Check "open" outside of a transaction (see AbstractStep#execute()) + final ExecutionContext ec = new ExecutionContext(); + reader.open(ec); + + ArgumentCaptor autoCommitCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(con, times(1)).setAutoCommit(autoCommitCaptor.capture()); + assertEquals(neededAutoCommit, autoCommitCaptor.getValue()); + + reset(con); + reader.close(); + + // Check restored autocommit value + autoCommitCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(con, times(1)).setAutoCommit(autoCommitCaptor.capture()); + assertEquals(initialAutoCommit, autoCommitCaptor.getValue()); + } + }