I may be missing something simple here (or misusing something), but I was trying to set up two direct channels so that one flow would pass some data to each of them in sequence. Using the Spring Integration Java DSL, I had something like this (significantly simplified for this example):
public static final String TEST_CHANNEL = "testGateway";
public static final String TEST_UPPER_CHANNEL = "testChannelUpper";
public static final String TEST_LOWER_CHANNEL = "testChannelLower";

// Entry channel that the gateway sends to
@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
    return MessageChannels.direct(TEST_CHANNEL).get();
}

@Bean(name = TEST_UPPER_CHANNEL)
public MessageChannel testChannelUpperChannel() {
    return MessageChannels.direct(TEST_UPPER_CHANNEL).get();
}

@Bean(name = TEST_LOWER_CHANNEL)
public MessageChannel testChannelLowerChannel() {
    return MessageChannels.direct(TEST_LOWER_CHANNEL).get();
}

// Main flow: intended to pass the message to the upper channel, then to the lower channel
@Bean
public IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .channel(TEST_UPPER_CHANNEL)
            .channel(TEST_LOWER_CHANNEL)
            .get();
}

// Consumer of the upper channel: upper-cases the payload and prints the message
@Bean
public IntegrationFlow testUpperFlow() {
    return IntegrationFlows
            .from(TEST_UPPER_CHANNEL)
            .<String, String>transform(String::toUpperCase)
            .handle(System.out::println)
            .get();
}

// Consumer of the lower channel: lower-cases the payload and prints the message
@Bean
public IntegrationFlow testLowerFlow() {
    return IntegrationFlows
            .from(TEST_LOWER_CHANNEL)
            .<String, String>transform(String::toLowerCase)
            .handle(System.out::println)
            .get();
}
I'm using a REST endpoint to invoke the flow via a gateway, but when I do so, only one of the channels seems to be invoked. Which one also seems to vary across invocations (sometimes testChannelUpper, sometimes testChannelLower).
This is what I end up with across executions (each time I am just hitting the endpoint http://localhost:9090/test?test=HellOoi):
Execution 1: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=4aa7b075-23cc-6ab3-10a1-c7cb73bae49b, timestamp=1447686848477}]
Execution 2: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=a18dcd01-da18-b00d-30c0-e1a03ce19104, timestamp=1447686853549}]
Execution 3: GenericMessage [payload=hellooi, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testLowerFlow.channel#0, id=5f0abcb9-378e-7a3c-9c93-a04ff6352927, timestamp=1447686857545}]
I believe that what I'm attempting here is also shown in the channelFlow example of the DSL wiki: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference
The versions I'm using are:
Spring Boot v1.2.2.RELEASE
Spring v4.1.5.RELEASE
spring-integration-java-dsl 1.0.2.RELEASE
JDK 1.8.0_40-b25
Has anyone else seen this kind of behavior? Am I just abusing the channel implementation? Any other ideas? Thanks in advance!
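For what it's worth, the alternating output looks consistent with how a direct channel dispatches when it has more than one subscriber: by default it load-balances round-robin, so each message goes to exactly one subscriber. In the configuration above, testChannelUpper ends up with two subscribers: the bridge to testChannelLower that testFlow creates, and the transformer in testUpperFlow. The following is only a toy model in plain Java to illustrate that behavior. `DirectChannelModel` and `run` are hypothetical names, not Spring Integration classes, and the subscription order is an assumption (in a real application it depends on bean initialization order).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of round-robin dispatch; NOT Spring Integration's actual implementation. */
class RoundRobinSketch {

    /** Hypothetical stand-in for a direct channel with load-balanced subscribers. */
    static class DirectChannelModel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Delivers the payload to exactly one subscriber, rotating on each send. */
        void send(String payload) {
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(payload);
        }
    }

    /** Wires the model like the flows in the question and records what would be printed. */
    static List<String> run(String... payloads) {
        List<String> printed = new ArrayList<>();
        DirectChannelModel upper = new DirectChannelModel();
        DirectChannelModel lower = new DirectChannelModel();

        // testFlow: .channel(TEST_UPPER_CHANNEL).channel(TEST_LOWER_CHANNEL)
        // bridges the upper channel to the lower channel (assumed to subscribe first)
        upper.subscribe(lower::send);
        // testUpperFlow is a second subscriber on the same upper channel
        upper.subscribe(p -> printed.add(p.toUpperCase()));
        // testLowerFlow is the only subscriber on the lower channel
        lower.subscribe(p -> printed.add(p.toLowerCase()));

        for (String p : payloads) {
            upper.send(p);
        }
        return printed;
    }

    public static void main(String[] args) {
        // prints [hellooi, HELLOOI, hellooi, HELLOOI]
        System.out.println(run("HellOoi", "HellOoi", "HellOoi", "HellOoi"));
    }
}
```

Each send reaches only one of the two consumers, which matches seeing either the upper-cased or the lower-cased payload per request, but never both.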
As Gary pointed out, the best way to do this is to make the entry channel a publish-subscribe channel and set an order on each consumer:
@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
    return MessageChannels.publishSubscribe(TEST_CHANNEL).get();
}

@Bean
public IntegrationFlow testUpperFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .<String, String>transform(String::toUpperCase, e -> e.order(1))
            .handle(System.out::println)
            .get();
}

@Bean
public IntegrationFlow testLowerFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .<String, String>transform(String::toLowerCase, e -> e.order(2))
            .handle(System.out::println)
            .get();
}
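To make the difference concrete, here is a rough plain-Java sketch of what the ordered pub-sub setup above does conceptually: every subscriber receives each message, and subscribers are called in ascending order value. `PubSubModel` and its methods are hypothetical names for illustration only, not Spring Integration's API, and the sketch assumes delivery happens sequentially on the sender's thread (no executor configured on the channel).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of an ordered publish-subscribe channel; NOT Spring Integration's implementation. */
class OrderedPubSubSketch {

    /** Hypothetical stand-in for a publish-subscribe channel with ordered subscribers. */
    static class PubSubModel {
        private static class Entry {
            final int order;
            final Consumer<String> handler;

            Entry(int order, Consumer<String> handler) {
                this.order = order;
                this.handler = handler;
            }
        }

        private final List<Entry> entries = new ArrayList<>();

        /** Registers a subscriber and keeps the list sorted by ascending order value. */
        void subscribe(int order, Consumer<String> handler) {
            entries.add(new Entry(order, handler));
            entries.sort(Comparator.comparingInt(e -> e.order));
        }

        /** Delivers the payload to every subscriber, lowest order first. */
        void send(String payload) {
            for (Entry e : entries) {
                e.handler.accept(payload);
            }
        }
    }

    /** Mirrors testUpperFlow (order 1) and testLowerFlow (order 2) and records the output. */
    static List<String> run(String payload) {
        List<String> printed = new ArrayList<>();
        PubSubModel channel = new PubSubModel();

        // Subscribed in reverse on purpose, to show that the order value decides the sequence
        channel.subscribe(2, p -> printed.add(p.toLowerCase()));
        channel.subscribe(1, p -> printed.add(p.toUpperCase()));

        channel.send(payload);
        return printed;
    }

    public static void main(String[] args) {
        // prints [HELLOOI, hellooi]
        System.out.println(run("HellOoi"));
    }
}
```

Unlike the direct channel, a single send here reaches both consumers, so one request produces the upper-cased output followed by the lower-cased one.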