
How can I set up a reactive flow using the Java DSL for the following steps?

  1. Receive an SQS message using SqsMessageDrivenChannelAdapter
  2. Validate the JSON message (JsonSchemaValidator class with a validate method)
  3. Transform the JSON into objects
  4. Pass the objects to a service activator (BusinessService: business logic, state machine)
  5. Persist the objects using the R2DBC outbound adapter

I was looking at this test class: https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

In that example, dedicated flows are created that return a Publisher, and the tests subscribe to those Publishers. My flow, however, is triggered when SqsMessageDrivenChannelAdapter delivers a message into a channel.

How can I achieve a reactive flow configuration for steps 1 to 5 above?

Update: Sample code added

    @Bean
    public IntegrationFlow importFlow() {
        return IntegrationFlows.from(sqsInboundChannel())
                // Validate the raw JSON payload against the schema
                .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
                // Convert the JSON into an Entity
                .transform(Transformers.fromJson(Entity.class))
                // Run the business logic / state machine
                .handle((payload, messageHeaders) -> businessService.process((Entity) payload))
                // Persist the result through JPA inside a transaction
                .handle(
                        Jpa.outboundAdapter(this.entityManagerFactory)
                                .entityClass(Entity.class)
                                .persistMode(PersistMode.PERSIST),
                        ConsumerEndpointSpec::transactional)
                .get();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
                new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
        sqsMessageDrivenChannelAdapter.setAutoStartup(true);
        sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
        return sqsMessageDrivenChannelAdapter;
    }

    @Bean
    public MessageChannel sqsInboundChannel() {
        return MessageChannels.flux().get();
    }

Update 2: Moved JPA to a different thread using an executor channel

    @Bean
    public IntegrationFlow importFlow() {
        return IntegrationFlows.from(sqsInboundChannel())
                // Validate the raw JSON payload against the schema
                .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
                // Convert the JSON into an Entity
                .transform(Transformers.fromJson(Entity.class))
                // Run the business logic / state machine
                .handle((payload, messageHeaders) -> businessService.process((Entity) payload))
                // Hand off to the executor channel so JPA runs on a different thread
                .channel(persistChannel())
                .handle(
                        Jpa.outboundAdapter(this.entityManagerFactory)
                                .entityClass(Entity.class)
                                .persistMode(PersistMode.PERSIST),
                        ConsumerEndpointSpec::transactional)
                .get();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
                new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
        sqsMessageDrivenChannelAdapter.setAutoStartup(true);
        sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
        return sqsMessageDrivenChannelAdapter;
    }

    @Bean
    public MessageChannel sqsInboundChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel persistChannel() {
        return MessageChannels.executor(Executors.newCachedThreadPool()).get();
    }

And what code have you written? - Toerktumlare
I have updated the original post with the code. - Nikhil

1 Answer


You probably need to become more familiar with what Spring Integration currently offers for Reactive Streams: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams

The test class you link to is not relevant to your use case. Those tests cover individual pieces of the API that Spring Integration exposes; they are closer to unit tests and say nothing about how to build a whole flow.

Your use case is really a single black-box flow that starts with the SQS listener and ends in R2DBC. There is therefore no point in converting part of the flow into a Publisher and then feeding it back into another part of the flow: you are not going to track that Publisher and subscribe to it yourself.

You could place a FluxMessageChannel between endpoints in your flow, but that still does not make much sense for your use case. The flow won't be fully reactive as you expect, because org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer does not block its consumer thread to wait for back-pressure from downstream.
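
To make "blocking on the producer side to honor back-pressure" concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow API, not Spring Integration or SQS. The class and method names (BackPressureSketch, OneAtATimeSubscriber, run) are hypothetical. The subscriber requests one item at a time, and SubmissionPublisher.submit() blocks the producing thread whenever the small buffer is full. That is the kind of waiting the SQS listener container does not do for a downstream FluxMessageChannel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackPressureSketch {

    /** Hypothetical subscriber that signals demand for exactly one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> run(int count) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // Tiny per-subscriber buffer (the JDK may round the capacity up).
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 1)) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                // submit() blocks this thread while the subscriber's buffer is full.
                publisher.submit(i);
            }
        } // close() signals onComplete once buffered items are delivered
        subscriber.done.await();
        executor.shutdown();
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5));
    }
}
```

A source that pushes messages without this kind of waiting leaves the downstream Flux no way to slow it down, which is why the flow as a whole is not end-to-end reactive.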

The only truly reactive part of your flow is the R2DBC outbound channel adapter, and it probably does not bring you much value because the source of the data is not reactive.

As I said, you can try placing a channel(channels -> channels.flux()) right after the SqsMessageDrivenChannelAdapter definition to start a reactive flow from that point. You should also try setting maxNumberOfMessages to 1, so that the adapter waits for free space downstream before pulling the next message from SQS.
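
Put together, that suggestion might look roughly like the configuration below. Treat it as an untested sketch under several assumptions: it needs spring-integration-aws and spring-integration-r2dbc (5.4 or later) on the classpath, the queue name "import-queue" is a placeholder, JsonSchemaValidator.validate is assumed to return the validated JSON (or throw), and BusinessService.process is assumed to return the Entity to persist. JsonSchemaValidator, BusinessService and Entity are the classes from the question and are not defined here.

```java
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.r2dbc.dsl.R2dbc;

@Configuration
public class ImportFlowConfig {

    @Bean
    public IntegrationFlow importFlow(AmazonSQSAsync asyncSqsClient,
                                      R2dbcEntityOperations r2dbcEntityOperations,
                                      JsonSchemaValidator jsonSchemaValidator,
                                      BusinessService businessService) {
        // "import-queue" is a placeholder queue name.
        SqsMessageDrivenChannelAdapter sqsAdapter =
                new SqsMessageDrivenChannelAdapter(asyncSqsClient, "import-queue");
        // Pull one message per poll so the listener does not run far ahead of the flow.
        sqsAdapter.setMaxNumberOfMessages(1);

        return IntegrationFlows.from(sqsAdapter)
                // Start the reactive part of the flow right after the SQS adapter.
                .channel(channels -> channels.flux())
                // Assumption: validate(...) returns the JSON string or throws on failure.
                .handle((payload, headers) -> jsonSchemaValidator.validate(payload.toString()))
                .transform(Transformers.fromJson(Entity.class))
                // Assumption: process(...) returns the Entity to be persisted.
                .handle(Entity.class, (payload, headers) -> businessService.process(payload))
                // Reactive outbound adapter; inserts the payload by default.
                .handle(R2dbc.outboundChannelAdapter(r2dbcEntityOperations))
                .get();
    }
}
```

Even with this layout, only the part of the flow after the flux() channel is reactive; the SQS listener thread itself still hands messages over in the usual push style.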