I am trying to create a flow (1) in which a message is received from a TCP adapter, which can be a client or a server, and is sent to an ActiveMQ broker.

Another flow (2) picks the message up from the required queue and sends it to the destination:

TCP(client/server) ==(1)==> ActiveMQ Broker ==(2)==> HTTP Outbound adapter

I want to ensure that if a message is not delivered to the required destination, delivery is attempted again.

My current flow (1) to the broker is:

IntegrationFlow flow = IntegrationFlows
            .from(Tcp
                    .inboundAdapter(Tcp.netServer(1234)
                            .serializer(customSerializer).deserializer(customSerializer)
                            .id("server").soTimeout(5000))
                    .id(hostConnection.getConnectionNumber() + "adapter"))
            .channel(directChannel())
            .wireTap("tcpInboundMessageLogChannel").channel(directChannel())
            .handle(Jms.outboundAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .get();

    this.flowContext.registration(flow).id("outflow").register();

and my flow (2) from the broker to the HTTP outbound adapter is:

flow = IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
                    .destination("jmsInbound"))
            .channel(directChannel())
            .handle(Http.outboundChannelAdapter(hostConnection.getUrl()).httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class)
                    .mappedRequestHeaders("abc"))
            .get();
    this.flowContext.registration(flow).id("inflow").register();

Issue:

  • If an exception occurs during delivery, for example because my destination URL is not reachable, the message is sent again.

  • After an unsuccessful attempt it keeps retrying, up to a maximum of 7 attempts.

  • If delivery still fails, the message is sent to ActiveMQ.DLQ (the dead letter queue) and is not attempted again, because it has been dequeued from the actual queue and moved to ActiveMQ.DLQ.

So I want a scenario in which no message is lost and messages are processed in order.

1 Answer

First, I believe you can configure redelivery for the jmsInbound destination to allow infinite retries. See the ActiveMQ RedeliveryPolicy:

/**
 * Configuration options for a messageConsumer used to control how messages are re-delivered when they
 * are rolled back.
 * May be used server side on a per destination basis via the Broker RedeliveryPlugin
 *
 * @org.apache.xbean.XBean element="redeliveryPolicy"
 *
 */
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
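
As a rough sketch of that first option, the policy can be set on the client-side connection factory. The class name RedeliveryConfigSketch, the method name, the broker URL parameter, and the delay values are assumptions for illustration, not taken from the question. ActiveMQ's default maximumRedeliveries is 6, which would match the seven attempts described in the question.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

public class RedeliveryConfigSketch {

    // Hypothetical helper: builds a connection factory whose consumers
    // keep redelivering a rolled-back message instead of sending it to the DLQ.
    public static ActiveMQConnectionFactory connectionFactory(String brokerUrl) {
        RedeliveryPolicy policy = new RedeliveryPolicy();
        // -1 means "no limit", so the message is never moved to ActiveMQ.DLQ
        policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
        // Illustrative delays: start at 1 s, double each time, cap at 60 s
        policy.setInitialRedeliveryDelay(1000L);
        policy.setUseExponentialBackOff(true);
        policy.setBackOffMultiplier(2.0);
        policy.setMaximumRedeliveryDelay(60000L);

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        factory.setRedeliveryPolicy(policy);
        return factory;
    }
}
```

Redelivery only happens when the consuming session rolls back, so this presumably relies on the message-driven adapter in flow (2) running with a transacted session.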

On the other hand, you can configure the .handle(Http.outboundChannelAdapter(...)) endpoint with a RequestHandlerRetryAdvice for similar retry behavior, but inside the application, without round trips to JMS and back: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/messaging-endpoints-chapter.html#retry-advice
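
To illustrate the idea of retrying inside the application, independent of Spring, here is a minimal plain-Java sketch. The class RetryInOrderSketch and the method drain are hypothetical names. It assumes a single consumer: the head message is retried until the send succeeds, so a later message never overtakes an earlier one and nothing is dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class RetryInOrderSketch {

    /**
     * Drains the queue in order. Each message is retried until the sender
     * stops throwing, and is removed only after a successful send.
     * Returns the total number of send attempts.
     */
    static <T> int drain(Queue<T> queue, Consumer<T> sender, long backOffMillis) {
        int attempts = 0;
        while (!queue.isEmpty()) {
            T head = queue.peek();              // look at the head, do not remove it yet
            attempts++;
            try {
                sender.accept(head);
                queue.remove();                 // remove only after success
            } catch (RuntimeException e) {
                try {
                    Thread.sleep(backOffMillis); // fixed back-off before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        List<String> delivered = new ArrayList<>();
        int[] failuresLeft = {2};               // simulate a destination that fails twice

        int attempts = drain(queue, msg -> {
            if (failuresLeft[0]-- > 0) {
                throw new RuntimeException("destination unavailable");
            }
            delivered.add(msg);
        }, 10);

        // prints "[m1, m2, m3] after 5 attempts"
        System.out.println(delivered + " after " + attempts + " attempts");
    }
}
```

The retry advice plays a similar role around the HTTP handler: the listener thread stays on the current message until it goes through.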

Here is a sample of how the RequestHandlerRetryAdvice can be configured from the Java DSL perspective:

    @Bean
    public IntegrationFlow errorRecovererFlow() {
        return IntegrationFlows.from(Function.class, "errorRecovererFunction")
                .handle((GenericHandler<?>) (p, h) -> {
                    throw new RuntimeException("intentional");
                }, e -> e.advice(retryAdvice()))
                .get();
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
        return requestHandlerRetryAdvice;
    }

    @Bean
    public MessageChannel recoveryChannel() {
        return new DirectChannel();
    }

The RequestHandlerRetryAdvice can be configured with a RetryTemplate to apply something like AlwaysRetryPolicy. See the Spring Retry project for more info: https://github.com/spring-projects/spring-retry
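
A minimal sketch of such a configuration follows. The class name InfiniteRetryConfig, the bean name infiniteRetryAdvice, and the 5-second back-off are assumptions for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class InfiniteRetryConfig {

    @Bean
    public RequestHandlerRetryAdvice infiniteRetryAdvice() {
        // Wait a fixed period between attempts (value is illustrative)
        FixedBackOffPolicy backOff = new FixedBackOffPolicy();
        backOff.setBackOffPeriod(5000L);

        // AlwaysRetryPolicy never gives up, so retries are never exhausted
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
        retryTemplate.setBackOffPolicy(backOff);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }
}
```

It could then be attached to the HTTP endpoint of flow (2) in the same way as in the sample above, by passing e -> e.advice(infiniteRetryAdvice()) as the second argument to .handle(...). Since the policy never stops retrying, a recovery callback should not be needed in this variant.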