I have the following RabbitMQ configuration:
@Configuration
@IntegrationComponentScan
public class RabbitConfig {

    @Autowired // TODO constructor!
    private ConnectionFactory connectionFactory;

    // Values injected through the constructor below
    private final String queueName;
    private final String exchangeName;
    private final String routingKey;

    public RabbitConfig(
            @Value("${article.inbound.queue}") String queueName,
            @Value("${article.inbound.exchange}") String exchangeName,
            @Value("${article.inbound.routingkey}") String routingKey) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }

    @Bean
    Exchange exchange() {
        return ExchangeBuilder
                .topicExchange(this.exchangeName)
                .durable(true)
                .build();
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable(queueName).build();
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(exchange())
                .with(routingKey)
                .noargs();
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public SimpleMessageListenerContainer articleListenerContainer(
            ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue());
        container.setMessageConverter(jsonMessageConverter());
        return container;
    }
}
And a simple Spring Integration flow:
@Bean
IntegrationFlow fromMessageBroker(SimpleMessageListenerContainer messageListener) {
    return IntegrationFlows.from(Amqp.inboundAdapter(messageListener))
            .log()
            .handle(message -> {
                final MessageHeaders headers = message.getHeaders();
                final Object assetId = headers.get("assetId");
                log.info(assetId);
            })
            .get();
}
When I start the Spring Boot application, everything is fine until I publish a message to the exchange that my queue is bound to. The message passes through the whole flow, and after that the SimpleMessageListenerContainer crashes:
2018-09-20 13:11:08.240 INFO 49400 --- [ main] c.p.ftppush.article.MessageConsumer : Started MessageConsumer in 1.743 seconds (JVM running for 2.386)
2018-09-20 13:11:12.309 INFO 49400 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=byte[0], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=article.original.orgn.123, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_receivedExchange=que.article.content.pf.normal.trigger, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_deliveryTag=1, assetId=1qh22m9027k6d1jz29tsi510x5, amqp_consumerQueue=exc.article.content, amqp_redelivered=false, id=0d04f22a-991f-000b-63f5-5cb087b915ab, amqp_consumerTag=amq.ctag-bK2-KaIxWpk57HJeQ_38AQ, timestamp=1537441872308}]
2018-09-20 13:11:12.310 INFO 49400 --- [erContainer#0-1] com.perform.ftppush.article.Flow : 1qh22m9027k6d1jz29tsi510x5
2018-09-20 13:11:12.317 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.
java.lang.AbstractMethodError: org.springframework.integration.channel.interceptor.WireTap.postSend(Lorg/springframework/messaging/Message;Lorg/springframework/messaging/MessageChannel;Z)V
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.postSend(AbstractMessageChannel.java:607) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:460) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:227) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:497) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:465) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$1000(AmqpInboundGateway.java:66) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.process(AmqpInboundGateway.java:315) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.onMessage(AmqpInboundGateway.java:263) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
at java.base/java.lang.Thread.run(Thread.java:844) [na:na]
2018-09-20 13:11:12.322 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2018-09-20 13:11:12.322 INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2018-09-20 13:11:12.322 INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
As you can see in the logs, the flow runs to the end and then the container crashes. Unfortunately, the exception trace contains no useful information, and I'm wondering why this happens. I tried redirecting the messages in this flow into a MessageChannel and handling them there, but it didn't help.