I load xml configuration files from "dynConfigFlows"-directory and then create IntegrationFlows dynamically.
One IntegrationFlow per xml file. If config file was edited then IntegrationFlow should be updated. IntegrationFlow's registration id is set to config file name.
So current flow should be smoothly stopped (no input messages more and currently processed should be completed). After this it should be removed and updated version of the flow should be registered.
Now the questions:
a) In JUnit test below I'm able to register two flows with the same ID in case I do not call registration.remove(id); Is it considered behavior? Shouldn't ids be unique?
b) After call of remove(...) I try to send one more message to removed InputChannel. I get an error but I still see log output from working removed flow.
How should I implement refresh / reload functionality of dynamic IntegrationFlows?
@Test
public void testRegisterUnregister() {
final String FLOW_ID = "regFlow";
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4").map(v -> v.split(",")).flatMapIterable(Arrays::asList)
.map(Integer::parseInt).map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow regFlow = createFlow("first", messageFlux, resultChannel);
IntegrationFlowRegistration register1 = this.flowContext.registration(regFlow).id(FLOW_ID).register();
final String FIRST_ID = register1.getId();
System.out.println("!!!!!!!!!!!!!!!! register flow: "+FIRST_ID);
// validate incoming of 4 integers
assertThat(resultChannel.getQueueSize()).as("queueSize").isEqualTo(4);
// save reference to first InputChannel
MessageChannel input1 = register1.getInputChannel();
// stop and remove first flow
register1.stop();
this.flowContext.remove(FIRST_ID);
// wait for the case if flows shutdown needs some time
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// create second flow
regFlow = createFlow("second", messageFlux, resultChannel);
IntegrationFlowRegistration register2 = this.flowContext.registration(regFlow).id(FLOW_ID).register();
System.out.println("!!!!!!!!!!!!!!!! register flow: "+register2.getId());
// send extra message to the second flow
register2.getInputChannel().send(MessageBuilder.withPayload("!!!!!!!!!!! new registered flow message").build());
try {
// send extra message to the first flow. It should fail because first flow is removed.
// It fails but we still see "!!!!!!!!!!! new registered flow message" from first flow. It should't happen!
input1.send(MessageBuilder.withPayload("!!!!!!!!!!! new registered flow message").build());
Assert.fail("we should get an exception because first flow is removed");
} catch (MessageDeliveryException e) {
e.printStackTrace();
}
assertThat(resultChannel.getQueueSize()).as("queueSize=2 x 4 + 1").isEqualTo(9);
}
private IntegrationFlow createFlow(String markerText, Flux<Message<?>> messageFlux, QueueChannel resultChannel) {
return IntegrationFlows
.from(messageFlux)
.log(l -> "!!!!!!!!!!!!!!!!!!!!!!! "+markerText+"="+l)
.channel(resultChannel)
.get();
}