0
votes

enter image description here

I have four exact replicas of a service that among other things catch messages from a certain queue using Apache Camel RabbitMQ endpoints. Each route looks like this:

//Start Process from RabbitMQ queue
    from("rabbitmq://" +
            System.getenv("ADVERTISE_ADDRESS") +
            "/" +
            System.getenv("RABBITMQ_EXCHANGE_NAME") +
            "?routingKey=" +
            System.getenv("RABBITMQ_ROUTING_KEY") +
            "&autoAck=true")
            .process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody()))
            .unmarshal().json(JsonLibrary.Jackson, TwitterBean.class)
            .transform().method(ResponseTransformer.class, "transformtwitterBean")
            .marshal().json(JsonLibrary.Jackson)
            .setHeader(Exchange.HTTP_METHOD, constant("POST"))
            .setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
            .to("http4://" + System.getenv("ADVERTISE_ADDRESS") + ":" + System.getenv("CAMUNDA_PORT") + "/rest/process-definition/key/MainProcess/start")
            .log("Response: ${body}");

Right now each endpoint processes the message. Even though the "concurrent consumers"-option by default is one. I assumed that maybe my messages weren't acknowledged, so I set the autoAck option to true.

This didn't help, how can I make these services competing consumers?

EDIT:

A code snippet from the configuration of my publisher app:

@Configuration
public class RabbitMqConfig {
    @Bean
    Queue queue() {
        return new Queue(System.getenv("RABBITMQ_QUEUE_NAME"), true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(System.getenv("RABBITMQ_EXCHANGE_NAME"), true, true);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(System.getenv("RABBITMQ_ROUTING_KEY"));
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
2
process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody())) Wouldn't this just set the in body to the value already present?Novaterata
@Novaterata, quite possible. I haven't worked with Camel for 1.5+ now so I'm unable to clear that up for you!Quinten Scheppermans

2 Answers

1
votes

The issue you have is that you're not naming your queue on the service side

Based on the camel apache rabbitmq documentation, this means that a random name is generated for the queue.

So:

  • you have a publisher that sends a message to an exchange
  • then each of your service creates a queue with a random name, and binds it to the exchange

Each service having it's own queue, bound to the same exchange, will get the same messages.

To avoid this you need to provide a queue name, so that each service will connect to the same queue, which will mean they will share the message consumption with the other service instances.

0
votes

Sounds like you don't have a Queue, but a Topic. See here for a comparison.

The message broker is responsible to give a queue message to only one consumer, no matter how much of them are present.