1
votes

I was testing on SpringXD 1.3.0.RELEASE the duplication of messages to different sinks. My configuration is a three node cluster supported by RabbitMQ as message bus. My test was something like this:

  1. First Case

    stream create sourceToDuplicate --definition "trigger --fixedDelay=1 
    --timeUnit=MILLISECONDS --payload='test' > topic:test" --deploy
    stream create processMessages1 --definition "topic:test > cassandra --initScript=file:<absolut-path-to>/int-db.cql --ingestQuery='insert into book (isbn, title, author) values (uuid(), ?, ?)'"
    stream create processMessages2 --definition "topic:test > aggregator --count=1000 --timeout=1000 | file" --deploy
    

Now in order to increase the consumer on the cassandra-sink, I want to deploy the first stream with "module.cassandra.consumer.concurrency=10". This property let fail the deployment.

My workaround is now a fourth stream, so that I can increase the consumers:

  1. Second Case

    stream create topicToQueue1 --definition "topic:test > queue:test1" --deploy
    stream create processMessage1 --definition "queue:test1 > cassandra..."
    stream deploy processMessage1 --properties "module.cassandra.consumer.concurrency=10"
    

Finally my question: Why should the first use case fail if there is on rabbitmq already a queue added for the topic:channel where more consumers are allowed?

Merry Christmas to everyone

--- Update ---

Version: SpringXD 1.3.0.RELEASE

Error:

2015-12-18T13:58:28+0100 1.3.0.RELEASE INFO DeploymentSupervisor-0 
zk.ZKStreamDeploymentHandler - Deployment status for stream 'processMessage1':    
DeploymentStatus{state=failed,error(s)=java.lang.IllegalArgumentException:     
RabbitMessageBus does not support consumer property: concurrency for processMessage1.topic:test.
at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateProperties(MessageBusSupport.java:786)
    at org.springframework.xd.dirt.integration.bus.MessageBusSupport.validateConsumerProperties(MessageBusSupport.java:757)
    at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus.bindPubSubConsumer(RabbitMessageBus.java:472)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageConsumer(AbstractMessageBusBinderPlugin.java:275)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:155)
    at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
    at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
    at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
    at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
    at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
    at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:509)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:503)
    at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
    at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache$10.run(PathChildrenCache.java:762)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
1
>This property let fail the deployment. What error did you get? What version of XD? (edit the question, don't try to add a stack trace in a comment).Gary Russell
Version is 1.3.0...I edit the questionI-Doit

1 Answers

0
votes

You can't have concurrency > 1 on a topic: named channel - otherwise each thread will get a copy of the message.

If you want to use concurrency on a named channel, it has to be a queue: so each thread competes for messages.