I'm working on an application using Spring Boot 1.5.13.RELEASE and Spring Integration 4.3.16.RELEASE.

I'm pretty new to Spring Integration and I have encountered a problem.

So the basic idea is that on some external trigger (it could be an HTTP call) I need to create an IntegrationFlow which will consume messages from a RabbitMQ queue, do some work with them and then (maybe) produce to another RabbitMQ endpoint.

Now this is supposed to happen a lot of times so I will have to create multiple IntegrationFlows.

I am using IntegrationFlowContext to register each one of the IntegrationFlows like this:

IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();

I should clarify that this can happen concurrently, with multiple IntegrationFlows being created at the same time.

So each time I'm trying to create an IntegrationFlow, my "source" is a Gateway that looks like this:

MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();

It's not a @Bean (yet) but I expect it to get registered when each IntegrationFlow is registered.

My "target" is an AmqpOutboundEndpoint (an outbound adapter) that looks like this:

@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}

Now this one IS a bean already and is injected each time I'm trying to create a flow.

And my flow(s) look like this:

public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {

                String uuid = payload.get("uuid");

                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);

                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}

My problem is that while the application starts fine and creates the first IntegrationFlows, etc., later on I get exceptions like this:

java.lang.IllegalStateException: Could not register object [org.springframework.integration.transformer.MessageTransformingHandler#872] under bean name 'org.springframework.integration.transformer.MessageTransformingHandler#872': there is already object [org.springframework.integration.transformer.MessageTransformingHandler#872] bound

I even tried setting an id on each of the components used, which is supposed to be used as the bean name, like this:

.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))

But even though the bean name problems with components like .filter were resolved, I still get the same exception about a MessageTransformingHandler.


UPDATE

I didn't mention that once each IntegrationFlow is done with its work, it is removed using the IntegrationFlowContext like this:

flowContext.remove(flowId);

So what seems to have (kind of) worked is synchronizing both the flow registration block and the flow removing block by using the same object as a lock.

So my class responsible for registering and removing flows looks like this:

...
private final Object lockA = new Object();
...

public void appendNewFlow(String callUUID){
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);

    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId){

    synchronized (lockA) {
        flowContext.remove(flowId); 
    }

}
...

My problem now is that this kind of lock is rather heavy for the application, since I'm seeing quite a lot of these log lines:

...Waiting for workers to finish.
...
...Successfully waited for workers to finish.

and this doesn't happen as fast as I'd like.

But I guess that is expected since each time a thread acquires the lock, it will take some time to either register the flow and all its components or deregister the flow and all its components.

1 Answer

You also have there this one:

.transform(Transformers.toJson(jsonObjectMapper))

How does it work if you add an .id() there as well?

On the other hand, since you say that this happens concurrently, I wonder if you can make some piece of your code synchronized, e.g. wrap flowContext.registration(integrationFlow).id(callUUID).register(); in a synchronized block.

The bean definition and registration process is really not thread-safe and is intended to be used only from the single initializing thread at the beginning of the application lifecycle.

We probably really need to make IntegrationFlowContext thread-safe in its register(IntegrationFlowRegistrationBuilder builder) function or, at least, in its registerBean(Object bean, String beanName, String parentName), since this is exactly the place where we generate the bean name and register it.
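
To illustrate why a "generate a name, then register under it" step needs to be atomic, here is a minimal stdlib-only sketch. ToyBeanRegistry and ToyRegistryDemo are hypothetical names invented for this example; this is not Spring's or the DSL's actual implementation, only an assumed simplification of the check-then-act pattern described above. Without the synchronized keyword, two threads could both see the same generated name as free, and the second one would fail with an "already bound" error similar to the one in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy registry for illustration only; NOT Spring's implementation.
class ToyBeanRegistry {
    private final Map<String, Object> beans = new HashMap<>();

    // Finds the first free "<prefix>#<n>" name and binds the bean under it.
    // `synchronized` makes the name lookup and the binding one atomic step.
    synchronized String registerWithGeneratedName(String prefix, Object bean) {
        int n = 0;
        String name = prefix + "#" + n;
        while (beans.containsKey(name)) {
            n++;
            name = prefix + "#" + n;
        }
        if (beans.putIfAbsent(name, bean) != null) {
            throw new IllegalStateException("there is already object bound under " + name);
        }
        return name;
    }
}

class ToyRegistryDemo {
    // Registers threads * perThread objects concurrently and returns
    // how many distinct names were handed out.
    static int registerConcurrently(int threads, int perThread) {
        ToyBeanRegistry registry = new ToyBeanRegistry();
        Set<String> names = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    names.add(registry.registerWithGeneratedName("handler", new Object()));
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct names: " + registerConcurrently(4, 50));
    }
}
```

With the lock in place every registration gets its own name, so the number of distinct names equals the number of registrations.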

Feel free to raise a JIRA on the matter.

Unfortunately, the Spring Integration Java DSL extension project is already out of support and we can add a fix only to the current 5.x generation. Nevertheless, I believe that the synchronized workaround should work here, so there is no need to back-port the fix into the Spring Integration Java DSL extension.
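
If holding a shared lock turns out to block caller threads for too long, one possible variation (an assumption on my side, not something the framework provides) is to hand every register/remove operation to a single dedicated thread. The sketch below uses only the JDK; FlowLifecycleSerializer and FlowLifecycleDemo are hypothetical names, and the Runnable passed to submit stands in for calls such as flowContext.registration(...).register() or flowContext.remove(flowId).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs all flow lifecycle operations one at a time on one thread.
class FlowLifecycleSerializer {
    private final ExecutorService lifecycleThread = Executors.newSingleThreadExecutor();

    // `operation` stands in for a flow registration or removal call.
    Future<?> submit(Runnable operation) {
        return lifecycleThread.submit(operation);
    }

    // Stops accepting new operations and waits for the queued ones.
    // Returns true if everything finished within the timeout.
    boolean shutdownAndWait(long timeoutMillis) {
        lifecycleThread.shutdown();
        try {
            return lifecycleThread.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class FlowLifecycleDemo {
    // Several caller threads queue operations; returns how many were executed,
    // or -1 if the lifecycle thread did not finish in time.
    static int runDemo(int callers, int perCaller) {
        FlowLifecycleSerializer serializer = new FlowLifecycleSerializer();
        List<String> log = new ArrayList<>(); // only touched by the lifecycle thread
        Thread[] threads = new Thread[callers];
        for (int c = 0; c < callers; c++) {
            final int caller = c;
            threads[c] = new Thread(() -> {
                for (int i = 0; i < perCaller; i++) {
                    String flowId = "flow-" + caller + "-" + i;
                    serializer.submit(() -> log.add("register " + flowId));
                }
            });
            threads[c].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return serializer.shutdownAndWait(5000) ? log.size() : -1;
    }

    public static void main(String[] args) {
        System.out.println("operations executed: " + runDemo(4, 25));
    }
}
```

Note that this only moves the waiting off the caller threads. Each registration or removal still takes as long as it did before, and callers that need the result would have to wait on the returned Future.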