I am trying to understand following example: https://github.com/spring-projects/spring-integration-samples/tree/d8e71c687e86e7a7e35d515824832a92df9d4638/basic/barrier
and rewrite it using java DSL.
First of all I want to understand what is going in that example. Sure I've read explanation from github but I still don't get the idea. I ask here because I know that SI team leads(authors of that example) answer all question tagged spring-integration.
server configuration:
<int-http:inbound-gateway request-channel="receiveChannel"
path="/postGateway"
error-channel="errorChannel"
supported-methods="POST"/>
<int-http:inbound-gateway request-channel="createPayload"
path="/getGateway"
error-channel="errorChannel"
supported-methods="GET"/>
<int:transformer input-channel="createPayload" output-channel="receiveChannel" expression="'A,B,C'" />
<int:channel id="receiveChannel" />
<int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
<int:header name="ackCorrelation" expression="headers['id']" />
</int:header-enricher>
<int:publish-subscribe-channel id="processChannel" />
<int:chain input-channel="processChannel" order="1">
<int:header-filter header-names="content-type, content-length" />
<int:splitter delimiters="," />
<int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
confirm-ack-channel="confirmations"
confirm-nack-channel="confirmations"
return-channel="errorChannel"
confirm-correlation-expression="#this"/>
</int:chain>
<!-- Suspend the HTTP thread until the publisher confirms are asynchronously received -->
<int:barrier id="barrier" input-channel="processChannel" order="2"
correlation-strategy-expression="headers['ackCorrelation']"
output-channel="transform" timeout="10000" />
<int:transformer input-channel="transform" expression="payload[1]" />
<!-- Aggregate the publisher confirms and send the result to the barrier release channel -->
<int:chain input-channel="confirmations" output-channel="release">
<int:header-filter header-names="replyChannel, errorChannel" />
<int:service-activator expression="payload" /> <!-- INT-3791; use s-a to retain ack header -->
<int:aggregator>
<bean class="org.springframework.integration.samples.barrier.AckAggregator" />
</int:aggregator>
</int:chain>
<int:channel id="release" />
<int:outbound-channel-adapter channel="release" ref="barrier.handler" method="trigger" />
<!-- Consumer -> nullChannel -->
<int-amqp:inbound-channel-adapter channel="nullChannel"
queue-names="barrier.sample.queue"
connection-factory="rabbitConnectionFactory" />
<!-- Infrastructure -->
<rabbit:queue name="barrier.sample.queue" auto-delete="true" />
<rabbit:direct-exchange name="barrier.sample.exchange" auto-delete="true">
<rabbit:bindings>
<rabbit:binding queue="barrier.sample.queue" key="barrier.sample.key" />
</rabbit:bindings>
</rabbit:direct-exchange>
As far I understand there are 3 parties:
- client
- server
- rabbitMq
Client sends A,B,C
to the server
RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class);
String request = "A,B,C";
System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n");
String reply = requestGateway.echo(request);
Server accepts that request:
<int-http:inbound-gateway request-channel="receiveChannel"
path="/postGateway"
error-channel="errorChannel"
supported-methods="POST"
then some values are added to the message headers:
<int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
<int:header name="ackCorrelation" expression="headers['id']" />
</int:header-enricher>
some processing:
<int:chain input-channel="processChannel" order="1">
<int:header-filter header-names="content-type, content-length" />
<int:splitter delimiters="," />
<int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
confirm-ack-channel="confirmations"
confirm-nack-channel="confirmations"
return-channel="errorChannel"
confirm-correlation-expression="#this"/>
</int:chain>
If I am not mistaken - message splitted by ,
so we have 3 messages A
B
and C
and send into rabbitMq exchange barrier.sample.exchange
with keybarrier.sample.key
That so. Next steps unclear for me. Could you please clarify ?
P.S.
I understand that we want to await all ACKs messages from rabbit to be sure that messages we delivered to the rabbit and when we get all ACKs from rabbit we want to answer to client. But I don't understand SI components definition which lead to that result. Could you provide any schema/picture/explanation how it happens ?
More concrete questions:
Why do we need enrich headers and how we do it ?
What is the aim of receiveChannel ? - stupid question
Where are 2 http services - /postGateway
and /getGateway
. /postGateway
is used for getting msg from client, splitting into parts, send to the rabbit, get Acks and then answer to the client back. But what is the aim of /getGateway
?
P.S.
For now my flow declration looks like this:
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.enrich(enricherSpec -> {
enricherSpec.header("correlationId", 1); //or ackCorrelationId ?
})
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.log()
//.barrier(1000L) is it correct place for barrier?
.log()
.handle(Amqp.outboundAdapter(amqpTemplate())
.exchangeName("barrierExchange")
.routingKey("barrierKey"))
.get();
}
I don't know how to use barrier here.
From server side everything is partially working - I see messages in queue(on rabbitMq web interface) but from client side I started to see following stacktrace:
2019-08-28 22:38:43.432 ERROR 12936 --- [ask-scheduler-8] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=6, headers={id=36781a7f-3d4f-e17d-60e6-33450c9307e4, timestamp=1567021122424}]
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
... 30 more