I have four identical replicas of a service that, among other things, consume messages from a queue using Apache Camel RabbitMQ endpoints. Each replica defines the same route:
// Start process from RabbitMQ queue
from("rabbitmq://" +
        System.getenv("ADVERTISE_ADDRESS") +
        "/" +
        System.getenv("RABBITMQ_EXCHANGE_NAME") +
        "?routingKey=" +
        System.getenv("RABBITMQ_ROUTING_KEY") +
        "&autoAck=true")
    .process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody()))
    .unmarshal().json(JsonLibrary.Jackson, TwitterBean.class)
    .transform().method(ResponseTransformer.class, "transformtwitterBean")
    .marshal().json(JsonLibrary.Jackson)
    .setHeader(Exchange.HTTP_METHOD, constant("POST"))
    .setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
    .to("http4://" + System.getenv("ADVERTISE_ADDRESS") + ":" + System.getenv("CAMUNDA_PORT") + "/rest/process-definition/key/MainProcess/start")
    .log("Response: ${body}");
Right now every replica processes every message, even though the concurrentConsumers option defaults to 1. I assumed my messages might not be getting acknowledged, so I set the autoAck option to true.
This didn't help. How can I make these services competing consumers, so that each message is processed by only one replica?
EDIT:
A code snippet from the configuration of my publisher app:
@Configuration
public class RabbitMqConfig {

    @Bean
    Queue queue() {
        return new Queue(System.getenv("RABBITMQ_QUEUE_NAME"), true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(System.getenv("RABBITMQ_EXCHANGE_NAME"), true, true);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(System.getenv("RABBITMQ_ROUTING_KEY"));
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
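The publisher configuration binds one named queue to the exchange, while the consumer URI in the route names no queue at all. That mismatch may be why every replica receives its own copy, though this has not been verified here. The following is a minimal sketch of a consumer URI that names the publisher's queue, assuming the Camel RabbitMQ component's queue option and assuming every replica sees the same RABBITMQ_QUEUE_NAME variable. The class and method names are hypothetical, and the environment is passed in as a map only to make the sketch easy to run.

```java
import java.util.Map;

// Hypothetical helper: builds the consumer endpoint URI from environment values.
public class CompetingConsumerUri {

    // Same URI as in the route, plus an explicit queue name (assumption:
    // all replicas point at the one queue the publisher declares).
    static String consumerUri(Map<String, String> env) {
        return "rabbitmq://"
                + env.get("ADVERTISE_ADDRESS")
                + "/"
                + env.get("RABBITMQ_EXCHANGE_NAME")
                + "?queue=" + env.get("RABBITMQ_QUEUE_NAME")
                + "&routingKey=" + env.get("RABBITMQ_ROUTING_KEY")
                + "&autoAck=true";
    }

    public static void main(String[] args) {
        // Prints the URI that would be passed to from(...) in the route.
        System.out.println(consumerUri(System.getenv()));
    }
}
```

If the consumer also declares the queue and exchange, its declaration options (for example durable and autoDelete) probably have to match what the publisher declared; those values are not shown here because they depend on the Camel version and broker setup.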
process(exchange -> exchange.getIn().setBody(exchange.getIn().getBody()))
Wouldn't this just set the in body to the value already present? – Novaterata