I'm trying to set-up a Integration Workflow which publishes messages to RabbitMQ.
I have 2 questions regarding this: 1. Is my Queue Bean Working as I hope it is :) 2. How can I set a message's priority with the outbound-amqp-adapter using Integration DSL?
@Configuration
public class RabbitConfig {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
TopicExchange worksExchange() {
return new TopicExchange("work.exchange", true, false);
}
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
return new Queue("dms.document.upload.queue", true, false, false, args);
}
@Bean
public RabbitTemplate worksRabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
template.setExchange("work.exchange");
template.setRoutingKey("work");
template.setConnectionFactory(rabbitConnectionFactory);
return template;
}
@Configuration
public class WorksOutbound {
@Autowired
private RabbitConfig rabbitConfig;
@Bean
public IntegrationFlow toOutboundQueueFlow() {
return IntegrationFlows.from("worksChannel")
.transform(Transformers.toJson())
.handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate()))
.get();
}
}
UPDATE After beeing able to push the message with the appropriate "Priority header" I can pull the messages according to their priority using the Rabbit Management UI, but I am somehow unable to pull them correctly using spring-amqp consumer...
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(2);
container.setDefaultRequeueRejected(false);
return container;
}