How can I set up a reactive flow using the DSL for the following steps:
- Receive an SQS message using SqsMessageDrivenChannelAdapter
- Validate the JSON message [JsonSchemaValidator class with a validate method]
- Transform the JSON to objects
- Pass the objects to a service activator (BusinessService: business logic, state machine)
- Persist the objects using the R2DBC outbound adapter
I was looking at this: https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
In the above example, dedicated flows are created that return a Publisher, and the tests subscribe to those Publishers. However, my flow will be triggered when SqsMessageDrivenChannelAdapter delivers a message into a channel.
How can I achieve a reactive flow configuration for steps 1 to 5 of the scenario above?
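To make the distinction concrete, here is a minimal sketch of the shape I am after, written with only the JDK's java.util.concurrent.Flow API rather than Spring Integration. Everything in it is a hypothetical stand-in: the SubmissionPublisher plays the role of the message-driven adapter, and validate, transform, process and the persisted list are placeholders for JsonSchemaValidator, the JSON transformer, BusinessService and the outbound adapter. Nobody asks the source for a Publisher up front; the subscriber is attached once and then reacts to whatever is pushed in.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, JDK only: a push-style source feeding one subscriber that
// runs validate -> transform -> process -> persist for each incoming message.
class MessageDrivenFlowSketch {

    // Placeholder for JsonSchemaValidator.validate: accepts only {"name":"x"} payloads.
    static boolean validate(String json) {
        return json.length() >= 11 && json.startsWith("{\"name\":\"") && json.endsWith("\"}");
    }

    // Placeholder for the JSON-to-object transformer: extracts the name value.
    static String transform(String json) {
        return json.substring(9, json.length() - 2);
    }

    // Placeholder for BusinessService.process: upper-cases the name.
    static String process(String name) {
        return name.toUpperCase();
    }

    static List<String> run(List<String> messages) {
        List<String> persisted = new CopyOnWriteArrayList<>(); // stands in for the database
        CountDownLatch done = new CountDownLatch(1);
        // The publisher stands in for the adapter that pushes messages into a channel.
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first message
                }

                @Override
                public void onNext(String json) {
                    if (validate(json)) {
                        persisted.add(process(transform(json)));
                    }
                    subscription.request(1); // back-pressure: one message at a time
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            messages.forEach(source::submit);
        } // close() signals onComplete once the buffered messages are delivered
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return persisted;
    }

    public static void main(String[] args) {
        // The invalid payload in the middle is dropped by the validation step.
        System.out.println(run(List.of("{\"name\":\"a\"}", "not json", "{\"name\":\"b\"}"))); // prints [A, B]
    }
}
```

What I am looking for is the Spring Integration DSL equivalent of this wiring, where the subscription happens as part of the flow configuration instead of in test code.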
Update: Sample code added
@Bean
public IntegrationFlow importFlow() {
    return IntegrationFlows.from(sqsInboundChannel())
            // validate the raw JSON payload
            .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
            // convert the JSON into an Entity
            .transform(Transformers.fromJson(Entity.class))
            // business logic, state machine
            .handle((payload, messageHeaders) -> businessService.process((Entity) payload))
            // persist the result in a transaction
            .handle(
                    Jpa.outboundAdapter(this.entityManagerFactory)
                            .entityClass(Entity.class)
                            .persistMode(PersistMode.PERSIST),
                    ConsumerEndpointSpec::transactional)
            .get();
}

@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
            new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
    sqsMessageDrivenChannelAdapter.setAutoStartup(true);
    sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
    return sqsMessageDrivenChannelAdapter;
}

@Bean
public MessageChannel sqsInboundChannel() {
    return MessageChannels.flux().get();
}
Update 2: Moved the JPA persistence step to a different thread using an executor channel
@Bean
public IntegrationFlow importFlow() {
    return IntegrationFlows.from(sqsInboundChannel())
            // validate the raw JSON payload
            .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
            // convert the JSON into an Entity
            .transform(Transformers.fromJson(Entity.class))
            // business logic, state machine
            .handle((payload, messageHeaders) -> businessService.process((Entity) payload))
            // hand off to the executor channel so the JPA call runs on another thread
            .channel(persistChannel())
            .handle(
                    Jpa.outboundAdapter(this.entityManagerFactory)
                            .entityClass(Entity.class)
                            .persistMode(PersistMode.PERSIST),
                    ConsumerEndpointSpec::transactional)
            .get();
}

@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
            new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
    sqsMessageDrivenChannelAdapter.setAutoStartup(true);
    sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
    return sqsMessageDrivenChannelAdapter;
}

@Bean
public MessageChannel sqsInboundChannel() {
    return MessageChannels.flux().get();
}

@Bean
public MessageChannel persistChannel() {
    return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
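
For reference, the hand-off that the executor channel is meant to provide can be reduced to plain JDK terms. This is only a sketch under assumptions: blockingPersist is a hypothetical stand-in for the blocking JPA call, the thread name persist-worker is made up for the demo, and the join() at the end exists only so the example can print a result (a real flow would not wait on the calling thread).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, JDK only: run a blocking persistence call on a worker thread
// from a cached pool instead of on the thread that delivered the message.
class ExecutorHandOffSketch {

    // Stand-in for the blocking JPA persist; returns the name of the thread it ran on.
    static String blockingPersist(String entity) {
        return Thread.currentThread().getName();
    }

    static String persistOnWorker(String entity) {
        // Cached pool with named daemon threads so the demo is easy to observe.
        ExecutorService pool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "persist-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            // The blocking call is submitted to the pool; join() is for the demo only.
            return CompletableFuture.supplyAsync(() -> blockingPersist(entity), pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(persistOnWorker("entity-1")); // prints persist-worker
    }
}
```

This keeps the blocking call off the delivering thread, but it does not make the persistence step itself reactive, which is why I am still asking about the R2DBC outbound adapter.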