JpaPagingItemReader causes the program to enter loop [BATCH-2724] #880

Closed
spring-projects-issues opened this issue May 22, 2018 · 11 comments
Labels
has: backports Legacy label from JIRA. Superseded by "for: backport-to-x.x.x" in: infrastructure type: enhancement
Milestone

Comments

@spring-projects-issues

ckl opened BATCH-2724 and commented

The data reader reported an error, which caused the read method of FaultTolerantChunkProvider to enter an infinite loop and repeatedly report the exception below. Because the exception is logged at debug level, the problem is very hard to detect in a production environment.

java.lang.IllegalStateException: Transaction already active
	at org.hibernate.jpa.internal.TransactionImpl.begin(TransactionImpl.java:42)
	at org.springframework.batch.item.database.JpaPagingItemReader.doReadPage(JpaPagingItemReader.java:197)
	at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
	at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88)
	at org.springframework.batch.item.support.SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:55)
	at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
	at com.sun.proxy.$Proxy126.read(Unknown Source)
	at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
	at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116)
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374)
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
	at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110)
	at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:272)
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81)
	at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)


Affects: 3.0.8

Reference URL: http://www.kailing.pub/article/index/arcid/201.html

Attachments:

Referenced from: pull request #620, and commits 7ce3408

Backported to: 4.1.0.M3

@spring-projects-issues

Michael Minella commented

Please provide your configuration. I don't believe your transactions are configured correctly. It sounds like you are using the plain DataSourceTransactionManager instead of the JpaTransactionManager, because Hibernate is trying to begin a new transaction at a point where one should already be in effect.

@spring-projects-issues

ckl commented

The project uses Spring Boot, configured as follows:

@SpringBootApplication(scanBasePackages = {"com.yudianbank"})
@EnableAspectJAutoProxy
@EnableBatchProcessing
@EnableApolloConfig
@EnableJpaAuditing
public class BatchApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication application = new SpringApplication(BatchApplication.class);
        application.setWebEnvironment(false);
        application.run(args);
    }

}

application.properties

#datasource
spring.datasource.url = jdbc:mysql://10.17.199.122:3306/ams2?autoReconnect=true&useUnicode=true&characterEncoding=utf-8
spring.datasource.password = 763
spring.datasource.username = it_ams

#jpa
spring.jpa.database = MYSQL
spring.jpa.hibernate.ddl-auto = update
spring.jpa.show-sql = false
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy

I am sure the project uses the JPA transaction manager.

@spring-projects-issues

Michael Minella commented

Based on the configuration above, Spring Batch wouldn't be using the JpaTransactionManager. You need to create a custom BatchConfigurer to use it. You can read more about the BatchConfigurer in the documentation here: https://docs.spring.io/spring-batch/4.0.x/reference/html/job.html#javaConfig  Also, going forward, questions like this should be posted on Stack Overflow and tagged with spring-batch, where we can share the responses with the community.

@spring-projects-issues

ckl commented

This does not look like a transaction configuration problem. It looks like the transaction is not committed or rolled back properly when reading the data fails. As long as the source data is fine, the exception is never thrown; it is triggered only when READ_SKIP_COUNT is incremented, as shown in the screenshot.
[Screenshot attachment: QQ图片20180524125938.png]

My environment uses a partitioned step and is multi-threaded.
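
The failure mode described in this comment can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not Spring Batch or Hibernate code: `ToyTransaction`, `ToyPagingReader` and `run` are hypothetical names, the reader is assumed to begin a transaction for every page and commit only on success, and the retry loop is bounded so that the example terminates. The only detail taken from the thread is the message "Transaction already active".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these classes exist in Spring Batch or Hibernate. */
public class StuckTransactionDemo {

    /** Hypothetical stand-in for a resource-local transaction. */
    static class ToyTransaction {
        private boolean active;

        void begin() {
            if (active) {
                throw new IllegalStateException("Transaction already active");
            }
            active = true;
        }

        void commit() {
            active = false;
        }

        void rollback() {
            active = false;
        }
    }

    /** Hypothetical reader that begins a transaction per page and commits only on success. */
    static class ToyPagingReader {
        private final ToyTransaction tx = new ToyTransaction();
        private final boolean rollbackOnFailure;
        private int page;

        ToyPagingReader(boolean rollbackOnFailure) {
            this.rollbackOnFailure = rollbackOnFailure;
        }

        String readPage() {
            tx.begin();
            try {
                int current = page++;
                if (current == 1) {
                    // Assumption: page 1 contains a row that cannot be read.
                    throw new IllegalArgumentException("bad row on page 1");
                }
                String result = "page-" + current;
                tx.commit();
                return result;
            } catch (RuntimeException e) {
                if (rollbackOnFailure) {
                    tx.rollback();
                }
                throw e;
            }
        }
    }

    /** Simplified skip loop: every failure is skipped and the read is attempted again. */
    static List<String> run(ToyPagingReader reader, int maxAttempts) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < maxAttempts; i++) {
            try {
                log.add(reader.readPage());
            } catch (RuntimeException e) {
                log.add("skipped: " + e.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Without rollback, every attempt after the first failure hits the stale transaction.
        System.out.println(run(new ToyPagingReader(false), 4));
        // With rollback, reading resumes on the next page.
        System.out.println(run(new ToyPagingReader(true), 4));
    }
}
```

In this model, a single failed read leaves the transaction active, so every later attempt fails in `begin()` and an unbounded skip loop would never make progress. Whether the real reader behaves this way in the reporter's environment is not established in this thread.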

@spring-projects-issues

Michael Minella commented

Until we can see your job's configuration, there really isn't anything else we can do.  You just provided your properties and your Spring Boot main class.  We need to see the job and any BatchConfigurer you are using.

@spring-projects-issues

ckl commented

I did not explicitly declare a BatchConfigurer. I am using the DefaultBatchConfigurer from Spring Batch core, whose source is as follows:

@Component
public class DefaultBatchConfigurer implements BatchConfigurer {
	private static final Log logger = LogFactory.getLog(DefaultBatchConfigurer.class);

	private DataSource dataSource;
	private PlatformTransactionManager transactionManager;
	private JobRepository jobRepository;
	private JobLauncher jobLauncher;
	private JobExplorer jobExplorer;

	@Autowired(required = false)
	public void setDataSource(DataSource dataSource) {
		this.dataSource = dataSource;
		this.transactionManager = new DataSourceTransactionManager(dataSource);
	}

	protected DefaultBatchConfigurer() {}

	public DefaultBatchConfigurer(DataSource dataSource) {
		setDataSource(dataSource);
	}

	@Override
	public JobRepository getJobRepository() {
		return jobRepository;
	}

	@Override
	public PlatformTransactionManager getTransactionManager() {
		return transactionManager;
	}

	@Override
	public JobLauncher getJobLauncher() {
		return jobLauncher;
	}

	@Override
	public JobExplorer getJobExplorer() {
		return jobExplorer;
	}

	@PostConstruct
	public void initialize() {
		try {
			if(dataSource == null) {
				logger.warn("No datasource was provided...using a Map based JobRepository");

				if(this.transactionManager == null) {
					this.transactionManager = new ResourcelessTransactionManager();
				}

				MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(this.transactionManager);
				jobRepositoryFactory.afterPropertiesSet();
				this.jobRepository = jobRepositoryFactory.getObject();

				MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory);
				jobExplorerFactory.afterPropertiesSet();
				this.jobExplorer = jobExplorerFactory.getObject();
			} else {
				this.jobRepository = createJobRepository();

				JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
				jobExplorerFactoryBean.setDataSource(this.dataSource);
				jobExplorerFactoryBean.afterPropertiesSet();
				this.jobExplorer = jobExplorerFactoryBean.getObject();
			}

			this.jobLauncher = createJobLauncher();
		} catch (Exception e) {
			throw new BatchConfigurationException(e);
		}
	}

	protected JobLauncher createJobLauncher() throws Exception {
		SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
		jobLauncher.setJobRepository(jobRepository);
		jobLauncher.afterPropertiesSet();
		return jobLauncher;
	}

	protected JobRepository createJobRepository() throws Exception {
		JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
		factory.setDataSource(dataSource);
		factory.setTransactionManager(transactionManager);
		factory.afterPropertiesSet();
		return  factory.getObject();
	}
}

Is there any problem with this configuration?

@spring-projects-issues

Michael Minella commented

Yes. As I stated earlier, the DataSourceTransactionManager will not work with JPA. You must create your own BatchConfigurer implementation that uses the JpaTransactionManager. We provide an example of this in Spring Cloud Task. We should do the same in Spring Batch. I'm re-opening this as a documentation feature to address improving our documentation around this.

@spring-projects-issues

ckl commented

As described above, this exception does not always occur. I have not configured a JpaTransactionManager, yet no exception is thrown when the data is read without problems, so I suspect that the missing JpaTransactionManager is not the cause.
My current workaround is a custom JPA reader with the transaction code removed. My program does not need detached JPA entities, so this works without problems for me. Is this approach acceptable?

@spring-projects-issues

Michael Minella commented

Without providing your job definition, there is nothing else we can do.  From what you have provided to us, there is nothing to indicate that this error is caused by anything other than what I have laid out.

@spring-projects-issues

ckl commented

Thank you very much for all your replies. I have solved this problem locally with a custom JPA reader. I am posting it here for reference and hope it does not confuse others; the problem may be specific to our environment. Thank you again for your replies.
My CustomJpaPagingItemReader looks like this:

/**
 * Created by kl on 2018/5/17.
 * Content: custom JPA data reader with the transaction handling removed
 */
public class CustomJpaPagingItemReader<T> extends AbstractPagingItemReader<T> {

    private EntityManagerFactory entityManagerFactory;

    private EntityManager entityManager;

    private final Map<String, Object> jpaPropertyMap = new HashMap<String, Object>();
    private String queryString;

    private JpaQueryProvider queryProvider;

    private Map<String, Object> parameterValues;

    /**
     * Create a query using an appropriate query provider (entityManager OR
     * queryProvider).
     */
    private Query createQuery() {
        if (queryProvider == null) {
            return entityManager.createQuery(queryString);
        } else {
            return queryProvider.createQuery();
        }
    }

    public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
        this.entityManagerFactory = entityManagerFactory;
    }

    /**
     * The parameter values to be used for the query execution.
     *
     * @param parameterValues the values keyed by the parameter named used in
     *                        the query string.
     */
    public void setParameterValues(Map<String, Object> parameterValues) {
        this.parameterValues = parameterValues;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();

        if (queryProvider == null) {
            Assert.notNull(entityManagerFactory);
            Assert.hasLength(queryString);
        }
        // making sure that the appropriate (JPA) query provider is set
        else {
            Assert.isTrue(queryProvider != null, "JPA query provider must be set");
        }
    }

    /**
     * @param queryString JPQL query string
     */
    public void setQueryString(String queryString) {
        this.queryString = queryString;
    }

    /**
     * @param queryProvider JPA query provider
     */
    public void setQueryProvider(JpaQueryProvider queryProvider) {
        this.queryProvider = queryProvider;
    }

    @Override
    protected void doOpen() throws Exception {
        super.doOpen();

        entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
        if (entityManager == null) {
            throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
        }
        // set entityManager to queryProvider, so it participates
        // in JpaPagingItemReader's managed transaction
        if (queryProvider != null) {
            queryProvider.setEntityManager(entityManager);
        }

    }

    @Override
    @SuppressWarnings("unchecked")
    protected void doReadPage() {

        Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());

        if (parameterValues != null) {
            for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
                query.setParameter(me.getKey(), me.getValue());
            }
        }

        if (results == null) {
            results = new CopyOnWriteArrayList<T>();
        } else {
            results.clear();
        }
        results.addAll(query.getResultList());

    }

    @Override
    protected void doJumpToPage(int itemIndex) {
    }

    @Override
    protected void doClose() throws Exception {
        entityManager.close();
        super.doClose();
    }

}

@spring-projects-issues

Mahmoud Ben Hassine commented

Michael Minella wrote: "I'm re-opening this as a documentation feature to address improving our documentation around this."

The documentation has been updated with an example of how to provide a custom BatchConfigurer (see https://github.com/spring-projects/spring-batch/pull/620/files).

In the case of this issue, it would be something like:

@Bean
public BatchConfigurer batchConfigurer() {
	return new DefaultBatchConfigurer() {
		@Override
		public PlatformTransactionManager getTransactionManager() {
			return jpaTransactionManager();
		}
	};
}
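
The snippet above calls a `jpaTransactionManager()` method that is not shown anywhere in this thread. One possible way to supply it is sketched below. It assumes an `EntityManagerFactory` and a `DataSource` bean are already available (for example from Spring Boot's auto-configuration) and uses the `javax.persistence` package that matches the versions discussed here. The class name `JpaBatchConfiguration` and the bean wiring are assumptions for illustration, not taken from the linked pull request, and the sketch has not been tested against the reporter's setup.

```java
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

// Hypothetical configuration class; the name is an assumption.
@Configuration
public class JpaBatchConfiguration {

    // JPA-aware transaction manager built from the application's EntityManagerFactory.
    @Bean
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }

    // Same idea as the snippet above: expose the JPA transaction manager to Spring Batch.
    @Bean
    public BatchConfigurer batchConfigurer(DataSource dataSource,
                                           JpaTransactionManager jpaTransactionManager) {
        return new DefaultBatchConfigurer(dataSource) {
            @Override
            public PlatformTransactionManager getTransactionManager() {
                return jpaTransactionManager;
            }
        };
    }
}
```

Passing the transaction manager in as a method parameter rather than calling a bean method directly is a design choice of this sketch; either style should lead to the same `JpaTransactionManager` instance being used.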
