I had a terrible night trying to figure out what is going on with RabbitMQ and SpringXD, unfortunately without success.

The problem: SpringXD closes RabbitMQ connections repeatedly, or reports warnings related to the channel cache size.

Fragment from the SpringXD log (during stream initialization/autowiring):

 2016-05-03T07:42:43+0200 1.3.0.RELEASE WARN
 DeploymentsPathChildrenCache-0 listener.SimpleMessageListenerContainer
 - CachingConnectionFactory's channelCacheSize can not be less than the 
 number of concurrentConsumers so it was reset to match: 4

...

 2016-05-03T07:54:17+0200 1.3.0.RELEASE ERROR AMQP Connection
 192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

 2016-05-03T17:38:58+0200 1.3.0.RELEASE ERROR AMQP Connection
 192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error; protocol method:
 method<connection.close>(reply-code=504, reply-text=CHANNEL_ERROR - 
 second 'channel.open' seen, class-id=20, method-id=10)

Fragment from the RabbitMQ log:

 =WARNING REPORT==== 3-May-2016::08:08:09 === closing AMQP connection <0.22276.61> (192.168.120.125:59350 -> 192.168.120.125:5672): client
 unexpectedly closed TCP connection

 =ERROR REPORT==== 3-May-2016::08:08:11 === closing AMQP connection <0.15409.61> (192.168.120.125:58527 -> 192.168.120.125:5672):
 {writer,send_failed,{error,closed}}

The 'state: blocked' error is rare:

 =ERROR REPORT==== 3-May-2016::17:38:58 === Error on AMQP connection <0.20542.25> (192.168.120.125:59421 -> 192.168.120.125:5672, vhost:
'/', user: 'xd', state: blocked), channel 7: operation channel.open
caused a connection exception channel_error: "second 'channel.open'
 seen"

My setup (6 nodes)

- springxd 1.3.0 distributed (zookeeper)  
- RabbitMQ 3.6.0, Erlang R16B03-1 cluster


    ackMode:                   AUTO ## or NONE
    autoBindDLQ:               false
    backOffInitialInterval:    1000
    backOffMaxInterval:        10000
    backOffMultiplier:         2.0
    batchBufferLimit:          10000
    batchingEnabled:           false
    batchSize:                 200
    batchTimeout:              5000
    compress:                  false
    concurrency:               4
    deliveryMode:              NON_PERSISTENT ## or PERSISTENT
    durableSubscription:       false
    maxAttempts:               10
    maxConcurrency:            10
    prefix:                    xdbus.
    prefetch:                  1000
    replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
    republishToDLQ:            false
    requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
    requeue:                   true
    transacted:                false
    txSize:                    1000

    spring:
      rabbitmq:
        addresses: priv1:5672,priv2:5672,priv3:5672,priv4:5672,priv5:5672,priv6:5672
        adminAddresses: http://priv1:15672,http://priv2:15672,http://priv3:15672,http://priv4:15672,http://priv5:15672,http://priv6:15672
        nodes: rabbit@priv1,rabbit@priv2,rabbit@priv3,rabbit@priv4,rabbit@priv5,rabbit@priv6
        username: xd
        password: xxxx
        virtual_host: /
        useSSL: false

ha-xdbus policy:

 - ^xdbus\. all  
 - ha-mode: exactly
 - ha-params:   2 
 - queue-master-locator:    min-masters

Rabbit conf

    [
      {rabbit, [
        {tcp_listeners, [5672]},
        {queue_master_locator, "min-masters"}
      ]}
    ].

When ackMode is NONE the following happens:

Eventually the number of consumers drops to zero and I am left with zombie streams that never recover from that state, which in turn causes unwanted queueing.

When ackMode is AUTO the following happens:

Some messages are left un-acked forever.

SpringXD streams and durable queues

The rabbit module is used as a source or a sink, with no custom autowiring.

Typical stream definitions are as follows:

Ingestion:

event_generator | rabbit --mappedRequestHeaders=XDRoutingKey --routingKey='headers[''XDRoutingKey'']'

Processing/Sink:

rabbit --queues='xdbus.INQUEUE-A' | ENRICHMENT-PROCESSOR-A | elastic-sink
rabbit --queues='xdbus.INQUEUE-B' | ENRICHMENT-PROCESSOR-B | elastic-sink

The xdbus.INQUEUE-xxx queues are durable and were created manually from the RabbitMQ admin GUI.

GLOBAL statistics (from the RabbitMQ Admin)

  • Connections: 190
  • Channels: 2263 (channel cache problem, perhaps?)
  • Exchanges: 20
  • Queues: 120
  • Consumers: 1850

Finally:

I would appreciate it if someone could tell me what is wrong with the configuration. I am pretty sure the network is performing well, so I do not suspect network problems, and the max open files limit is not being hit either.

Message rates vary from 2K/sec to a maximum of 30K/sec, which is a relatively small load.

Thanks!

Ivan

1 Answer

We have seen some similar instability when churning channels at a high rate.

The work-around was to increase the channel cache size to avoid the high rate of churning; it's not clear where the instability lies, but I don't believe it is in Spring AMQP.
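To make the churn argument concrete, here is a toy Python model. It is not Spring AMQP code, and the class and function names are invented for illustration. It assumes the cache size is the number of idle channels that are kept around rather than a hard limit, so a channel returned to an already full cache is physically closed and has to be reopened later.

```python
class ToyChannelCache:
    """Toy model of a channel cache; NOT the real CachingConnectionFactory."""

    def __init__(self, cache_size):
        self.cache_size = cache_size  # max number of idle channels retained
        self.idle = []                # channels currently sitting in the cache
        self.opened = 0               # physical channel opens
        self.closed = 0               # physical channel closes
        self._next_id = 0

    def acquire(self):
        # Reuse an idle channel when one is cached, otherwise open a new one.
        if self.idle:
            return self.idle.pop()
        self._next_id += 1
        self.opened += 1
        return self._next_id

    def release(self, channel):
        # Keep the channel only if the cache has room; otherwise close it.
        if len(self.idle) < self.cache_size:
            self.idle.append(channel)
        else:
            self.closed += 1


def simulate(cache_size, concurrent_users, rounds):
    """Each round, `concurrent_users` channels are held at once, then released."""
    cache = ToyChannelCache(cache_size)
    for _ in range(rounds):
        held = [cache.acquire() for _ in range(concurrent_users)]
        for channel in held:
            cache.release(channel)
    return cache.opened, cache.closed


if __name__ == "__main__":
    # Hypothetical numbers: 4 concurrent users, 100 rounds of work.
    print("cache_size=1:  ", simulate(1, 4, 100))
    print("cache_size=100:", simulate(100, 4, 100))
```

In this model a cache smaller than the number of channels in simultaneous use opens and closes channels on every round, while a cache at least as large as that number opens each channel once and never closes it.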

One problem, however, is that XD doesn't expose channelCacheSize as a property.

The answer at the link above has a work-around to add the property by replacing the bus configuration XML. Increasing the cache size solved that user's problem.

We have an open JIRA issue to expose the property but it's not implemented yet.

I see you originally posted this as an 'answer' to that question.

Could someone be more specific and explain where exactly rabbit-bus.xml should be installed, and why this is happening anyway?

As it says there, you need to put it under the xd config directory:

xd/config/META-INF/spring-xd/bus/rabbit-bus.xml

EDIT

Technique using the bus extension mechanism instead...

$ cat xd/config/META-INF/spring-xd/bus/ext/cf.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg ref="rabbitFactory" />
        <property name="addresses" value="${spring.rabbitmq.addresses}" />
        <property name="username" value="${spring.rabbitmq.username}" />
        <property name="password" value="${spring.rabbitmq.password}" />
        <property name="virtualHost" value="${spring.rabbitmq.virtual_host}" />
        <property name="channelCacheSize" value="${spring.rabbitmq.channelCacheSize:100}" />
    </bean>

</beans>
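The `${spring.rabbitmq.channelCacheSize:100}` placeholder uses Spring's `${key:default}` syntax: the value after the colon is used when the property is not set. The following Python sketch mimics that lookup purely for illustration; `resolve` is a hypothetical helper, not Spring's resolver, and it ignores nesting and escaping.

```python
import re

# Matches ${key} or ${key:default}; a simplified, non-nested form only.
_PLACEHOLDER = re.compile(r"\$\{([^:}]+)(?::([^}]*))?\}")


def resolve(text, properties):
    """Replace ${key} / ${key:default} using the given properties dict."""

    def _substitute(match):
        key, default = match.group(1), match.group(2)
        if key in properties:
            return str(properties[key])
        if default is not None:
            return default
        # No value and no default: fail loudly instead of guessing.
        raise KeyError(key)

    return _PLACEHOLDER.sub(_substitute, text)


if __name__ == "__main__":
    placeholder = "${spring.rabbitmq.channelCacheSize:100}"
    print(resolve(placeholder, {}))
    print(resolve(placeholder, {"spring.rabbitmq.channelCacheSize": 200}))
```

So with the bean definition above, the cache size is 100 unless `spring.rabbitmq.channelCacheSize` is defined somewhere the container resolves properties from.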

EDIT: TEST RESULTS

Prepopulated queue foo with 1 million messages.

    concurrency:               10
    prefetch:                  1000
    txSize:                    1000


xd:>stream create foo --definition "rin:rabbit --concurrency=10 --maxConcurrency=10 --prefetch=1000 --txSize=1000 | t1:transform | t2:transform | rout:rabbit --routingKey='''bar'''" --deploy
Created and deployed new stream 'foo'

So with this configuration, we end up with 40 consumers.

I never saw more than 29 publishing channels from the bus; there were 10 publishers for the sink.

1m messages were transferred from foo to bar in less than 5 minutes (via xdbus.foo.0, xdbus.foo.1 and xdbus.foo.2), which makes 4m messages published in total.
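The arithmetic behind those totals can be sketched as follows. The breakdown is my reading of the test, not something the logs state directly: I assume each of the four modules consumes from one queue with 10 consumers, and each message is published once per hop.

```python
concurrency = 10        # --concurrency / --maxConcurrency from the test
messages = 1_000_000    # messages prepopulated in queue foo

# Assumed consuming side: rin reads foo, then t1, t2 and rout read the bus queues.
consumed_queues = ["foo", "xdbus.foo.0", "xdbus.foo.1", "xdbus.foo.2"]

# Assumed publishing side: three bus hops plus the final publish to bar.
publish_targets = ["xdbus.foo.0", "xdbus.foo.1", "xdbus.foo.2", "bar"]

total_consumers = concurrency * len(consumed_queues)
total_published = messages * len(publish_targets)

# "Less than 5 minutes" gives only a lower bound on the end-to-end rate.
min_end_to_end_rate = messages / (5 * 60)

print(total_consumers)   # 40
print(total_published)   # 4000000
print(min_end_to_end_rate)
```

Under those assumptions the lower bound is a little over 3,300 messages per second end to end, or roughly four times that in publishes per second across all hops.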

No errors - but my laptop needs to cool off :D