8
votes

I've got an application with 3 distributed dataSources (com.atomikos.jdbc.AtomikosDataSourceBean). I'm using Atomikos transaction manager as JTA implementation. Every dataSource works with PostgreSQL database. Now, I'm invoking my queries consequentially to each dataSource, and everything is working ok.

I'm wondering, if it is possible, using JTA, to invoke my queries in parallel (multithreading, concurrently)?

I've tried simply to invoke query in newly created thread, using jdbcTemplate (Spring). Firstly, I've faced a spring issue. Spring stores the transactional context in ThreadLocal field, so it wasn't resolved properly in my new thread (Spring transaction manager and multithreading). I've solved this issue, by setting same transactional context into newly created thread's ThreadLocal. But the same issue I'm facing in the Atomikos code. They also store the CompositeTransactionImp in the thread scoped map (BaseTrancationManager#getCurrentTx). But in Atomikos case it isn't possible to set there values for new thread. So I can't perform my queries concurrently because it seems that Atomikos doesn't support such approach. But I've also looked through JTA specification and found their the following: "Multiple threads may concurrently be associated with the same global transaction." ("3.2 TransactionManager Interface", http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)

QUESTION: How can I invoke two or more queries to different dataSources concurrently, using JTA (2 phase commit), in scope of one global transaction?

DataSources config in tomcat context:

<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db1"
          uniqueResourceName="jdbc/db1"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db2"
          uniqueResourceName="jdbc/db2"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
          factory="com.company.package.AtomikosDataSourceBeanFactory"
          xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
          xaProperties.serverName="localhost"
          xaProperties.portNumber="5451"
          xaProperties.databaseName="db3"
          uniqueResourceName="jdbc/db3"
          xaProperties.user="secretpassword"
          xaProperties.password="secretpassword"
          minPoolSize="5"
          maxPoolSize="10"
          testQuery="SELECT 1"  />

Transaction manager config in spring context:

 <bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
  init-method="init" destroy-method="close" lazy-init="true">
  <property name="forceShutdown" value="false" />
 </bean>

Code:

    final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);

    // Solving Spring's ThreadLocal issue: saving thread local params
    final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
    final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
    final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
    final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();

    // Running query in a separate thread.
    final Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
                for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) {
                    TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
                }
                if (synchronizations != null && !synchronizations.isEmpty()) {
                    TransactionSynchronizationManager.initSynchronization();
                    for (TransactionSynchronization synchronization : synchronizations) {
                        TransactionSynchronizationManager.registerSynchronization(synchronization);
                    }
                }
                TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
                TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);

                // Executing query.
                final String query = "insert into ...";
                NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);

                template.update(query, parameters);
            } catch (final Throwable ex) {
                exceptionHolder.set(ex);
            }
        }
    });
    thread.start();

    // ... same code as above for other dataSources.

    // allThreds.join(); - joining to all threads.
1

1 Answers

1
votes

I think your idea to workaround the fact that TransactionSynchronizationManager must play with single threaded transaction is interesting but probably dangerous.

In the TransactionSynchronizationManager the transactionnal resources are stored in a ThreadLocal Map where the key is the resource factory and I'm wondering what will append when you will execute this workaround with multiple threads using the same resource factory - it probably don't apply in your case since you have 3 datasource -. (At first sight I would say that one of your transactionnal resource will be replaced by another one, but maybe I'm missing something...).

Anyway, I think you can try to use the javax.transaction.TransactionManager.resume() to achieve what you are trying to do.

The idea is to use the JTA api directly and so by-passing single-thread Spring transaction support.

Here is some code to illustrate what I have in mind:

@Autowired
JtaTransactionManager txManager;  //from Spring

javax.transaction.TransactionManager jtaTransactionManager;

public void parallelInserts() {
    jtaTransactionManager = txManager.getTransactionManager();  //we are getting the underlying implementation
    jtaTransactionManager.begin();
    final Transaction jtaTransaction  = jtaTransactionManager.getTransaction();
    try {
      Thread t1 = new Thread(){
        @Override
        public void run() {
            try {
                jtaTransactionManager.resume(jtaTransaction);
                //... do the insert
            } catch (InvalidTransactionException e) {
                try {
                    jtaTransaction.setRollbackOnly();
                } catch (SystemException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            } catch (SystemException e) {
                e.printStackTrace();
            }
        }
      };
      t1.start();
      //same with t2 and t3
    } catch (Exception ex) {
        jtaTransactionManager.setRollbackOnly();
        throw ex;
    }
    //join threads and commit
    jtaTransactionManager.commit();
}

I think this solution may work (I must say that I didn't try myself). The only limitation that I see now is that you cannot re-use the threads because there is no counter-part to the resume() call and the second time you will call resume() you will probably have an IllegalStateException.