I'm working on an application using Spring Boot 1.5.13.RELEASE and Spring Integration 4.3.16.RELEASE.
I'm pretty new to Spring Integration and I have encountered a problem.
The basic idea is that on some external trigger (could be an HTTP call) I need to create an IntegrationFlow which will consume messages from a RabbitMQ queue, do some work with them and then (maybe) produce to another RabbitMQ endpoint.
This is supposed to happen many times, so I will have to create multiple IntegrationFlows.
I am using IntegrationFlowContext to register each one of the IntegrationFlows like this:
IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();
I should clarify that this can happen concurrently: multiple IntegrationFlows may be created at the same time.
So each time I'm trying to create an IntegrationFlow, my "source" is a Gateway that looks like this:
    MessagingGatewaySupport sourceGateway = Amqp
            .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix + uuid)
            .concurrentConsumers(1)
            .adviceChain(retryInterceptor)
            .autoStartup(false)
            .id("sgX-" + uuid)
            .get();
It's not a @Bean (yet) but I expect it to get registered when each IntegrationFlow is registered.
My "target" is an AmqpOutboundEndpoint (an outbound adapter) that looks like this:
    @Bean
    public AmqpOutboundEndpoint outboundAdapter(
            RabbitTemplate rabbitTemplate,
            ApplicationMessagingProperties applicationMessagingProperties
    ) {
        return Amqp.outboundAdapter(rabbitTemplate)
                .exchangeName("someStandardExchange")
                .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
                .get();
    }
Now this one IS a bean already and is injected each time I'm trying to create a flow.
And my flows look like this:
    public IntegrationFlow configure() {
        return IntegrationFlows
                .from(sourceGateway)
                .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
                .filter(injectedGenericSelectorFilter)
                .<HashMap<String, String>>handle((payload, headers) -> {
                    String uuid = payload.get("uuid");
                    boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                    myInjectedApplicationService.handlePayload(payload);
                    return MessageBuilder
                            .withPayload(payload)
                            .setHeader("shouldForward", shouldForwardMessage)
                            .setHeader("rabbitmq.ROUTING_KEY", uuid)
                            .build();
                })
                .filter("headers.get('shouldForward').equals(true)")
                .transform(Transformers.toJson(jsonObjectMapper))
                .handle(outboundAdapter)
                .get();
    }
My problem is that while the application starts fine and creates the first IntegrationFlows, later on I start getting exceptions like this:
java.lang.IllegalStateException: Could not register object [org.springframework.integration.transformer.MessageTransformingHandler#872] under bean name 'org.springframework.integration.transformer.MessageTransformingHandler#872': there is already object [org.springframework.integration.transformer.MessageTransformingHandler#872] bound
I even tried setting an id on each of the components used, which is supposed to be used as the bean name, like this:
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))
But, even though bean name problems with components like .filter were resolved, I still get the same exception about a MessageTransformingHandler.
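To make the naming scheme concrete, here is a minimal, framework-free sketch of how I derive the per-flow component ids. The "sgX" and "tf1" prefixes are the ones shown above; the helper name componentId and the extra prefixes "flt1" and "tf2" are hypothetical, used only for illustration. It only shows that ids built this way do not clash between flows; it does not explain the remaining MessageTransformingHandler exception.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;

public class FlowComponentIds {

    // Hypothetical helper: builds an id such as "tf1-<uuid>" for one component of one flow.
    static String componentId(String prefix, String flowUuid) {
        return prefix + "-" + flowUuid;
    }

    // Generates the ids for the given number of flows and fails on the first duplicate.
    static Set<String> idsForFlows(int flowCount) {
        String[] prefixes = {"sgX", "tf1", "flt1", "tf2"}; // "flt1" and "tf2" are made-up prefixes
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < flowCount; i++) {
            String uuid = UUID.randomUUID().toString();
            for (String prefix : prefixes) {
                String id = componentId(prefix, uuid);
                if (!ids.add(id)) {
                    throw new IllegalStateException("Duplicate component id: " + id);
                }
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(idsForFlows(3).size() + " unique component ids generated");
    }
}
```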
UPDATE
I didn't mention the fact that once each IntegrationFlow is done with its work, it is removed using the IntegrationFlowContext like this:
flowContext.remove(flowId);
So what seems to have (kind of) worked is synchronizing both the flow registration block and the flow removing block by using the same object as a lock.
So my class responsible for registering and removing flows looks like this:
...
    private final Object lockA = new Object();
    ...
    public void appendNewFlow(String callUUID) {
        IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);
        synchronized (lockA) {
            flowContext.registration(integrationFlow).id(callUUID).register();
        }
    }

    public void removeFlow(String flowId) {
        synchronized (lockA) {
            flowContext.remove(flowId);
        }
    }
...
My problem now is that this kind of lock is rather heavy for the application, since I'm getting quite a lot of these messages:
...Waiting for workers to finish.
...
...Successfully waited for workers to finish.
which doesn't happen as fast as I'd like.
But I guess that is expected since each time a thread acquires the lock, it will take some time to either register the flow and all its components or deregister the flow and all its components.
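To illustrate the serialization effect I mean, here is a minimal, self-contained sketch that does not use Spring at all. A plain map stands in for the IntegrationFlowContext, and a 5 ms sleep stands in for the cost of registering or removing a flow; both are assumptions made purely for the demo, as are the class name SerializedFlowRegistry and the methods runDemo, maxConcurrent and size. It shows that with one shared lock only one thread is ever inside register or remove, so total time grows with the number of operations regardless of how many threads call in.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedFlowRegistry {

    private final Object lockA = new Object();
    // Stand-in for the real flow context (assumption for this demo only).
    private final Map<String, String> flows = new ConcurrentHashMap<>();
    private final AtomicInteger inside = new AtomicInteger();
    private final AtomicInteger maxInside = new AtomicInteger();

    public void appendNewFlow(String callUUID) {
        String flow = "flow-" + callUUID; // built outside the lock, like makeFlow(...)
        synchronized (lockA) {
            simulateSlowWork();
            flows.put(callUUID, flow);
        }
    }

    public void removeFlow(String flowId) {
        synchronized (lockA) {
            simulateSlowWork();
            flows.remove(flowId);
        }
    }

    // Pretends that registering/removing takes a while and records how many threads overlap.
    private void simulateSlowWork() {
        int now = inside.incrementAndGet();
        maxInside.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        inside.decrementAndGet();
    }

    public int maxConcurrent() { return maxInside.get(); }

    public int size() { return flows.size(); }

    // Registers and then removes flowCount flows from 8 threads; returns elapsed milliseconds.
    public static long runDemo(SerializedFlowRegistry registry, int flowCount) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        long start = System.nanoTime();
        for (int i = 0; i < flowCount; i++) {
            String id = "call-" + i;
            pool.submit(() -> {
                registry.appendNewFlow(id);
                registry.removeFlow(id);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        SerializedFlowRegistry registry = new SerializedFlowRegistry();
        long elapsed = runDemo(registry, 20);
        System.out.println("elapsed ms: " + elapsed
                + ", max threads inside lock: " + registry.maxConcurrent());
    }
}
```

In this sketch the eight worker threads mostly sit waiting on lockA, which is the same kind of slowdown I am seeing in the real application.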