1
votes

I load xml configuration files from "dynConfigFlows"-directory and then create IntegrationFlows dynamically.
One IntegrationFlow per xml file. If config file was edited then IntegrationFlow should be updated. IntegrationFlow's registration id is set to config file name.
So current flow should be smoothly stopped (no input messages more and currently processed should be completed). After this it should be removed and updated version of the flow should be registered.
Now the questions:
a) In JUnit test below I'm able to register two flows with the same ID in case I do not call registration.remove(id); Is it considered behavior? Shouldn't ids be unique?
b) After call of remove(...) I try to send one more message to removed InputChannel. I get an error but I still see log output from working removed flow.

How should I implement refresh / reload functionality of dynamic IntegrationFlows?

@Test
public void testRegisterUnregister() {
    final String FLOW_ID = "regFlow";
    Flux<Message<?>> messageFlux = Flux.just("1,2,3,4").map(v -> v.split(",")).flatMapIterable(Arrays::asList)
            .map(Integer::parseInt).map(GenericMessage<Integer>::new);

    QueueChannel resultChannel = new QueueChannel();

    IntegrationFlow regFlow = createFlow("first", messageFlux, resultChannel);

    IntegrationFlowRegistration register1 = this.flowContext.registration(regFlow).id(FLOW_ID).register();
    final String FIRST_ID = register1.getId();
    System.out.println("!!!!!!!!!!!!!!!! register flow: "+FIRST_ID);
    // validate incoming of 4 integers
    assertThat(resultChannel.getQueueSize()).as("queueSize").isEqualTo(4);
    // save reference to first InputChannel
    MessageChannel input1 = register1.getInputChannel();

    // stop and remove first flow
    register1.stop();
    this.flowContext.remove(FIRST_ID);
    // wait for the case if flows shutdown needs some time
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // create second flow
    regFlow = createFlow("second", messageFlux, resultChannel);
    IntegrationFlowRegistration register2 = this.flowContext.registration(regFlow).id(FLOW_ID).register();
    System.out.println("!!!!!!!!!!!!!!!! register flow: "+register2.getId());
    // send extra message to the second flow
    register2.getInputChannel().send(MessageBuilder.withPayload("!!!!!!!!!!! new registered flow message").build());
    try {
        // send extra message to the first flow. It should fail because first flow is removed.
        // It fails but we still see "!!!!!!!!!!! new registered flow message" from first flow. It should't happen!
        input1.send(MessageBuilder.withPayload("!!!!!!!!!!! new registered flow message").build());
        Assert.fail("we should get an exception because first flow is removed");
    } catch (MessageDeliveryException e) {
        e.printStackTrace();
    }

    assertThat(resultChannel.getQueueSize()).as("queueSize=2 x 4 + 1").isEqualTo(9);
}

private IntegrationFlow createFlow(String markerText, Flux<Message<?>> messageFlux, QueueChannel resultChannel) {
    return IntegrationFlows
            .from(messageFlux)
            .log(l -> "!!!!!!!!!!!!!!!!!!!!!!! "+markerText+"="+l)
            .channel(resultChannel)
            .get();
}
1

1 Answers

1
votes

The first concern is definitely a bug: https://jira.spring.io/browse/INT-4413.

We can override the existing beans in the application context because it is allowed by default:

if (oldBeanDefinition != null) {
        if (!isAllowBeanDefinitionOverriding()) {
            throw new BeanDefinitionStoreException(beanDefinition.getResourceDescription(), beanName,
                    "Cannot register bean definition [" + beanDefinition + "] for bean '" + beanName +
                    "': There is already [" + oldBeanDefinition + "] bound.");
        }

However in't not OK from the IntegrationFlowContext. We will reject that state in the mentioned JIRA.

The second issue is not related to the stop or destroy. It is about a ChannelInterceptor which is exactly what we have with the .log() operator: https://docs.spring.io/spring-integration/docs/5.0.2.RELEASE/reference/html/java-dsl.html#java-dsl-log. In other words it's not a subscriber to the channel. That's why get your logs on the matter from the .log(), but still fail with the doesn't have subscribers to accept messages, because the flow has been destroyed.

On the other hand it is a bit confusing for you because we don't have beans, but we still have objects. To remove it completely we should null all the variables we have on the matter :-).