When using the Spring Integration DSL builder pattern, it often fills in the channels needed between elements "automagically". However, sometimes it does not.

At a high level, the wrapping application keeps metadata in a database so that it can create and destroy flows dynamically, as needed, across platforms we have (potentially) never seen before. For this reason, the flows cannot be declared statically (for example with @Bean); they must be created and destroyed dynamically, and registered with and deregistered from the Spring context at runtime.

I have a publish-subscribe message channel (mainPublishChannel) used in the dynamically created main flow, and a channel in the dynamically created subflow, but I can't see how to subscribe the subflow to mainPublishChannel.

As a result, I push messages into the channel, but with no subscribers nothing happens.

Thanks in advance.


Some prior research (not an exhaustive listing):

https://github.com/spring-projects/spring-integration-flow

https://dzone.com/articles/spring-integration-building

https://xpadro.com/2014/05/spring-integration-4-0-a-complete-xml-free-example.html

Spring integration gateway "Dispatcher has no subscribers"

Spring Integration - How to debug 'Dispatcher has no Subscribers'?


Log snippet:

task-scheduler-1 2020-12-31 00:25:32,526 INFO  o.s.i.g.GatewayProxyFactoryBean - started b653ca1c-038d-4567-bd4e-4c16ecc502a3.org.springframework.integration.config.ConsumerEndpointFactoryBean#3#gpfb
task-scheduler-1 2020-12-31 00:25:32,538 DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{... timestamp=1609395932538}]
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.d.BroadcastingDispatcher - No subscribers, default behavior is ignore
task-scheduler-1 2020-12-31 00:25:32,539 DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'b653ca1c-038d-4567-bd4e-4c16ecc502a3.mainPublishChannel', message: GenericMessage [payload=[{aaa=ee}], headers={aaa=ee, sequenceNumber=1, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7771e96a, sequenceSize=2, yyy=2020-12-24 11:15:30.915278, correlationId=0eef0e4e-768c-90db-fa7b-2d1767335b26, timestamp=1609395932538}]

Code snippet:

    String channelId=getId().toString()+'.'+"mainPublishChannel";
    MessageChannel channel = MessageChannels.publishSubscribe(channelId, stepTaskExecutor).get();

    final IntegrationFlowBuilder bldr = IntegrationFlows
        .from(setupAdapter,
                c -> c.poller(Pollers.fixedRate(pollerFixedRate, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
        .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                .headerExpression("yyy", "payload[0].get(\"yyy\")"))
        .split(tableSplitter)
        .gateway(channel)
        .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules, channel))
        .aggregate()
        .handle(cleanupAdapter);
...
snip
...
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules, MessageChannel publishedChannel) {

    // ??? How to subscribe this to publishedChannel??
    recipientListSpec
        .recipient(MessageChannels.publishSubscribe(this.getId().toString()+'.'+"mainReceiveChannel", stepTaskExecutor).get());
    
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
            
    recipientListSpec
        .ignoreSendFailures(true)
        .defaultOutputToParentFlow();
    
    return recipientListSpec;
}

1 Answer

The publishedChannel must be passed to the child flow and used as its input channel:

        return flowDef
            // Reference to the main publish channel inside the child flow;
            // this lets the builder create the subscription.
            .channel(receiveChannel)
            .log()
            .handle(inboundAdapter)
... snip ...

            ;
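
For anyone puzzled by the log in the question ("No subscribers, default behavior is ignore" followed by postSend (sent=true)), here is a minimal plain-Java sketch of that behavior. It is only a toy model: ToyPubSubChannel, subscribe, send and demo are hypothetical names made up for illustration, not the Spring Integration classes, and the real dispatcher does considerably more. It just shows that a broadcast to zero subscribers can report success while delivering nothing, and that the same send reaches the handler once a subscription exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Toy stand-in for a publish-subscribe channel; NOT the Spring Integration class. */
public class ToyPubSubChannel {

    // Handlers currently subscribed to this channel.
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a handler, modelling what wiring the child flow to the channel achieves. */
    public void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /**
     * Broadcasts the payload to every subscriber.
     * With no subscribers the loop body never runs, so the payload is ignored,
     * yet the method still reports true (compare "sent=true" in the log).
     */
    public boolean send(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return true;
    }

    /** Sends once before and once after subscribing; returns what the handler received. */
    public static List<String> demo() {
        ToyPubSubChannel channel = new ToyPubSubChannel();
        List<String> received = new ArrayList<>();

        channel.send("before");            // nobody is listening: silently dropped
        channel.subscribe(received::add);  // the "child flow" now listens
        channel.send("after");             // delivered to the handler

        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [after]
    }
}
```

In the real flow, the .channel(receiveChannel) call in the child flow definition plays the role of subscribe here: the endpoint that follows it becomes a subscriber of the publish-subscribe channel.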