
I have implemented the example shown here: Spring Dynamic Destination.

RabbitMQ creates the exchange dynamically, but there is no option to provide a binding or a routing key. My requirement is to send a message to this dynamically created exchange with a routing key. How should I implement this so that the routing key is set?

@Component
public class DDProducerBean {

    @Autowired
    private BinderAwareChannelResolver poChannelResolver = null;

    public void publish(DDSocketVO ddSocketVO) throws Exception {
        this.poChannelResolver.resolveDestination(ddSocketVO.getDestination()).send(MessageBuilder.withPayload(new ObjectMapper().
                setVisibility(PropertyAccessor.FIELD, Visibility.ANY).
                writeValueAsString(ddSocketVO)).build());
    }

}
Why doesn't producer.routingKeyExpression fit your requirements? docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/… - Artem Bilan
This implementation is specific to spring.cloud.stream.dynamicDestinations, where the BinderAwareChannelResolver takes care of dynamically creating and binding the outbound channel for these dynamic destinations. But the documentation doesn't say how to bind the outbound channel to a queue or how to specify a routing key expression. Because these channels and producers are created dynamically, we can't use producer.routingKeyExpression. - abhdeb
My requirement is to route messages dynamically at runtime based on the payload, so I need a way to set the routing key at runtime. producer.routingKeyExpression is set in config, properties, or yml rather than at runtime, so it won't serve the purpose. - abhdeb
??? It is called an expression. It exists exactly for runtime resolution against the requestMessage and the whole beanFactory: docs.spring.io/spring-integration/reference/html/…: "A SpEL expression that is evaluated to determine the routing-key to use when sending Messages, with the message as the root object (e.g. payload.key)." - Artem Bilan
Consumer side: these are examples of routing keys currently bound in an exchange in RabbitMQ: product.list.abhrdeb and product.list.sibera (abhrdeb and sibera are user IDs; each user has a unique routing key like these). In RabbitMQ, the amq.topic exchange has the following bindings: Queue-1 with routing key product.list.abhrdeb, and Queue-2 with routing key product.list.sibera. Based on the routing key in the payload and the bindings above, messages should be routed to specific queues. In this scenario, how do I set the routing key dynamically in the producer? - abhdeb
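
To make the scenario in the last comment concrete, here is a minimal plain-Java sketch of how a topic exchange would pick queues for the per-user routing keys described above. It is only a simulation under stated assumptions: it does not call RabbitMQ or Spring, and the names TopicRoutingSketch, routingKeyFor, matches, and route are hypothetical, introduced purely for illustration. The matching rule follows the usual topic-exchange convention, where `*` stands for exactly one word and `#` for zero or more words.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: simulates topic-exchange routing in memory (no broker involved).
final class TopicRoutingSketch {

    // Assumption: the routing key is derived from a user id carried in the payload.
    static String routingKeyFor(String userId) {
        return "product.list." + userId;
    }

    // True if the binding key matches the routing key under topic-exchange rules.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (matches(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        // '*' matches exactly one word; anything else must match literally.
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    // Returns the queues (map keys) whose binding key (map values) matches the routing key.
    static List<String> route(Map<String, String> bindings, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("Queue-1", "product.list.abhrdeb");
        bindings.put("Queue-2", "product.list.sibera");
        System.out.println(route(bindings, routingKeyFor("abhrdeb"))); // prints [Queue-1]
    }
}
```

With the two bindings from the comment, a message whose routing key is product.list.abhrdeb reaches only Queue-1. So the producer's job is to put the per-user key on each outgoing message, and the broker does the rest.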

1 Answer


Here is the workaround that was suggested:

Basically, create a MessageChannel for the dynamic destination using BinderAwareChannelResolver, then connect to RabbitMQ with the RabbitAdmin API and bind the newly created exchange to another queue or exchange with a routing key before sending messages.

@Autowired
private BinderAwareChannelResolver poChannelResolver;

public void publish(WebSocketVO webSocketVO) throws Exception {

    // Resolve (and, if needed, create) the outbound channel for the dynamic destination.
    MessageChannel channel = this.poChannelResolver.resolveDestination(webSocketVO.getDestination());

    // Connection settings are read from system properties named after the binder environment keys.
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.username"));
    connectionFactory.setPassword(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.password"));
    connectionFactory.setAddresses(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.addresses"));
    connectionFactory.setVirtualHost(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.virtual-host"));

    try {
        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);

        // Source: the dynamically created exchange (non-durable, auto-delete).
        TopicExchange sourceExchange = new TopicExchange(webSocketVO.getDestination(), false, true);
        TopicExchange destExchange = new TopicExchange("amq.topic");

        // Bind amq.topic to the dynamic exchange using the routing key taken from the payload.
        amqpAdmin.declareBinding(BindingBuilder.bind(destExchange).to(sourceExchange).with(webSocketVO.getRoutingKeyExpression()));

        channel.send(MessageBuilder.withPayload(new ObjectMapper().
                setVisibility(PropertyAccessor.FIELD, Visibility.ANY).
                writeValueAsString(webSocketVO)).build());

        // Remove the dynamic exchange once the message has been sent.
        amqpAdmin.deleteExchange(webSocketVO.getDestination());
    } finally {
        // Always release the connection, even if declaring the binding or sending fails.
        connectionFactory.destroy();
    }
}