
I may just be missing something very simple here (or misusing something), but I was trying to set up two direct channels so that one flow would pass some data to each of them in sequence. Using the Spring Integration Java DSL, I had something like this (significantly simplified for this example):

public static final String TEST_CHANNEL = "testGateway";
public static final String TEST_UPPER_CHANNEL = "testChannelUpper";
public static final String TEST_LOWER_CHANNEL = "testChannelLower";

@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
    return MessageChannels.direct(TEST_CHANNEL).get();
}

@Bean(name = TEST_UPPER_CHANNEL)
public MessageChannel testChannelUpperChannel() {
    return MessageChannels.direct(TEST_UPPER_CHANNEL).get();
}

@Bean(name = TEST_LOWER_CHANNEL)
public MessageChannel testChannelLowerChannel() {
    return MessageChannels.direct(TEST_LOWER_CHANNEL).get();
}


@Bean
public IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .channel(TEST_UPPER_CHANNEL)
            .channel(TEST_LOWER_CHANNEL)
            .get();
}


@Bean
public IntegrationFlow testUpperFlow() {
    return IntegrationFlows
            .from(TEST_UPPER_CHANNEL)
            .<String, String>transform(String::toUpperCase)
            .handle(System.out::println)
            .get();
}

@Bean
public IntegrationFlow testLowerFlow() {
    return IntegrationFlows
            .from(TEST_LOWER_CHANNEL)
            .<String, String>transform(String::toLowerCase)
            .handle(System.out::println)
            .get();
}

I'm using a REST endpoint to invoke the flow via a gateway, but when I do so, only one of the channels seems to be invoked. Which one is invoked also seems to be random across invocations (sometimes testChannelUpper, sometimes testChannelLower).

This is what I end up with across executions (each time I am just hitting the endpoint http://localhost:9090/test?test=HellOoi):

Execution 1: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=4aa7b075-23cc-6ab3-10a1-c7cb73bae49b, timestamp=1447686848477}]

Execution 2: GenericMessage [payload=HELLOOI, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testUpperFlow.channel#0, id=a18dcd01-da18-b00d-30c0-e1a03ce19104, timestamp=1447686853549}]

Execution 3: GenericMessage [payload=hellooi, headers={jobName=someActivity, history=someGateway,testGateway,testChannelUpper,testLowerFlow.channel#0, id=5f0abcb9-378e-7a3c-9c93-a04ff6352927, timestamp=1447686857545}]

I believe that what I'm attempting here is also shown in the channelFlow example of the DSL wiki: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference

The versions I'm using are:

Spring Boot v1.2.2.RELEASE

Spring v4.1.5.RELEASE

spring-integration-java-dsl 1.0.2.RELEASE

JDK 1.8.0_40-b25

So... has anyone else seen this kind of behavior? Am I just abusing the channel implementation? Any other ideas? Thanks in advance!


As Gary pointed out, the best way to do this is to use a publish-subscribe channel and set an order on its consumers:

@Bean(name = TEST_CHANNEL)
public MessageChannel testGatewayChannel() {
    return MessageChannels.publishSubscribe(TEST_CHANNEL).get();
}


@Bean
public IntegrationFlow testUpperFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .<String, String>transform(String::toUpperCase, e -> e.order(1))
            .handle(System.out::println)
            .get();
}

@Bean
public IntegrationFlow testLowerFlow() {
    return IntegrationFlows
            .from(TEST_CHANNEL)
            .<String, String>transform(String::toLowerCase, e -> e.order(2))
            .handle(System.out::println)
            .get();
}
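
To make the ordering idea concrete, here is a minimal plain-Java sketch of a publish-subscribe channel whose consumers run in ascending order. It is only a toy model under my own assumptions: `ToyPubSubChannel` and `OrderedPubSubSketch` are hypothetical names, not Spring Integration classes, and the real channel does considerably more.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of an ordered publish-subscribe channel; not Spring Integration code. */
class OrderedPubSubSketch {

    /** Hypothetical stand-in: every subscriber receives every message, lowest order first. */
    static class ToyPubSubChannel {
        private static class Subscription {
            final int order;
            final Consumer<String> handler;

            Subscription(int order, Consumer<String> handler) {
                this.order = order;
                this.handler = handler;
            }
        }

        private final List<Subscription> subscriptions = new ArrayList<>();

        void subscribe(int order, Consumer<String> handler) {
            subscriptions.add(new Subscription(order, handler));
            // Keep the list sorted so that send() visits consumers by ascending order.
            subscriptions.sort(Comparator.comparingInt(s -> s.order));
        }

        void send(String payload) {
            for (Subscription s : subscriptions) {
                s.handler.accept(payload);
            }
        }
    }

    /** Sends one payload and returns what each consumer produced, in invocation order. */
    static List<String> run(String payload) {
        List<String> log = new ArrayList<>();
        ToyPubSubChannel channel = new ToyPubSubChannel();
        // Subscribed out of order on purpose: the order value, not registration, decides.
        channel.subscribe(2, p -> log.add(p.toLowerCase()));
        channel.subscribe(1, p -> log.add(p.toUpperCase()));
        channel.send(payload);
        return log;
    }

    public static void main(String[] args) {
        run("HellOoi").forEach(System.out::println);
    }
}
```

In this model a single message reaches both consumers, and the upper-case consumer always runs first because it has the lower order value.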

1 Answer


What is the purpose of this...

@Bean
public IntegrationFlow testFlow() {
    return IntegrationFlows
        .from(TEST_CHANNEL).fixedSubscriberChannel()
        .channel(TEST_UPPER_CHANNEL)
        .channel(TEST_LOWER_CHANNEL)
        .get();
}

?

All that does is bridge the three channels together.

In fact, you end up with two consumers on TEST_UPPER_CHANNEL: the bridge in this flow and the transformer in your other flow.

By default, direct channels dispatch messages using round-robin distribution, so the first message goes to the bridge, the next to the transformer, and so on.
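
As a rough illustration, the effect can be modelled in plain Java. This is a simplified sketch, not Spring Integration's actual dispatcher: `ToyDirectChannel` and `RoundRobinSketch` are hypothetical names, and I am assuming the bridge happens to subscribe before the transformer (in a real application that depends on the order in which the beans are initialized).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a point-to-point channel with round-robin dispatch; not Spring Integration code. */
class RoundRobinSketch {

    /** Hypothetical stand-in for a direct channel: each message goes to exactly one subscriber. */
    static class ToyDirectChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0; // index of the subscriber that receives the next message

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String payload) {
            Consumer<String> target = subscribers.get(next);
            next = (next + 1) % subscribers.size(); // rotate to the following subscriber
            target.accept(payload);
        }
    }

    /** Sends the same payload count times and records which consumer handled each message. */
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        ToyDirectChannel upperChannel = new ToyDirectChannel();
        // Consumer 1: stands in for the bridge from testFlow, which forwards to the lower-case flow.
        upperChannel.subscribe(p -> log.add("bridge -> " + p.toLowerCase()));
        // Consumer 2: stands in for the transformer in testUpperFlow.
        upperChannel.subscribe(p -> log.add("transformer -> " + p.toUpperCase()));
        for (int i = 0; i < count; i++) {
            upperChannel.send("HellOoi");
        }
        return log;
    }

    public static void main(String[] args) {
        run(4).forEach(System.out::println);
    }
}
```

Each message is handled by only one of the two consumers, which is why a single request never produces both the upper-case and the lower-case output.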