
We are using Spring Cloud Data Flow stream applications with RabbitMQ as the message broker.

Across the overall stream flow, from source to sink modules, we lose data whenever a "Channel shutdown: connection error" occurs in any module of a given stream.

Stream example: source | transformer1 | transformer2 | transformer3 | sink

That is, if any RabbitMQ channel connection is lost, the applications fail to transmit data to the next module/application, which leads to data loss.

Exceptions :

2019-02-18 15:29:41.364 ERROR 94489 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; 
2019-02-18 15:29:42.008  INFO 94489 --- [strationQueue-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@6adc5b9c: tags=[{amq.ctag-5dNneAd3QgwWADta7JAmQQ=employeeRegistrations.employeeRegistrationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@a6e4897 Shared Rabbit Connection: SimpleConnection@22dc59b2 [delegate=amqp://[email protected]:5672/, localPort= 50775], acknowledgeMode=NONE local queue size=0
2019-02-18 15:29:42.010  INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2019-02-18 15:29:42.019  INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#d611f1c:1/SimpleConnection@1782b48a [delegate=amqp://[email protected]:5672/, localPort= 50864]

To replicate the issue, I ran two Spring Cloud Stream programs:

  1. Producer - pushes 100,000 messages to a RabbitMQ exchange
  2. Consumer - a sink module that receives the payload from the queue [linked to the exchange] and prints it

To trigger "Channel shutdown: connection error" in the consumer program's logs, I went to the RabbitMQ UI page and repeatedly closed the connections listed there.


In the end, the consumer received only 98,484 of the 100,000 messages, so we lost data in transit because of the channel connection shutdowns.


My question:

Can we catch or detect the "Channel shutdown: connection error" in Spring Cloud Stream applications?

Is there a RabbitMQ listener class that can be included in a stream application to handle the "Channel shutdown: connection error"?

I came across RabbitMQ listeners, such as using the @RabbitListener annotation inside a stream application.

Example:
@RabbitListener(queues = TEST_QUEUE)
public void handle(Foo in) {
    logger.info("Received: " + in);
}

But this RabbitMQ listener listens only to the queues or bindings specified in its definition: https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html

I would like to know whether there is a general RabbitMQ listener class that listens to channel connections instead of a specific queue.

So my question is: are there any listeners that can detect whether any channel [linked to the present application] has shut down? If so, I could handle the data loss by resending the payload to the next application once the channel connection is re-established.

Does the SimpleRabbitListenerContainerFactory class help in this situation? If so, please let me know how to solve this data loss caused by channel shutdowns and connection loss.

Example:

Your question is not at all clear. "@RabbitListener expects queue and binder" - I don't know what that means; edit the question to include more information about your issue, including configuration and code. - Gary Russell
I corrected my question. Please let me know if you need further information on the question I am asking. - keerthi

1 Answer


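You can enable retry in the producer with Spring Boot properties. They are documented in the Spring Boot common application properties reference; scroll down to RabbitMQ.

The values listed below are the defaults, so set spring.rabbitmq.template.retry.enabled=true to actually turn retries on.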

...
spring.rabbitmq.template.retry.enabled=false # Whether publishing retries are enabled.
spring.rabbitmq.template.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message.
spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
spring.rabbitmq.template.retry.max-interval=10000ms # Maximum duration between attempts.
spring.rabbitmq.template.retry.multiplier=1 # Multiplier to apply to the previous retry interval.
...
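
To see how these retry properties interact, here is a small sketch in plain Java that computes the waits between publish attempts. It assumes the usual exponential backoff semantics (each wait is the previous one times the multiplier, capped at max-interval, and max-attempts counts the first attempt too). The class and method names (RetrySchedule, waitsMillis) are made up for illustration and are not part of Spring.

```java
import java.util.Arrays;

// Hypothetical helper, not a Spring class: models the waits implied by the
// spring.rabbitmq.template.retry.* properties under exponential backoff.
final class RetrySchedule {

    // Returns the wait (in ms) before each retry. maxAttempts includes the
    // first attempt, so there are at most maxAttempts - 1 waits.
    static long[] waitsMillis(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        int waits = Math.max(0, maxAttempts - 1);
        long[] result = new long[waits];
        double interval = initialMs;
        for (int i = 0; i < waits; i++) {
            // Cap each wait at max-interval, then grow it for the next round.
            result[i] = (long) Math.min(interval, (double) maxMs);
            interval *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the listing above: 1000ms, multiplier 1, max 10000ms, 3 attempts.
        System.out.println(Arrays.toString(waitsMillis(1000, 1.0, 10_000, 3)));
        // prints [1000, 1000]

        // A multiplier of 2 with 6 attempts hits the max-interval cap.
        System.out.println(Arrays.toString(waitsMillis(1000, 2.0, 10_000, 6)));
        // prints [1000, 2000, 4000, 8000, 10000]
    }
}
```

So with the listed values a publish is given up after roughly two seconds of waiting; if the broker connection can stay down longer than that, max-attempts or the intervals probably need to be raised.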

But, to answer your question, you can add a ConnectionListener to the connection factory bean definition.
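
A minimal sketch of what that could look like, assuming Spring AMQP 2.x (where ConnectionListener has onCreate, onClose and onShutDown) and that the ConnectionFactory injected here is the same one the binder uses, which may not hold in a multi-binder setup. The class name ConnectionLoggingConfig and the bean name are made up for illustration. The listener only logs the events; it does not redeliver anything by itself.

```java
import com.rabbitmq.client.ShutdownSignalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionListener;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; requires spring-rabbit on the classpath.
@Configuration
public class ConnectionLoggingConfig {

    private static final Logger logger = LoggerFactory.getLogger(ConnectionLoggingConfig.class);

    // Registers the listener once all singletons (including the connection
    // factory) have been created.
    @Bean
    public SmartInitializingSingleton connectionListenerRegistrar(ConnectionFactory connectionFactory) {
        return () -> connectionFactory.addConnectionListener(new ConnectionListener() {

            @Override
            public void onCreate(Connection connection) {
                // Called for the first connection and after every reconnect.
                logger.info("RabbitMQ connection created: {}", connection);
            }

            @Override
            public void onClose(Connection connection) {
                logger.warn("RabbitMQ connection closed: {}", connection);
            }

            @Override
            public void onShutDown(ShutdownSignalException signal) {
                // Called when the underlying connection is shut down, for
                // example when it is force-closed from the management UI.
                logger.error("RabbitMQ connection shut down: {}", signal.getMessage());
            }
        });
    }
}
```

If you need channel-level events rather than connection-level ones, CachingConnectionFactory also accepts a ChannelListener through addChannelListener.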