
The encode method of the fileProcessor bean is responsible for encoding a video file. If encoding fails, the file must not be deleted; if it succeeds, the file can be deleted. Right now, the only way I have found to keep the original payload flowing through the Message flow is to make the encode method return void. However, I need to return some 'header' information so that SI can later delete the file. I tried using MessageBuilder to create a Message<File> and returning that, but when it arrives on the next channel it has been wrapped: there is a Message inside a Message, so my expression doesn't work to trigger the delete.

I suppose I could use a wrapped Message and dig down one level in the object graph, but that seems clunky.

What is the best approach to tacking on some return value without destroying the original Message payload and without polluting my POJO encode method with SI channels and sending?

Here is my configuration:

<!-- ########################## -->
<!-- ###      Encoding      ### -->
<!-- ########################## -->

<file:inbound-channel-adapter 
    directory="${paths.encode}"
    channel="encodeChannel"
    filename-regex="${encode.regex}"
    prevent-duplicates="false">
    <int:poller fixed-rate="5000"/>
</file:inbound-channel-adapter>

<int:service-activator
    input-channel="encodeChannel"
    output-channel="encodeResultChannel"
    ref="fileProcessor"
    method="encode">
</int:service-activator>    

<!-- This is where I'm having trouble.  -->
<!-- I don't expect this router to work. -->
<int:router
    input-channel="encodeResultChannel"
    expression="payload">       
    <int:mapping value="true" channel="encodeDeleteChannel"/>
    <int:mapping value="false" channel="stdout"/>
</int:router>

<int:service-activator
    input-channel="encodeDeleteChannel"
    expression="payload.delete()"
    output-channel="stdout">
</int:service-activator>

<stream:stdout-channel-adapter 
    id="stdout" 
    append-newline="true" />

edit:

I'm using:

<properties>
    <spring-framework.version>3.2.3.RELEASE</spring-framework.version>
</properties>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring-framework.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>

edit2:

Here's the updated configuration:

<!-- ########################## -->
<!-- ###      Encoding      ### -->
<!-- ########################## -->

<file:inbound-channel-adapter 
    directory="${paths.encode}"
    channel="filePickupChannel"
    filename-regex="${encode.regex}"
    prevent-duplicates="false">
    <int:poller fixed-rate="5000"/>
</file:inbound-channel-adapter>

<int:header-enricher
    input-channel="filePickupChannel"
    output-channel="encodeChannel">
    <int:header name="origFile" expression="payload"/>
</int:header-enricher>

<int:service-activator
    input-channel="encodeChannel"
    output-channel="encodeResultChannel"
    ref="fileProcessor"
    method="encode">
</int:service-activator>    

<int:router
    input-channel="encodeResultChannel"
    ignore-send-failures="false"
    default-output-channel="stdout"
    expression="payload">       
    <int:mapping value="true" channel="encodeDeleteChannel"/>
    <int:mapping value="false" channel="stdout"/>
</int:router>

<int:service-activator
    input-channel="encodeDeleteChannel"
    expression="headers['origFile'].delete()"
    output-channel="stdout">
</int:service-activator>

1 Answer


What version of Spring Integration and Spring Framework are you using?

What does the signature of fileProcessor.encode() look like?

You should not get a nested Message<?>; AbstractReplyProducingMessageHandler has the following logic:

private Message<?> createReplyMessage(Object reply, MessageHeaders requestHeaders) {
    AbstractIntegrationMessageBuilder<?> builder = null;
    if (reply instanceof Message<?>) {
        if (!this.shouldCopyRequestHeaders()) {
            return (Message<?>) reply;
        }
        builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
    }

...

    if (this.shouldCopyRequestHeaders()) {
        builder.copyHeadersIfAbsent(requestHeaders);
    }
    return builder.build();
}

So, if you return a Message<?> your message is returned (enhanced with any inbound headers that you didn't set).
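
To make the "inbound headers that you didn't set" part concrete, here is a minimal plain-Java sketch of the copy-if-absent rule. It uses ordinary maps rather than Spring Integration's MessageHeaders, and the class name HeaderMergeSketch and the header name deleteFile are made up for illustration; it is a simplified model, not the framework's code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the framework's header merge; NOT Spring Integration code.
final class HeaderMergeSketch {

    // Copies each request header into the reply headers,
    // unless the reply already carries a value for that key.
    static Map<String, Object> copyHeadersIfAbsent(Map<String, Object> replyHeaders,
                                                   Map<String, Object> requestHeaders) {
        Map<String, Object> merged = new HashMap<>(replyHeaders);
        for (Map.Entry<String, Object> header : requestHeaders.entrySet()) {
            merged.putIfAbsent(header.getKey(), header.getValue());
        }
        return merged;
    }
}
```

Under this model, a deleteFile header set on the reply keeps its value, while an origFile header present only on the request is carried over to the reply.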

Are you using Spring Integration 3.0.x with Spring Framework 4.0.x? If so, you need to be careful to return an org.springframework.integration Message, not an org.springframework.messaging Message.

If you return an org.springframework.messaging Message, Spring Integration will indeed wrap it in a Spring Integration Message.

Core messaging classes were moved to the spring-messaging module in Spring Framework 4.0, so they can be used for websockets, STOMP etc.

Spring Integration 4.0.x now uses those classes as well, so you won't see both on the classpath, which avoids the confusion. When using Spring Integration 3.0.x with Spring Framework 4.0.x, you need to take great care to use the right classes.

In general, however, we don't recommend adding framework classes (such as Message<?>) to your code. Use POJOs instead, and the framework will take care of the messaging details:

boolean encode(File file) {...}
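
A minimal sketch of such a POJO, assuming the boolean result is what the router's true/false mappings act on. The class name FileProcessor and the runEncoder helper are hypothetical placeholders; the real encoding logic is not shown in the question.

```java
import java.io.File;

// Hypothetical POJO with no Spring Integration types in its signature.
class FileProcessor {

    // Returns true when encoding succeeded (the file may be deleted),
    // false when anything went wrong (the file must be kept).
    public boolean encode(File file) {
        if (file == null || !file.isFile()) {
            return false;
        }
        try {
            runEncoder(file);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    // Placeholder for the actual encoding work (assumption: it throws on failure).
    protected void runEncoder(File file) throws Exception {
        // real encoder invocation would go here
    }
}
```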

If you need to access the payload after the encode, consider promoting it to a header beforehand.

<int:header-enricher ...>
    <int:header name="origFile" expression="payload" />
</int:header-enricher>

Then use expression="headers['origFile'].delete()" after the encode.

EDIT:

Or, return the file (so it becomes the new payload) on success and, on failure, return null or throw an exception and the downstream flow won't be executed.
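
A minimal sketch of that variant, again with hypothetical names (FileReturningProcessor, runEncoder). It assumes a null return is used to signal failure, so that no reply message is produced.

```java
import java.io.File;

// Hypothetical POJO variant: the return value itself becomes the next payload.
class FileReturningProcessor {

    // Returns the encoded file on success so it becomes the new payload;
    // returns null on failure so the downstream flow is not executed.
    public File encode(File file) {
        if (file == null || !file.isFile()) {
            return null;
        }
        try {
            runEncoder(file);
            return file;
        } catch (Exception e) {
            return null;
        }
    }

    // Placeholder for the actual encoding work (assumption: it throws on failure).
    protected void runEncoder(File file) throws Exception {
        // real encoder invocation would go here
    }
}
```

With a signature like this, the router and the origFile header would probably no longer be needed: the service activator's output channel could feed the payload.delete() step directly.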