
I have a service-activator which uses a poller to pull messages from a channel. The channel has a queue which is backed by a persistent store to a database. The poller is also configured with a task-executor in order to add some concurrency to the message processing from the channel.

The task-executor is configured with a queue-capacity.

The poller retrieves messages from the database-backed channel, and this is configured to be transactional. What happens to the transactions for messages that get queued in the task-executor when it has no more threads available? Requests for a thread are queued, and since these messages have no thread of their own, what happens to their transaction? I assume the poller's removal of those messages from the persistent channel store will already have been committed. So if the server fails while runnables are queued in the task-executor, will they be lost?

Since the point of the transactional persistent channel queue is to ensure that no messages are lost if the server goes down, how are messages queued in the task-executor handled with respect to their transactions on the channel's database-backed queue/store?

<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="channelServerDataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="${user.name}_${channel.queue.region:default}"/>
    <property name="usingIdCache" value="false"/>
</bean> 

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory>

<int:channel id="transacitonAsyncServiceQueue">
    <int:queue message-store="store"/> 
    <!--  <int:queue/>  --> 
</int:channel>

<bean id="rxPollingTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="500"/>
    <constructor-arg value="MILLISECONDS"/>
    <property name="initialDelay" value="30000"/>
    <!-- initialDelay is important to ensure the channel doesn't start processing before the datasources have been initialised. Because we
         now persist transactions in the queue, at startup (restart) there may be some ready to go, which would get processed before the
         connection pools have been created, which happens when the servlet is first hit -->
</bean> 

<int:service-activator ref="asyncChannelReceiver" method="processMessage" input-channel="transacitonAsyncServiceQueue">
    <int:poller trigger="rxPollingTrigger" max-messages-per-poll="20"  task-executor="taskExecutor" receive-timeout="400">
        <int:transactional transaction-manager="transactionManagerAsyncChannel" /> 
    </int:poller>
    <int:request-handler-advice-chain>
        <ref bean="databaseSessionContext" />
    </int:request-handler-advice-chain>
</int:service-activator>

<task:executor id="taskExecutor" pool-size="100-200" queue-capacity="200" keep-alive="1" rejection-policy="CALLER_RUNS" />  

1 Answer


...then what happens to the transactions for messages that get queued in the task-executor if the task-executor has no more threads available. The requests on the task-executor for a thread are queued and since these messages have no thread of their own, what happens to the transaction?

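It doesn't work that way. Messages are never queued in the task executor; what gets queued is the poll task itself. The transaction doesn't start until that task is executed, and the receive() from the channel happens inside it.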

The poller schedules the task, the task starts, the transaction starts, the work proceeds, the transaction is committed/rolled back.
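The sequence above can be modelled with plain JDK classes. This is only a minimal sketch, not Spring Integration code: the in-memory queue stands in for the JDBC-backed store, `pollOnce` is a hypothetical name for the poll task, and a single-thread executor is used so that the "no free thread" case is easy to reproduce. It shows that while poll tasks wait in the executor's queue, nothing has been removed from the store.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class QueuedPollSketch {

    /** Returns {messages in the store while poll tasks are queued, messages left at the end}. */
    static int[] simulate() {
        // Hypothetical in-memory stand-in for the database-backed channel store.
        Queue<String> store = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 3; i++) {
            store.add("msg-" + i);
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // Keep the only worker thread busy so that later tasks must wait in the queue.
        executor.execute(() -> awaitQuietly(release));

        // The "poller" fires three times: only poll tasks are queued, no message is read yet.
        for (int i = 0; i < 3; i++) {
            executor.execute(() -> pollOnce(store));
        }
        int whileQueued = store.size();

        // Free the worker; the queued poll tasks now run one after another.
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { whileQueued, store.size() };
    }

    /** Models one poll: begin transaction, receive, handle, commit, all on the worker thread. */
    static void pollOnce(Queue<String> store) {
        // A real poller would begin the transaction here.
        String message = store.poll(); // the receive() happens inside the task
        if (message != null) {
            // Handle the message; a real poller would commit or roll back afterwards.
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] sizes = simulate();
        System.out.println("in store while tasks queued: " + sizes[0]);
        System.out.println("in store after tasks ran: " + sizes[1]);
    }
}
```

In this model, a crash while the tasks are still queued loses only the queued poll tasks; the messages themselves are still in the store.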

See AbstractPollingEndpoint - the pollingTask.call() is where the transaction starts.
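To illustrate why it matters that the transaction wraps the call, here is a simplified model of a polling task wrapped in begin/commit/rollback logic. It is a sketch under stated assumptions, not the framework's implementation: `Store`, `transactional`, and the two demo methods are hypothetical names, and the "transaction" is just an in-memory undo of the last removal.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Callable;

public class TransactionalPollSketch {

    /** Hypothetical in-memory stand-in for a transactional message store. */
    static final class Store {
        private final Deque<String> rows = new ArrayDeque<>();
        private String removedInTx; // row removed by the open transaction, if any

        void add(String message) { rows.addLast(message); }
        int size() { return rows.size(); }

        String receive() { // the removal is provisional until commit
            removedInTx = rows.pollFirst();
            return removedInTx;
        }

        void commit() { removedInTx = null; }

        void rollback() { // undo the provisional removal
            if (removedInTx != null) {
                rows.addFirst(removedInTx);
            }
            removedInTx = null;
        }
    }

    /** Wraps a polling task so that the receive and the handler share one transaction. */
    static Callable<Boolean> transactional(Store store, Callable<Boolean> pollingTask) {
        return () -> {
            try {
                Boolean result = pollingTask.call(); // receive() and handler run in here
                store.commit();
                return result;
            } catch (Exception e) {
                store.rollback();
                throw e;
            }
        };
    }

    static int sizeAfterSuccessfulPoll() throws Exception {
        Store store = new Store();
        store.add("msg-0");
        transactional(store, () -> store.receive() != null).call();
        return store.size();
    }

    static int sizeAfterFailedPoll() {
        Store store = new Store();
        store.add("msg-0");
        Callable<Boolean> task = transactional(store, () -> {
            String message = store.receive();
            throw new IllegalStateException("handler failed for " + message);
        });
        try {
            task.call();
        } catch (Exception expected) {
            // The failure triggers the rollback inside the wrapper.
        }
        return store.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("after successful poll: " + sizeAfterSuccessfulPoll());
        System.out.println("after failed poll: " + sizeAfterFailedPoll());
    }
}
```

Because the receive happens inside the wrapped call, a failure in the handler puts the message back rather than losing it.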