1
votes

I want to use the Java DSL for Spring Integration, but I can't figure out how to use message headers during transformation.

My old implementation had a Transformer like this:

@Transformer(inputChannel = "inputChannel", outputChannel = "outputChannel")
public EventB transform(
        EventA eventA,
        @Header("a_header") String aHeader,
        @Header("other_header") String otherHeader){
    return new EventB(eventA.getSomeField(), aHeader, otherHeader);
}

Now I have the following DSL:

@Bean
public IntegrationFlow aFlow(){
    return IntegrationFlows.from(EventASink.INPUT)
        .filter("headers['operation'] == 'OPERATION_A'")
        .transform() //<-- What should I do here?
        .handle(Http.outboundGateway(uri).httpMethod(HttpMethod.POST))
        .get();
}

I looked at the implementation of the transform() method and found that it can receive a GenericTransformer as a parameter, but that seems to work only with the message payload, and I also need the headers.

I also saw that some kind of reflection can be used, but I don't like it because it's not refactor-safe.

Any advice? Thanks in advance.

2 Answers

4
votes

Because the DSL is part of the framework and is compiled before you start using it, it cannot infer anything about your custom POJO methods. Therefore there is no equally clean way to bind custom headers to parameters as in your sample.

The closest way to reuse your transform() method with those parameter annotations is this .transform() contract:

/**
 * Populate the {@code MessageTransformingHandler} for the {@link MethodInvokingTransformer}
 * to invoke the service method at runtime.
 * @param service the service to use.
 * @param methodName the method to invoke.
 * @return the current {@link IntegrationFlowDefinition}.
 * @see MethodInvokingTransformer
 */
public B transform(Object service, String methodName)

So you would declare a bean with that method, pass it as the service argument, and pass the method's name as the methodName argument.
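A minimal sketch of that approach, as a configuration fragment rather than a complete program. The class name EventTransformer and the bean wiring are hypothetical; EventA, EventB, EventASink.INPUT and uri come from the question, and the imports are assumed to be the same ones the question's code already uses:

```java
// Hypothetical holder for the existing method. The channel attributes of
// @Transformer are no longer needed because the flow does the wiring.
class EventTransformer {

    public EventB transform(
            EventA eventA,
            @Header("a_header") String aHeader,
            @Header("other_header") String otherHeader) {
        return new EventB(eventA.getSomeField(), aHeader, otherHeader);
    }
}

// Inside a @Configuration class:
@Bean
public EventTransformer eventTransformer() {
    return new EventTransformer();
}

@Bean
public IntegrationFlow aFlow(EventTransformer eventTransformer) {
    return IntegrationFlows.from(EventASink.INPUT)
        .filter("headers['operation'] == 'OPERATION_A'")
        // The method is looked up by name at runtime, so renaming
        // transform() requires updating this string by hand.
        .transform(eventTransformer, "transform")
        .handle(Http.outboundGateway(uri).httpMethod(HttpMethod.POST))
        .get();
}
```

The @Header annotations keep working because the method is invoked through the MethodInvokingTransformer, but the method name string is exactly the non-refactor-safe part the question wants to avoid.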

Another way to get access to the headers is to request the whole Message as the input type of the lambda:

/**
 * Populate the {@link MessageTransformingHandler} instance for the provided
 * {@link GenericTransformer} for the specific {@code payloadType} to convert at
 * runtime.
 * Use {@link #transform(Class, GenericTransformer)} if you need access to the
 * entire message.
 * @param payloadType the {@link Class} for expected payload type. It can also be
 * {@code Message.class} if you wish to access the entire message in the transformer.
 * Conversion to this type will be attempted, if necessary.
 * @param genericTransformer the {@link GenericTransformer} to populate.
 * @param <P> the payload type - 'transform from' or {@code Message.class}.
 * @param <T> the target type - 'transform to'.
 * @return the current {@link IntegrationFlowDefinition}.
 * @see MethodInvokingTransformer
 * @see LambdaMessageProcessor
 */
public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer) {

In this case the code could be like this:

  .transform(Message.class, m -> m.getHeaders())
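Applied to the flow from the question, that could look roughly like the fragment below. It is only a sketch: Message.class is a raw type here, so the payload arrives as Object and is cast to EventA by hand, and EventA, EventB, EventASink.INPUT and uri are taken from the question as they are:

```java
@Bean
public IntegrationFlow aFlow() {
    return IntegrationFlows.from(EventASink.INPUT)
        .filter("headers['operation'] == 'OPERATION_A'")
        // m is the whole (raw) Message, so both payload and headers are reachable.
        .transform(Message.class, m -> new EventB(
            ((EventA) m.getPayload()).getSomeField(),
            m.getHeaders().get("a_header", String.class),
            m.getHeaders().get("other_header", String.class)))
        .handle(Http.outboundGateway(uri).httpMethod(HttpMethod.POST))
        .get();
}
```

The header names are still plain strings, but the payload access and the EventB constructor call are checked by the compiler, so this variant survives renames of getSomeField() or EventB.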
1
votes

I had the same issue: I needed both headers and payload. After a lot of tinkering I found a solution, which was to use .handle instead of .transform. GenericHandler's handle method receives both the payload and the headers. In your case it would look something like this:

@Bean
public IntegrationFlow aFlow(){
    return IntegrationFlows.from(EventASink.INPUT)
        .filter("headers['operation'] == 'OPERATION_A'")
        .<EventA>handle((eventA, h) -> new EventB(
            eventA.getSomeField(),
            h.get("a_header", String.class),
            h.get("other_header", String.class)))
        .handle(Http.outboundGateway(uri).httpMethod(HttpMethod.POST))
        .get();
}