
How can I set up a listener container with channelTransacted(true) so that a failed message is routed to the dead letter queue instead of being requeued? When I do not use channelTransacted, everything works fine and I can see the message in the dead letter queue.

@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
    return IntegrationFlows.from(
            Amqp.inboundAdapter(connectionFactory, esignIAutoRequestQueue())
                    .acknowledgeMode(AcknowledgeMode.AUTO)
                    .messageConverter(new Jackson2JsonMessageConverter())
                    .autoStartup(false)
                    .defaultRequeueRejected(false)
                    //.channelTransacted(true) // dead letter does not work
                    //.transactionManager(transactionManager) // dead letter does not work
    )
            .log("amqpInbound.start-process")
            .handle(p -> {
                throw new RuntimeException("Something wrong!");
            })
            .get();
}

EDIT

These are the dependency versions.

[INFO] +- org.springframework.boot:spring-boot-starter-amqp:jar:1.5.9.RELEASE:compile
[INFO] |  +- org.springframework:spring-messaging:jar:4.3.13.RELEASE:compile
[INFO] |  \- org.springframework.amqp:spring-rabbit:jar:1.7.4.RELEASE:compile
[INFO] |     +- com.rabbitmq:http-client:jar:1.1.1.RELEASE:compile
[INFO] |     +- com.rabbitmq:amqp-client:jar:4.0.3:compile
[INFO] |     +- org.springframework.retry:spring-retry:jar:1.2.1.RELEASE:compile
[INFO] |     \- org.springframework.amqp:spring-amqp:jar:1.7.4.RELEASE:compile

[INFO] +- org.springframework.boot:spring-boot-starter-integration:jar:1.5.9.RELEASE:compile
[INFO] |  +- org.springframework.integration:spring-integration-core:jar:4.3.12.RELEASE:compile
[INFO] |  \- org.springframework.integration:spring-integration-java-dsl:jar:1.2.3.RELEASE:compile
[INFO] |     \- org.reactivestreams:reactive-streams:jar:1.0.0:compile

I would like the transaction to be synchronized with the external database transaction (PlatformTransactionManager). When I set transactionManager(transactionManager) on the listener container, it always requeues the message.


1 Answer


What version are you using? I just tested channelTransacted=true with Spring Integration 5.0.2 and Spring AMQP 2.0.2, as well as with Spring Integration 4.3.14 and Spring AMQP 1.7.6. Everything works as expected: the failed message went to the DLQ.

@SpringBootApplication
public class So48914749Application {

    public static void main(String[] args) {
        SpringApplication.run(So48914749Application.class, args);
    }

    @Bean
    public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(connectionFactory, "foo")
                        .acknowledgeMode(AcknowledgeMode.AUTO)
                        .messageConverter(new Jackson2JsonMessageConverter())
                        .autoStartup(true)
                        .defaultRequeueRejected(false)
                        .channelTransacted(true) // in this test the failed message still reaches the DLQ
        // .transactionManager(transactionManager) // no transaction manager is set in this test
        )
                .log("amqpInbound.start-process")
                .handle(p -> {
                    throw new RuntimeException("Something wrong!");
                })
                .get();
    }

    @Bean
    public Queue queue() {
        return QueueBuilder.durable("foo")
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", "dlq")
                .build();
    }

    @Bean
    public Queue dlq() {
        return new Queue("dlq");
    }

    @RabbitListener(queues = "dlq")
    public void dlq(Message in) {
        System.out.println(in);
    }

}

result:

(Body:'[B@17793549(byte[8])' MessageProperties [headers={x-first-death-exchange=, x-death=[{reason=rejected, count=1, exchange=, time=Wed Feb 21 16:43:48 EST 2018, routing-keys=[foo], queue=foo}], x-first-death-reason=rejected, x-first-death-queue=foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=dlq, receivedDelay=null, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-dOypXDkkQ5Hvw2W9cUIpzg, consumerQueue=dlq])

EDIT

Please be more precise in future; your question implied the same problem occurred regardless of whether a transaction manager was provided.

See the second NOTE under A note on Rollback of Received Messages. This is fixed in 2.0.

In 1.7.x, we had to retain the old behavior by default, for backwards compatibility. The 1.7.x documentation explains that you have to set the alwaysRequeueWithTxManagerRollback property to false to get the new (consistent) behavior.

This is a listener container property so you will need to wire up a container and inject it into the inbound channel adapter.
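A minimal sketch of that wiring, assuming Spring AMQP 1.7.1 or later in the 1.7.x line and the Java DSL 1.2.x from the question. The class name, the bean name esignRequestContainer and the queue name "foo" are placeholders, not names from your project, and I have not run this exact configuration:

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
// Package used by the standalone Java DSL 1.2.x; it moved in Spring Integration 5.
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class TransactedInboundConfig { // hypothetical class name

    // Container configured explicitly so the 1.7.x-only property can be set.
    @Bean
    public SimpleMessageListenerContainer esignRequestContainer(
            ConnectionFactory connectionFactory,
            PlatformTransactionManager transactionManager) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("foo"); // placeholder queue name
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setChannelTransacted(true);
        container.setTransactionManager(transactionManager);
        // Opt in to the consistent behavior: on rollback, honor
        // defaultRequeueRejected instead of always requeueing.
        container.setAlwaysRequeueWithTxManagerRollback(false);
        return container;
    }

    @Bean
    public IntegrationFlow amqpInbound(SimpleMessageListenerContainer esignRequestContainer) {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(esignRequestContainer)
                        .messageConverter(new Jackson2JsonMessageConverter())
                        .autoStartup(false))
                .log("amqpInbound.start-process")
                .handle(p -> {
                    throw new RuntimeException("Something wrong!");
                })
                .get();
    }
}
```

The container-level options (acknowledge mode, requeue policy, transaction settings) are set on the container itself here, and only the adapter-level options stay on the spec. With the property set to false, a rollback should reject the message without requeueing, so the broker can dead-letter it as long as the queue is declared with a dead letter exchange.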