1
votes

This is my current setup:

queue1 and queue2 are merged into channel1 with integration flows:

@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

Then everything is aggregated, and the source messages are acked once RabbitMQ confirms the aggregated message:

@Bean
public IntegrationFlow aggregatingFlow() {
    return IntegrationFlows
            .from(amqpInputChannel())
            .aggregate(...
                    .expireGroupsUponCompletion(true)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                    .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
            )
            .handle(amqpOutboundEndpoint())
            .get();
}

@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
    AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
    outboundEndpoint.setConfirmAckChannel(manualAckChannel());
    outboundEndpoint.setConfirmCorrelationExpressionString("#root");
    outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
    outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); // forward using partition id as routing key
    return outboundEndpoint;
}

ackTemplate() uses a connection factory configured with springFactory.setPublisherConfirms(true).

The problem I see is that roughly once every 10 days, some messages get stuck in the unacknowledged state in RabbitMQ.

My guess is that the publish is somehow waiting for RabbitMQ to send the publisher confirm, but the confirm never arrives and the wait times out. In that case, I never ack the message in queue1. Is this possible?

To recap, the complete workflow is:

[two queues -> direct channel -> aggregator (keeps channel and tag values) -> publish to rabbit -> rabbit returns ACK via publisher confirms -> spring confirms all messages on channel+values that it kept in memory for aggregated message]

I also have my own aggregator implementation (since I need to manually ack messages from both q1 and q2):

public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
    private AckingState ackingState;

    public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
        this.ackingState = ackingState;
    }

    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
        List<ManualAckPair> manualAckPairs = new ArrayList<>();
        group.getMessages().forEach(m -> {
            Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
        });
        aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
        return aggregatedHeaders;
    }
}

UPDATE

The RabbitMQ admin UI shows 2 unacked messages that stay that way for a long time; they are not acked until a restart, when they are redelivered.

Looks like there is too much custom business logic. We need some project on GitHub to play with and possibly reproduce, as simple as possible. So far, no ideas about what is going on. You may play with a different ConnectionFactory for the ackTemplate, so AMQP channels are not blocked while you're sending. - Artem Bilan
@ArtemBilan I uploaded my full flow; you can also find one test that uses Testcontainers (to run RabbitMQ in local Docker) and shows how aggregation works. The full flow description is at github.com/bojanv55/spring-integration-aggregate-ack/blob/… - Bojan Vukasovic
@ArtemBilan In that project I just set up a connection through Toxiproxy to simulate the loss of publisher-confirm messages. In this case the ACK from the server for the delivery never reaches the Java app, so my app cannot manually ack the messages in the other queue. Is there an option to wait for publisher confirms and, if none is received, resend the message? - Bojan Vukasovic

2 Answers

1
votes

In Spring AMQP version 2.1 (Spring Integration 5.1), we added a Future<?> and the returned message to CorrelationData to assist with this kind of thing. If you are using an older version, you can subclass CorrelationData (and you'd have to handle setting the future and returned message in your code).

This, together with a scheduled task, can detect missing acks...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };

            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);

            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);

            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

Output:

Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting
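
One of the comments under the question asks whether the send can be repeated when no confirm arrives. The timeout branch above is the natural place to hook that in. The following is only a minimal, library-free sketch of the idea: the class name ConfirmRetrySketch, the Outcome enum, and the publish supplier are all hypothetical stand-ins (in a real application, publish would send through the RabbitTemplate and expose the CorrelationData future). Resending can produce duplicates if only the confirm was lost, so this assumes the consumer tolerates redelivery.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class ConfirmRetrySketch {

    /** Possible results of a publish-with-retry attempt (hypothetical type). */
    public enum Outcome { ACKED, NACKED, GAVE_UP }

    /**
     * Publishes via the supplied (hypothetical) publish function, which must send
     * the message and return a future completed with true for an ack or false for
     * a nack. Waits up to timeoutMillis for each confirm and republishes when no
     * confirm arrives, up to maxAttempts times.
     */
    public static Outcome sendWithRetry(Supplier<CompletableFuture<Boolean>> publish,
            int maxAttempts, long timeoutMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            CompletableFuture<Boolean> confirm = publish.get();
            try {
                // An explicit ack or nack ends the loop; only silence triggers a resend.
                return confirm.get(timeoutMillis, TimeUnit.MILLISECONDS)
                        ? Outcome.ACKED : Outcome.NACKED;
            }
            catch (TimeoutException e) {
                // No confirm within the timeout: loop around and publish again.
            }
            catch (ExecutionException e) {
                // The future failed; treated here like a missing confirm.
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Outcome.GAVE_UP;
            }
        }
        return Outcome.GAVE_UP;
    }
}
```

A caller would only ack the original deliveries when the result is ACKED, and would nack or requeue them otherwise, so they do not stay unacknowledged indefinitely.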

With Spring Integration there is a confirmCorrelationExpression which can be used to create the CorrelationData instance.

EDIT

With Spring Integration...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                    .handle(Amqp.outboundAdapter(template)
                            .confirmCorrelationExpression("@correlationCreator.create(#root)")
                            .exchangeNameExpression("headers.exchange")
                            .routingKeyExpression("headers.rk")
                            .returnChannel(returns())
                            .confirmAckChannel(acks())
                            .confirmNackChannel(acks()))
                    .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());

            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}
0
votes

You can declare the connection factory as a bean:

@Bean
public ConnectionFactory createConnectionFactory(){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setPublisherReturns(true);
    connectionFactory.setPublisherConfirmType(ConfirmType.SIMPLE);
    return connectionFactory;
}

Then declare the RabbitTemplate:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(callback);
    return rabbitTemplate;
}

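Here callback is an implementation of the ConfirmCallback interface. As far as I know, the callback is only invoked when the confirm type is ConfirmType.CORRELATED; ConfirmType.SIMPLE is the mode intended for waitForConfirms.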

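While sending, you can then just wait for the confirmation:

System.out.println("Sending message...");
// waitForConfirms() only works inside invoke(), which keeps the send and the wait on one channel
rabbitTemplate.invoke(t -> {
    t.convertAndSend(rabbitMQProperties.getEXCHANGENAME(),
            rabbitMQProperties.getQUEUENAME(), "hello from rabbit");
    return t.waitForConfirms(1);
});

waitForConfirms takes a timeout in milliseconds (I set it to 1 for testing purposes) and must be called within the scope of an invoke operation.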