1
votes

I am trying to understand following example: https://github.com/spring-projects/spring-integration-samples/tree/d8e71c687e86e7a7e35d515824832a92df9d4638/basic/barrier

and rewrite it using java DSL.

First of all I want to understand what is going in that example. Sure I've read explanation from github but I still don't get the idea. I ask here because I know that SI team leads(authors of that example) answer all question tagged spring-integration.

server configuration:

<int-http:inbound-gateway request-channel="receiveChannel"
                            path="/postGateway"
                            error-channel="errorChannel"
                            supported-methods="POST"/>

    <int-http:inbound-gateway request-channel="createPayload"
                            path="/getGateway"
                            error-channel="errorChannel"
                            supported-methods="GET"/>

    <int:transformer input-channel="createPayload" output-channel="receiveChannel" expression="'A,B,C'" />

    <int:channel id="receiveChannel" />

    <int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
        <int:header name="ackCorrelation" expression="headers['id']" />
    </int:header-enricher>

    <int:publish-subscribe-channel id="processChannel" />

    <int:chain input-channel="processChannel" order="1">
        <int:header-filter header-names="content-type, content-length" />
        <int:splitter delimiters="," />
        <int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
            exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
            confirm-ack-channel="confirmations"
            confirm-nack-channel="confirmations"
            return-channel="errorChannel"
            confirm-correlation-expression="#this"/>
    </int:chain>

    <!-- Suspend the HTTP thread until the publisher confirms are asynchronously received -->

    <int:barrier id="barrier" input-channel="processChannel" order="2"
        correlation-strategy-expression="headers['ackCorrelation']"
        output-channel="transform" timeout="10000" />

    <int:transformer input-channel="transform" expression="payload[1]" />

    <!-- Aggregate the publisher confirms and send the result to the barrier release channel -->

    <int:chain input-channel="confirmations" output-channel="release">
        <int:header-filter header-names="replyChannel, errorChannel" />
        <int:service-activator expression="payload" /> <!-- INT-3791; use s-a to retain ack header -->
        <int:aggregator>
            <bean class="org.springframework.integration.samples.barrier.AckAggregator" />
        </int:aggregator>
    </int:chain>

    <int:channel id="release" />

    <int:outbound-channel-adapter channel="release" ref="barrier.handler" method="trigger" />

    <!-- Consumer -> nullChannel -->

    <int-amqp:inbound-channel-adapter channel="nullChannel"
        queue-names="barrier.sample.queue"
        connection-factory="rabbitConnectionFactory" />

    <!-- Infrastructure -->

    <rabbit:queue name="barrier.sample.queue" auto-delete="true" />

    <rabbit:direct-exchange name="barrier.sample.exchange" auto-delete="true">
        <rabbit:bindings>
            <rabbit:binding queue="barrier.sample.queue" key="barrier.sample.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

As far I understand there are 3 parties:

  • client
  • server
  • rabbitMq

Client sends A,B,C to the server

RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class);
String request = "A,B,C";
System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n");
String reply = requestGateway.echo(request);

Server accepts that request:

<int-http:inbound-gateway request-channel="receiveChannel"
                        path="/postGateway"
                        error-channel="errorChannel"
                        supported-methods="POST"

then some values are added to the message headers:

<int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
        <int:header name="ackCorrelation" expression="headers['id']" />
    </int:header-enricher>

some processing:

<int:chain input-channel="processChannel" order="1">
        <int:header-filter header-names="content-type, content-length" />
        <int:splitter delimiters="," />
        <int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
            exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
            confirm-ack-channel="confirmations"
            confirm-nack-channel="confirmations"
            return-channel="errorChannel"
            confirm-correlation-expression="#this"/>
    </int:chain>

If I am not mistaken - message splitted by , so we have 3 messages A B and C and send into rabbitMq exchange barrier.sample.exchange with keybarrier.sample.key

That so. Next steps unclear for me. Could you please clarify ?

P.S.

I understand that we want to await all ACKs messages from rabbit to be sure that messages we delivered to the rabbit and when we get all ACKs from rabbit we want to answer to client. But I don't understand SI components definition which lead to that result. Could you provide any schema/picture/explanation how it happens ?

More concrete questions:

Why do we need enrich headers and how we do it ?

What is the aim of receiveChannel ? - stupid question

Where are 2 http services - /postGateway and /getGateway. /postGateway is used for getting msg from client, splitting into parts, send to the rabbit, get Acks and then answer to the client back. But what is the aim of /getGateway ?

P.S.

For now my flow declration looks like this:

    @Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .enrich(enricherSpec -> {
                enricherSpec.header("correlationId", 1); //or ackCorrelationId ?
            })
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            //.barrier(1000L) is it correct place for barrier?
            .log()
            .handle(Amqp.outboundAdapter(amqpTemplate())
                    .exchangeName("barrierExchange")
                    .routingKey("barrierKey"))
            .get();
}

I don't know how to use barrier here.

From server side everything is partially working - I see messages in queue(on rabbitMq web interface) but from client side I started to see following stacktrace:

2019-08-28 22:38:43.432 ERROR 12936 --- [ask-scheduler-8] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=6, headers={id=36781a7f-3d4f-e17d-60e6-33450c9307e4, timestamp=1567021122424}]
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
    at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
    at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
    at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
    ... 30 more
1

1 Answers

0
votes

It's quite straightforward; it's not clear what you don't understand.

  • we get a request from http
  • we add a correlation header to the message (header enricher)
  • we send 3 messages to rabbitmq
  • we need to wait for confirmations - so the HTTP thread is suspended in the barrier
  • asynchronously the 3 confirmations come back
  • we aggregate the responses into a single message (using the correlation header)
  • we send that message to the barrier which releases the HTTP thread

The receiveChannel is the input channel to header enricher.

You need to add the flow to receive the confirmations, aggregate, and trigger the barrier.