I try to convert the integration cafe demo to Java 8 DSL, there are some existing examples in the official Spring Integration Samples.
I want to combine all the good parts of these examples.
- Java 8 DSL instead of XML
- AMQP to slipt the workflow in smaller flows.
- Using Jackson to handle the payload in JSON.
The codes(not work as expected, some issues) are here.
In the official XML samples, it is easy to set up a request/reply channel in Amqp gateway. But go to Java 8 DSL, the options are missing.
And when the application, it will complain "reply channel or output channel is required".
BTW, is there any better option to debug/unit testing the Spring integration application?
Update 1: confused when I was writing the flow of coldDrinks flow.
The XML is from the original cafe-amqp project.
<!-- To receive an AMQP Message from a Queue, and respond to its reply-to address, configure an inbound-gateway. -->
<int-amqp:inbound-gateway
id="coldDrinksBarista"
request-channel="coldJsonDrinks"
queue-names="cold-drinks"
connection-factory="rabbitConnectionFactory" />
<int:chain input-channel="coldJsonDrinks">
<int:json-to-object-transformer type="org.springframework.integration.samples.cafe.OrderItem"/>
<int:service-activator method="prepareColdDrink">
<bean class="org.springframework.integration.samples.cafe.xml.Barista"/>
</int:service-activator>
<int:object-to-json-transformer content-type="text/x-json"/>
</int:chain>
How to convert this into Java DSL effectively. I added my thoughts inline
@Bean
public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
return IntegrationFlows
.from("coldDrinks")
.handle(
Amqp.outboundGateway(amqpTemplate)
.exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
.routingKey(ROUTING_KEY_COLD_DRINKS)
)
.log("coldDrinksFlow")
.channel(preparedDrinksChannel())
.get();
}
@Bean
public IntegrationFlow coldDrinksBaristaFlow(ConnectionFactory connectionFactory, Barista barista) {
return IntegrationFlows
.from(Amqp.inboundGateway(connectionFactory, QUEUE_COLD_DRINKS)
.configureContainer(
c -> c.receiveTimeout(10000)
)// If setup replyChannel the below `handle` is not worked as expected.
)
.handle(OrderItem.class, (payload, headers) -> (Drink) barista.prepareColdDrink(payload))
//If adding a channel here, the flow will NOT return back the `coldDrinksFlow` will cause another exception, "its requiresReply is set to true..."
.get();
}
In my before experience, I would like to break the whole flow by the protocols(HTTP, FTP, etc) as edges(inbound at the beginning and outbound as the end) in small flows. The inbound/outbound gateway is easy to set it to work without setting a reply channel etc, it should reply by default through the original route using its built-in protocols instead of channels. In my inboundGateway RSocket example, I don't set a reply channel there, but the message returns to the roscket routes and receive by the client side(outboudGateway).
Update: Finally it works, check here. An issue I encountered here when trying to use Amqp to send and receive object message, the are some class cast exception thrown, the TypeId in headers is NOT changed when using handle
etc MessageHandler
, have to transform bwteen json/object like the xml based cafe-amqp did to make it finally work. What is missing here?