0
votes

I'm using spark-rabbitmq_1.6 library to connect to RabbitMQ through Spark Streaming. The queue that I'm trying to connect to has limit of x-max-length = 1000. I set the Rabbit Config Params as below

Map<String, String>rabbitMqConParams = new HashMap<String, String>();
rabbitMqConParams.put("hosts", "rabbit.host.com");
...
rabbitMqConParams.put("x-max-length", "1000");

JavaReceiverInputDStream<String> receiverStream = RabbitMQUtils.createJavaStream(streamCtx, String.class, rabbitMqConParams, messageHandler);

Although the x-max-length is set, it throws the below error.

16/11/28 15:20:27 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Could not connect
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer.declareQueue(Consumer.scala:136)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer.setQueue(Consumer.scala:110)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer.setQueue(Consumer.scala:82)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$2.apply(RabbitMQInputDStream.scala:64)
    at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver$$anonfun$2.apply(RabbitMQInputDStream.scala:58)
....
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'aeon.output' in vhost '/': received '1000' but current is '1000', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)

Any suggestions as to why this could occur? Any help is greatly appreciated.

Thanks.

1

1 Answers

0
votes