I've got an application with 3 distributed dataSources (com.atomikos.jdbc.AtomikosDataSourceBean). I'm using Atomikos transaction manager as JTA implementation. Every dataSource works with PostgreSQL database. Now, I'm invoking my queries consequentially to each dataSource, and everything is working ok.
I'm wondering, if it is possible, using JTA, to invoke my queries in parallel (multithreading, concurrently)?
I've tried simply to invoke query in newly created thread, using jdbcTemplate (Spring). Firstly, I've faced a spring issue. Spring stores the transactional context in ThreadLocal field, so it wasn't resolved properly in my new thread (Spring transaction manager and multithreading). I've solved this issue, by setting same transactional context into newly created thread's ThreadLocal. But the same issue I'm facing in the Atomikos code. They also store the CompositeTransactionImp in the thread scoped map (BaseTrancationManager#getCurrentTx). But in Atomikos case it isn't possible to set there values for new thread. So I can't perform my queries concurrently because it seems that Atomikos doesn't support such approach. But I've also looked through JTA specification and found their the following: "Multiple threads may concurrently be associated with the same global transaction." ("3.2 TransactionManager Interface", http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)
QUESTION: How can I invoke two or more queries to different dataSources concurrently, using JTA (2 phase commit), in scope of one global transaction?
DataSources config in tomcat context:
<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db1"
uniqueResourceName="jdbc/db1"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db2"
uniqueResourceName="jdbc/db2"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db3"
uniqueResourceName="jdbc/db3"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
Transaction manager config in spring context:
<bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close" lazy-init="true">
<property name="forceShutdown" value="false" />
</bean>
Code:
final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);
// Solving Spring's ThreadLocal issue: saving thread local params
final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();
// Running query in a separate thread.
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) {
TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
}
if (synchronizations != null && !synchronizations.isEmpty()) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : synchronizations) {
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
// Executing query.
final String query = "insert into ...";
NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);
template.update(query, parameters);
} catch (final Throwable ex) {
exceptionHolder.set(ex);
}
}
});
thread.start();
// ... same code as above for other dataSources.
// allThreds.join(); - joining to all threads.