0
votes

i use spring integeration redis,poll message from redis, like this:

@Bean
public PseudoTransactionManager transactionManager() {
    final PseudoTransactionManager pseudoTransactionManager = new PseudoTransactionManager();
    return pseudoTransactionManager;
}

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    transactionSynchronizationProcessor.setAfterCommitExpression(this.PARSER.parseExpression("#store.rename('commit')"));
    transactionSynchronizationProcessor.setAfterRollbackExpression(this.PARSER.parseExpression("#store.rename('roll')"));
    DefaultTransactionSynchronizationFactory transactionSynchronizationFactory = new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor);
    return transactionSynchronizationFactory;
}


@Bean
public SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapter(RedisStoreMessageSource redisStoreMessageSource, TransactionSynchronizationFactory transactionSynchronizationFactory) {

    SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapterFactoryBean = new SourcePollingChannelAdapterFactoryBean();
    sourcePollingChannelAdapterFactoryBean.setAutoStartup(true);
    sourcePollingChannelAdapterFactoryBean.setOutputChannelName("mail-delivery-status-route-channel");
    sourcePollingChannelAdapterFactoryBean.setSource(redisStoreMessageSource);
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(10);
    pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
    PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
    pollerMetadata.setTrigger(periodicTrigger);
    sourcePollingChannelAdapterFactoryBean.setPollerMetadata(pollerMetadata);


    return sourcePollingChannelAdapterFactoryBean;
}


@Bean
public TestHandler testHandler() {

    return new TestHandler();
}

@Bean
public IntegrationFlow trans() {
    return flow -> flow.channel("mail-delivery-status-route-channel").handle(testHandler());
}

Normally, after the process is complete, the afterCommit #store.rename('commit') operation will be performed, but it is not doing it now, and will continue polling , i debug ,find that: AbstractPollingEndpoint#bindResourceHolderIfNecessary TransactionSynchronizationManager.isActualTransactionActive() is always false. How can I improve the program.

1

1 Answers

1
votes

The pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory); is not enough. You are missing to add an adviceChain into the PollerMetadata, where one of them should be TransactionInterceptor. See TransactionInterceptorBuilder for convenience.

Although it's fully unclear why would one use SourcePollingChannelAdapterFactoryBean manually if there is a Java DSL already in the project and the IntegrationFlow can handle all the boilerplate code for you. I mean you need to look into the:

/**
 * Populate the provided {@link MessageSource} object to the {@link IntegrationFlowBuilder} chain.
 * The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageSource}.
 * In addition use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
 * {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter} endpoint.
 * @param messageSource the {@link MessageSource} to populate.
 * @param endpointConfigurer the {@link Consumer} to provide more options for the
 * {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
 * @return new {@link IntegrationFlowBuilder}.
 * @see MessageSource
 * @see SourcePollingChannelAdapterSpec
 */
public static IntegrationFlowBuilder from(MessageSource<?> messageSource,
        Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {

And configure .transactional() and transactionSynchronizationFactory() there on the PollerSpec.