OK, I have added my Spring XML configuration to explain my situation in more detail.

First, I aggregate 7 channels.
Second, the aggregator releases the aggregated message to a channel declared as a publish-subscribe channel.
Third, each of the 7 subscribers selects from the internal database to build its own message.
Fourth, each subscriber inserts its data into the external database using a jdbc outbound-channel-adapter.

Below I have attached all of the related XML configuration. Thanks.

<int:bridge input-channel="request-write-to-openPMSQueueChannel"
    output-channel="writetoOpenPMSChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="-1">
        <int:transactional propagation="REQUIRED"
            transaction-manager="transactionExternalDatabaseManager" />
    </int:poller>
</int:bridge>

<int:chain input-channel="writetoOpenPMSChannel"
    output-channel="writetoOpenPMS007Channel">
    <int:service-activator method="exectue007">
        <bean id=""
            class="com.sds.redca.core.module.analyzer.convert.ModelingConvertSVC">
        </bean>
    </int:service-activator>
    <int:splitter ref="fromListToRowSplitter" />
</int:chain>

<int:chain input-channel="writetoOpenPMSChannel"
    output-channel="writetoOpenPMS006Channel">
    <int:service-activator method="exectue006">
        <bean id=""
            class="com.sds.redca.core.module.analyzer.convert.ModelingConvertSVC">
        </bean>
    </int:service-activator>
    <int:splitter ref="fromListToRowSplitter" />
</int:chain>

<int-jdbc:outbound-channel-adapter channel="writetoOpenPMS001Channel" 
query="INSERT INTO IF_RSRC
       (PJT_ID, DT_INPUT, RSRC_ID, RSRC_TYPE_ID, RSRC_DETAILTYPE_ID, RSRC_NM, CNFG_PJT_NM, RSRC_PATH_NM, LST_RGSTR_ID, LST_RGST_DTM, FINAL_RVSN_NO, YN_USE)
       VALUES
       (:payload[PJT_ID], :headers[timeStamp], :payload[RSRC_ID], :payload[RSRC_TYPE_ID], :payload[RSRC_DETAILTYPE_ID], :payload[RSRC_NM], :payload[CNFG_PJT_NM],
       :payload[RSRC_PATH_NM], :payload[LST_RGSTR_ID], :payload[LST_RGST_DTM], :payload[FINAL_RVSN_NO], :payload[YN_USE])"
data-source="outboundDataSource" />
<int-jdbc:outbound-channel-adapter channel="writetoOpenPMS002Channel" 
query="INSERT INTO IF_RSRC_RELATION
       (PJT_ID, DT_INPUT, MAIN_RSRC_ID, SUB_RSRC_ID, RELATION_TYPE_CODE_ID)
       VALUES
       (:payload[PJT_ID], :headers[timeStamp], :payload[MAIN_RSRC_ID], :payload[SUB_RSRC_ID], :payload[RELATION_TYPE_CODE_ID])"
data-source="outboundDataSource" />

The problem is that I have to trigger an event right after all 7 insert jobs have finished successfully. How can I detect the point at which the 7 insert transactions have finished, so that I can trigger a post-event?

1 Answer

Sorry, not enough info. We need to see the upstream config.

If all your INSERTs are in the same TX, you can just add one more subscriber at the end of the business logic.

If everything is done within the same Thread with a TX, the TX won't finish until that final notification has been done.

On the other hand, if they all live in different transactions, you need something like a receipt: each item adds a row to one more table to confirm that it has done its original work. Yes, that can be in the same TX as the work itself.

To determine that all the desired items have finished their work, you need to poll that new table at a short interval, counting the rows for a specific business key.

Alternatively, all your tasks can send the same message to a common <aggregator>, and that one will release a single group for them only when all of them have committed their transactions.

There might be another solution, but we need to know more about your use-case.
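A minimal sketch of that aggregator variant, under stated assumptions: the channel names `insertDoneChannel` and `allInsertsDoneChannel` and the `batchId` header are hypothetical, and how each of the seven flows sends its "done" message after its own commit is left open here.

```xml
<!-- Hypothetical: each of the 7 flows sends one "done" message to insertDoneChannel
     after its own transaction has committed. -->
<int:channel id="insertDoneChannel" />

<!-- Groups the "done" messages by a hypothetical batchId header and releases
     the group only once all 7 have arrived. -->
<int:aggregator input-channel="insertDoneChannel"
    output-channel="allInsertsDoneChannel"
    correlation-strategy-expression="headers['batchId']"
    release-strategy-expression="size() == 7" />
```

Whatever subscribes to `allInsertsDoneChannel` would then act as the post-event.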

UPDATE

Thanks for the config.

I hope your writetoOpenPMSChannel is a single-threaded publish-subscribe channel.

If so, all your writetoOpenPMS* processes are bound to that poller's TX and will be executed one by one within that single TX.

Hence nothing stops you from adding one more subscriber to that channel as the last one. It receives the same message at the end of the TX, just before the commit. If you don't have any deferred constraints on those tables, everything will be OK and within the single TX.

On the other hand, the <poller>'s <transactional> has a synchronization-factory option for dealing with commit/rollback using a TransactionSynchronization strategy: http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/transactions.html#transaction-synchronization
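A rough sketch of such a last subscriber, assuming the channel is declared without a task-executor. The `insertsCompletedNotifier` bean, its class and the `notifyCompleted` method are hypothetical names used only for illustration.

```xml
<!-- Assumption: no task-executor, so all subscribers run sequentially
     on the poller's thread and inside the poller's TX. -->
<int:publish-subscribe-channel id="writetoOpenPMSChannel" />

<!-- ... the existing writetoOpenPMS* chains subscribed to writetoOpenPMSChannel ... -->

<!-- Hypothetical extra subscriber, declared after all the others. -->
<int:service-activator input-channel="writetoOpenPMSChannel"
    ref="insertsCompletedNotifier" method="notifyCompleted" />

<bean id="insertsCompletedNotifier"
    class="com.example.InsertsCompletedNotifier" />
```

If declaration order alone turns out not to be enough, the `order` attribute on the subscribing endpoints is probably the way to make the sequence explicit.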
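As an illustration, the poller from the question could be wired to a synchronization factory roughly like this. The `syncFactory` id and the `insertsCommittedChannel` and `insertsRolledBackChannel` channel names are assumptions, not part of the original config.

```xml
<int:transaction-synchronization-factory id="syncFactory">
    <!-- Sends the payload to a (hypothetical) channel once the TX has committed. -->
    <int:after-commit expression="payload" channel="insertsCommittedChannel" />
    <!-- Sends the payload to a (hypothetical) channel if the TX was rolled back. -->
    <int:after-rollback expression="payload" channel="insertsRolledBackChannel" />
</int:transaction-synchronization-factory>

<int:bridge input-channel="request-write-to-openPMSQueueChannel"
    output-channel="writetoOpenPMSChannel">
    <int:poller fixed-rate="5000" max-messages-per-poll="-1">
        <int:transactional propagation="REQUIRED"
            transaction-manager="transactionExternalDatabaseManager"
            synchronization-factory="syncFactory" />
    </int:poller>
</int:bridge>
```

A subscriber on `insertsCommittedChannel` would then run only after the commit, unlike the extra publish-subscribe subscriber, which runs just before it.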