0
votes

i have producer and consumer spring cloud stream applications, we don't know which application we will start first.either it can be producer or consumer.

when we declare properties in application.properties for producer and consumer applications about destination(Exchange),group(queue),it would be in such a way if destination(Exchange),group(queue) is already exist don't try to create again use existing one otherwise create.

1.if producer starts first it needs create destination and group, and same destination and group will be used by consumer without trying to create again.

2.if consumer starts first it needs create destination and group, and same destination and group will be used by producer without trying to create again.

Need both producer and consumer application.properties to satisfy above requirements.

Producer:

public interface FoodOrderPublisher {

      String OUTPUT = "foodOrderPublishChannel";

      @Output(OUTPUT)
      MessageChannel create();
}

spring.cloud.stream.bindings.foodOrderPublishChannel.destination=foodOrders
spring.cloud.stream.bindings.foodOrderPublishChannel.group=foodOrdersQueue
spring.cloud.stream.bindings.foodOrderPublishChannel.producer.requiredGroups=foodOrdersQueue
spring.cloud.stream.default.contentType=application/json

Consumer:

spring.cloud.stream.bindings.foodOrderRecieveChannel.destination=foodOrders
spring.cloud.stream.bindings.foodOrderRecieveChannel.group=foodOrdersQueue

public interface FoodOrderConsumer {

    String INPUT = "foodOrderRecieveChannel";

    @Input(INPUT)
    SubscribableChannel receive();

}

With above code:

Producer application starting properly by creating destination and group where as consumer is giving following error:

2018-12-11 20:43:41.607 INFO 1980 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: foodOrders.foodOrdersQueue, bound to: foodOrders 2018-12-11 20:43:41.739 WARN 1980 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Failed to declare exchange: Exchange [name=foodOrders, type=topic, durable=true, autoDelete=false, internal=false, arguments={}], continuing... org.springframework.amqp.AmqpIOException: java.io.IOException 2018-12-11 20:43:41.753 ERROR 1980 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured

java.net.SocketException: socket closed

1

1 Answers

0
votes

Set the requiredGroups property on the producer side.

requiredGroups

A comma-separated list of groups to which the producer must ensure message delivery even if they start after it has been created (for example, by pre-creating durable queues in RabbitMQ).

.

without trying to create again.

Both sides will attempt the creation but the second one effectively does nothing since the operation is idempotent.