
I want to create an IntegrationFlow for the flow below.

(image: flow diagram)

  1. The path from Start to Deliver is the sync flow.
  2. How can I extract / fork an async end node from Build Items and Validate Items?

	@Bean
	public IntegrationFlow buildCart() {
		return f -> f.handle(validate, "buildPreCheck")
				.handle(preProcessProcessor)
				.handle(getOffersProcessor)
				.handle(buildItems)
				.wireTap(log())
				.handle(validateItems)
				.handle(deliver);
	}

EDIT:

Hi Artem, I added the Wire Tap as in the code below. It is still executing the Wire Tap node sequentially, and the main flow waits for that node to finish.

Please help me make it an async node.

@Bean
public IntegrationFlow log() {
    return f -> f.handle(auditProcessor).channel("nullChannel");
}



@ServiceActivator
@Description("Call and get the Offers Request")
public void getDetails(Message<Context> message) throws InterruptedException {
    log.info("getDetails-Sleep-Start");
    Thread.sleep(3000);
    log.info("getDetails-Sleep-End");
}
You definitely need to add a comment to my answer so that I get notified. Otherwise your edit will go unnoticed. - Artem Bilan

1 Answer


With the Spring Integration Java DSL it is easy to overlook that one of the most important components in Spring Integration is the MessageChannel. Channels can be added to a flow wherever we need something other than the default DirectChannel. For async execution there is the ExecutorChannel. But before we go into that forked flow, we need a way to branch off to it without breaking the main one. In EIP terms this is called a Wire Tap: https://www.enterpriseintegrationpatterns.com/patterns/messaging/WireTap.html.

The Spring Integration Java DSL provides this as the .wireTap() operator in the flow. The audit logic can be implemented in the tapped sub-flow or behind a channel, but don't forget about the ExecutorChannel!

You can see more info in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-wiretap
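The difference between the default direct hand-off and an executor hand-off can be sketched in plain Java. This is only an analogy built on java.util.concurrent, not Spring Integration code: the class name WireTapHandOffSketch, the method auditThreadFor and the thread name "audit-1" are made up for illustration. It reports which thread the tapped (audit) step ran on for each kind of hand-off.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java analogy only; none of these names come from Spring Integration.
class WireTapHandOffSketch {

    // Hands a stand-in "audit" step to the given Executor and returns the
    // name of the thread that step actually ran on.
    static String auditThreadFor(Executor handOff) {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        handOff.execute(() -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            // We wait here only so the sketch can report the result;
            // a real main flow would simply carry on with its next step.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("audit step did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Direct hand-off: the task runs on the calling thread, so the
        // caller is blocked until it finishes.
        Executor direct = Runnable::run;

        // Executor hand-off: the task runs on a worker thread.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "audit-1");
            t.setDaemon(true);
            return t;
        });
        try {
            System.out.println("direct hand-off ran on:   " + auditThreadFor(direct));
            System.out.println("executor hand-off ran on: " + auditThreadFor(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct hand-off the audit step runs on the caller's own thread, which is why a slow tapped step holds up the main flow. With the executor hand-off it runs on a pool thread instead.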

UPDATE

The

.handle(buildItems)
.wireTap(log())

is the correct way to go: you audit the result of buildItems and then proceed to the next step.

The log() must be modified like this:

@Bean
public IntegrationFlow log() {
    return f -> f
            .channel(c -> c.executor(taskExecutorBean()))
            .handle(auditProcessor)
            .channel("nullChannel");
}

Pay attention to the c.executor() call. It adds an async hand-off for our log() sub-flow, so the sub-flow runs on one of the executor's threads and the main flow does not wait for it.
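The taskExecutorBean() used in log() is not shown above. Here is a minimal sketch of what it might look like, assuming a ThreadPoolTaskExecutor is acceptable. The class name AuditExecutorConfig, the pool sizes, the queue capacity and the thread name prefix are illustrative assumptions, not values from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical configuration class; all sizes below are placeholders to tune.
@Configuration
public class AuditExecutorConfig {

    @Bean
    public TaskExecutor taskExecutorBean() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);            // threads kept ready for audit work
        executor.setMaxPoolSize(4);             // upper bound once the queue is full
        executor.setQueueCapacity(100);         // audit messages buffered before the pool grows
        executor.setThreadNamePrefix("audit-"); // makes the hand-off visible in the logs
        return executor;
    }
}
```

For log() to call taskExecutorBean() directly, as it does above, both @Bean methods need to live in the same @Configuration class; otherwise the executor can be injected instead. With the thread name prefix set, the "getDetails-Sleep-Start" and "getDetails-Sleep-End" log lines should show up on an "audit-" thread rather than on the thread running the main flow.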