Problem description:

The working configuration for the step in question is the following:

  • Step, Spring Batch Job Repository, and business repositories (using various datasources) all use a JTA transaction manager.
  • Step "myStep" uses a Jdbc Paging Item Reader.
  • WebLogic, Oracle XE and/or EE

I wanted to analyze the performance of the Jdbc Cursor Item Reader in "myStep"; however, after the first commit, the second chunk's first read would fail with java.sql.SQLException: Result set already closed.

I suspected the JTA / XA driver might be closing the cursor for some reason, so I gave "myStep" a simple datasource transaction manager (on the datasource the reader was using), and the step was able to complete successfully. This isn't a solution, since it breaks the transactional integrity of the step.
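
For reference, the workaround described above could look roughly like the following. This is a reconstruction, not the original configuration: the bean id myLocalTransactionManager is hypothetical, and the step's tasklet would reference it in place of myTransactionManager.

```xml
<!-- Hypothetical bean id; a local (non-JTA) transaction manager bound to the reader's datasource -->
<bean id="myLocalTransactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="myDataSource"/>
</bean>

<!-- The tasklet then points at the local manager instead of the JTA one:
     <batch:tasklet transaction-manager="myLocalTransactionManager"> ... </batch:tasklet> -->
```

With this in place, only work done through myDataSource is covered by the chunk transaction, which is why it is not acceptable when several datasources must commit together.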

Should I be able to use a cursor reader inside of a JTA managed step (using the environment described below)? If so, what might be configured incorrectly on my end?

Environment

  • Transaction Manager: <bean id="myTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

  • Datasource Driver: OracleXADataSource JDBC 6 11.1.0.7.0
  • WebLogic: 12.1.3.0.0
  • Oracle DB 11g: Enterprise Edition 11.2.0.4.0
  • OS: OSX or Linux

Config

<bean id="myTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

<bean id="myDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jdbc/myDataSource"/>
    <property name="proxyInterface" value="javax.sql.DataSource"/>
</bean>

<batch:step id="myStep" job-repository="myJobRepositoryFactory">
    <batch:tasklet transaction-manager="myTransactionManager">
        <batch:chunk
            reader="myReader"
            processor="myProcessor"
            writer="myWriter"
            commit-interval="100"
            processor-transactional="false"/>
        <batch:listeners>
            <batch:listener ref="myListener"/>
        </batch:listeners>
    </batch:tasklet>
</batch:step>

<bean id="myReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="myDataSource"/>
    <property name="sql" value="SELECT * FROM myHugeTable ORDER BY myColumn DESC"/>
    <property name="rowMapper">
        <bean class="myRowMapper"/>
    </property>
</bean>

Caught in the act

Below is the call stack of the result set being closed before the next chunk's read. Notice the XA connection closing all statements, which causes JDBC to close all result sets.

java.lang.Thread.State: RUNNABLE
    at weblogic.jdbc.wrapper.ResultSet.internalClose(ResultSet.java:178)
    at weblogic.jdbc.wrapper.Statement.closeAllResultSets(Statement.java:286)
    at weblogic.jdbc.wrapper.Statement.internalClose(Statement.java:395)
    at weblogic.jdbc.wrapper.Statement.internalClose(Statement.java:367)
    at weblogic.jdbc.wrapper.XAConnection.closeAllStatements(XAConnection.java:393)
    at weblogic.jdbc.wrapper.XAConnection.cleanup(XAConnection.java:406)
    at weblogic.jdbc.wrapper.XAConnection.releaseToPool(XAConnection.java:432)
    at weblogic.jdbc.jta.DataSource.removeTxAssoc(DataSource.java:1907)
    at weblogic.jdbc.jta.DataSource.prepare(DataSource.java:1090)
    at weblogic.transaction.internal.XAServerResourceInfo.prepare(XAServerResourceInfo.java:1408)
    at weblogic.transaction.internal.XAServerResourceInfo.prepare(XAServerResourceInfo.java:522)
    at weblogic.transaction.internal.ServerSCInfo.startPrepare(ServerSCInfo.java:411)
    at weblogic.transaction.internal.ServerTransactionImpl.localPrepare(ServerTransactionImpl.java:2709)
    at weblogic.transaction.internal.ServerTransactionImpl.globalPrepare(ServerTransactionImpl.java:2340)
    at weblogic.transaction.internal.ServerTransactionImpl.internalCommit(ServerTransactionImpl.java:300)
    at weblogic.transaction.internal.ServerTransactionImpl.commit(ServerTransactionImpl.java:260)
    at org.glassfish.transaction.TransactionManagerImplCommon.commit(TransactionManagerImplCommon.java:571)
    at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1021)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:761)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:198)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:64)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:165)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Answer:

You should be able to use the cursor reader inside a JTA-managed step. We are doing exactly this in the project I'm working on. We use Atomikos as the XA transaction manager.

Here is the XA/JTA configuration we use; perhaps it is of some use to you:

@Bean(initMethod = "init", destroyMethod = "shutdownForce")
public UserTransactionService userTransactionService() {
    return new UserTransactionServiceImp(userTransactionServiceProperties());
}

@Bean(initMethod = "init", destroyMethod = "close")
@DependsOn("userTransactionService")
public UserTransactionManager atomikosTransactionManager() {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(true);
    userTransactionManager.setStartupTransactionService(false);
    return userTransactionManager;
}

@Bean
@DependsOn("userTransactionService")
public UserTransaction atomikosUserTransaction() throws SystemException {
    return new UserTransactionImp();
}

@Bean
@DependsOn("userTransactionService")
public JtaTransactionManager transactionManager() throws SystemException {
    JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
    jtaTransactionManager.setTransactionManager(atomikosTransactionManager());
    jtaTransactionManager.setUserTransaction(atomikosUserTransaction());
    jtaTransactionManager.setAllowCustomIsolationLevels(true);
    return jtaTransactionManager;
} 

All our datasources are instantiated as org.springframework.boot.jta.atomikos.AtomikosDataSourceBean. For example, an Oracle datasource is instantiated like this:

    AtomikosDataSourceBean oraXaDs = new AtomikosDataSourceBean();
    oraXaDs.setXaDataSourceClassName(oraDsProp.getDatasourceClass());
    oraXaDs.setUniqueResourceName(oraDsProp.getInstancename());
    oraXaDs.setMinPoolSize(oraDsProp.getPoolMinSize());
    oraXaDs.setMaxPoolSize(oraDsProp.getPoolMaxSize());
    oraXaDs.setTestQuery(oraDsProp.getValidConnectionSQL());

    Properties oraXaDsProps = oraXaDs.getXaProperties();
    oraXaDsProps.setProperty("user", oraDsProp.getUser());
    oraXaDsProps.setProperty("password", oraDsProp.getPassword());
    oraXaDsProps.setProperty("URL", oraDsProp.getUrl());
Answer:

My two cents on this issue:

First, some background:

Reading from a database cursor means opening a connection, firing one SQL statement against it, and reading rows continuously for the whole batch job. That makes sense, because the input data of a job can often be described by one SQL statement, but executing it and reading all the data from the ResultSet up front is of course no solution. Reading continuously leaves us with one problem: committing the transaction would close the connection. So how do we keep it open? The simple solution is that it doesn't take part in the transaction. Spring Batch's JdbcCursorItemReader uses a separate connection for opening the cursor, thereby bypassing the transaction managed by the transaction manager.

In an application server environment we have to do a little more to make it work. Normally we get connections from a DataSource managed by the application server, and all of those connections take part in transactions by default. We need to set up a separate DataSource that does not take part in transactions and inject it only into our cursor-based readers. Injecting it anywhere else could do a lot of damage to transaction safety.
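
A minimal sketch of that idea in the question's XML style, assuming a second, non-XA datasource has been defined in the application server so that it does not enlist in global transactions. The JNDI name jdbc/myNonTxDataSource and the bean id myCursorDataSource are hypothetical; the SQL and row mapper are taken from the question.

```xml
<!-- Assumption: jdbc/myNonTxDataSource is a hypothetical non-XA datasource,
     configured in the container so that it does not take part in JTA transactions -->
<bean id="myCursorDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jdbc/myNonTxDataSource"/>
    <property name="proxyInterface" value="javax.sql.DataSource"/>
</bean>

<!-- Only the cursor reader is wired to the non-transactional datasource -->
<bean id="myReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="myCursorDataSource"/>
    <property name="sql" value="SELECT * FROM myHugeTable ORDER BY myColumn DESC"/>
    <property name="rowMapper">
        <bean class="myRowMapper"/>
    </property>
</bean>
```

The step itself, the writers, and the job repository would keep using the XA datasources and the JTA transaction manager; only the cursor stays outside the chunk transaction, so a commit no longer has a reason to close it.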

Your problem is basically in your step (as far as I can conclude without seeing your datasource XML file):

Step, Spring Batch Job Repository, and business repositories (using various datasources) all use a JTA transaction manager.

The JTA transaction manager that Spring provides should be used so that WebLogic handles your JTA transactions. All you need to do is configure the datasource (which should also be managed by the container) to use XA-aware drivers and, in your app, use the corresponding transaction manager and look up the XA-aware datasources by JNDI.
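
As a sketch of that container-managed setup, assuming the tx namespace is declared in the application context and a Spring version that still ships WebLogicJtaTransactionManager: the transaction manager is left to Spring's server detection, and the datasource is looked up by JNDI as in the question. Whether the datasource behind jdbc/myDataSource uses an XA driver is decided in the WebLogic configuration, not here.

```xml
<!-- Requires xmlns:tx="http://www.springframework.org/schema/tx" on the <beans> element.
     Registers a bean named "transactionManager", using a server-specific
     JtaTransactionManager subclass where Spring detects one. -->
<tx:jta-transaction-manager/>

<!-- Alternative: name the WebLogic-specific class explicitly
<bean id="transactionManager"
      class="org.springframework.transaction.jta.WebLogicJtaTransactionManager"/>
-->

<!-- XA-aware datasource owned by the container, looked up by JNDI -->
<bean id="myDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="jdbc/myDataSource"/>
    <property name="proxyInterface" value="javax.sql.DataSource"/>
</bean>
```

Note that the bean registered this way is called transactionManager, so a tasklet that currently refers to myTransactionManager would need its transaction-manager attribute adjusted to match.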

However, if you cannot rely on the container (e.g. you are writing a standalone app), you will need to find a transaction manager that is capable of that. Atomikos is one of the best-known free JTA/XA libraries.

Having said that, after you have configured things either the JNDI way or the Atomikos way, here is the configuration to keep in mind when using multiple datasources:

<batch:tasklet>
  <batch:transaction-attributes isolation="READ_COMMITTED" propagation="REQUIRES_NEW" timeout="200"/>
  <batch:chunk reader="myItemReader" writer="myItemWriter" commit-interval="20"/>
</batch:tasklet>

Hope this clears the air on this issue.
