
When using the splitter and routing slip to route pieces of the body to different endpoints, I discovered that an .end() is needed right after the routingSlip() to keep steps that follow the split block from being pulled into it.

The desired behavior was to split the body and route each part to a different endpoint using a routingSlip. Once the split block completed, processing should continue with the exchange (and body) as it was before the split.

The test code has two routes that are identical except for an .end() right after the .routingSlip(). When the tests run, the route with the .end() prints three Inner processor messages followed by one Outer processor message, and the payload has the correct type after the split block completes. The second route, which does NOT have the .end() after the routingSlip(), prints three interleaved pairs of Inner and Outer processor messages.

While I may have missed something in the documentation, I couldn't find an example of using the splitter and routingSlip this way that would have warned me that the .end() is needed for the behavior I intended. If this is not a bug, I would suggest documenting it more prominently. I might have found it sooner, but my original code involved a custom splitter, and it was not obvious that this was the issue rather than my own code.

I also don't know whether this applies to recipientList or dynamicRouter as well.

package org.apache.camel.processor.routingslip;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;

public class SpliterRoutingSlipTest extends CamelTestSupport {

    private static final String TEST_DIR = "target/test";
    private static final String TEST_OUT_ENDPOINT_WEND = "file:"+TEST_DIR+"/Wend";
    private static final String TEST_OUT_ENDPOINT_WOEND = "file:"+TEST_DIR+"/WOend";
    private static final String TEST_ROUTE_ID_WEND = "splitBodyTestWEnd";
    private static final String TEST_ROUTE_ID_WOEND = "splitBodyTestWOEnd";
    private static final String TEST_IN_ENDPOINT_WEND = "direct:"+TEST_ROUTE_ID_WEND;
    private static final String TEST_IN_ENDPOINT_WOEND = "direct:"+TEST_ROUTE_ID_WOEND;
    private static final String TEST_ROUTING_SLIP_HEADER = "toEndpoint";

    private static final List<String> TEST_BODY = Arrays.asList(new String[]  {
        "This is line 1",
        "This is line 2",
        "This is line 3",
    });

    @BeforeClass
    public static void init() throws IOException {
        File dirToRemove = new File(TEST_DIR);
        if (dirToRemove.exists())
            FileUtils.forceDelete(dirToRemove);
    }

    /**
     * Test split and routing slip WITH an '.end()' after the routing slip.
     * 
     * The result is that the Inner processor gets called for EACH iteration within the split,
     * but the Outer processor only gets called after the split is complete AND the exchange
     * is the one from before the split.
     * 
     * This IS the desired behavior.
     * 
     * @throws Exception
     */
    @Test
    public void testSplitByBodyAndRouteWithOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(1);

        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WEND);

        assertMockEndpointsSatisfied();
    }

    /**
     * Test split and routing slip WITHOUT an '.end()' after the routing slip.
     * 
     * The result is that the inner and outer processors BOTH get called for EACH iteration within the split.
     * 
     * This is NOT the desired effect.
     * 
     * @throws Exception
     */
    @Test
    public void testSplitByBodyAndRouteWithIncorrectOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(3);

        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WOEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WOEND);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(TEST_IN_ENDPOINT_WEND).id(TEST_ROUTE_ID_WEND)
                    .split(body())
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("This is the INNER processor w/ end().");
                                Message in = exchange.getIn();
                                System.out.println("\tin="+in);
                                Object body = in.getBody();
                                System.out.println("\tbody="+body);
                                System.out.println("\tbody.class="+body.getClass());
                            }
                        })
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        .end()
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("This is the OUTER processor w/ end().");
                            Message in = exchange.getIn();
                            System.out.println("in="+in);
                            Object body = in.getBody();
                            System.out.println("body="+body);
                            System.out.println("body.class="+body.getClass());
                        }
                    })
                    .to("mock:end")
                .end()
                ;

                from(TEST_IN_ENDPOINT_WOEND).id(TEST_ROUTE_ID_WOEND)
                    .split(body())
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("This is the INNER processor W/O end().");
                                Message in = exchange.getIn();
                                System.out.println("\tin="+in);
                                Object body = in.getBody();
                                System.out.println("\tbody="+body);
                                System.out.println("\tbody.class="+body.getClass());
                            }
                        })
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WOEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
//                      .end()
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("This is the OUTER processor W/O end().");
                            Message in = exchange.getIn();
                            System.out.println("in="+in);
                            Object body = in.getBody();
                            System.out.println("body="+body);
                            System.out.println("body.class="+body.getClass());
                        }
                    })
                    .to("mock:end")
                .end()
                ;
            }
        };
    }
}

1 Answer


I do agree with you. I've been developing in Camel for more than 5 years now, and .end() vs .endChoice() is still one of the most confusing things :-(

What I would advise:

1) Think about how your route would be expressed in the Spring DSL. In this XML-based DSL, you always have to delimit your block (with a closing tag):

<route>
    <from uri="direct:a"/>
    <routingSlip ignoreInvalidEndpoints="true"> <!-- START OF BLOCK -->
        <header>myHeader</header>
    </routingSlip> <!-- END OF BLOCK -->
</route>

Do the same in Java!

2) The confusing part is that for trivial processing (what you always see in tutorials and non-real-life Camel examples), the Java DSL lets you omit the end of the block:

from("direct:a")
    .routingSlip(header("myHeader")) 
    .ignoreInvalidEndpoints();

But the correct way is:

from("direct:a")
    .routingSlip(header("myHeader")) 
        .ignoreInvalidEndpoints()
    .end();

3) I had exactly the same problem as you with the recipientList, which also needs to be closed!

.split(simple("${body}"))
        .streaming()
        .aggregate(simple("${body.blockId}"), new PutInBlockStrategy())
            .ignoreInvalidCorrelationKeys()
            .completionTimeout(5*1000)                                                          
            .log(TRACE, LOGNAME, "Next block:\n${body}")
            .recipientList( method(this, "getRecipents") ).end()
            .parallelProcessing()
        .end()  
    .end()
    .log(INFO, LOGNAME, "File: ${headers.CamelFileName} successfully processed");

4) When in doubt, have a look at the source or Javadoc of the EIP's definition class and see whether it has an explicit end() method:

https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RoutingSlipDefinition.html#end()
https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/RecipientListDefinition.html#end()

If so, always end() your block!

5) Interesting post: https://www.3riverdev.com/apache-camel-tips-caveats-from-the-trenches/
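
To picture why a missing end() shifts the later steps, here is a toy model in plain Java. It is NOT Camel code and does not claim to mirror Camel's internals; ToyDsl, open(), step() and enclosingOf() are hypothetical names made up for this sketch. The only assumption it encodes is the one from the points above: each end() closes just the innermost open block.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: NOT Camel code. All names here are hypothetical.
final class ToyDsl {
    // Currently open blocks, outermost first.
    private final Deque<String> openBlocks = new ArrayDeque<>();
    // Step name -> path of the block the step was added under.
    private final Map<String, String> placement = new LinkedHashMap<>();

    // Opens a nested block; it stays open until a matching end().
    ToyDsl open(String block) {
        openBlocks.addLast(block);
        return this;
    }

    // Adds a step to whichever block is innermost right now.
    ToyDsl step(String name) {
        placement.put(name, openBlocks.isEmpty() ? "route" : String.join("/", openBlocks));
        return this;
    }

    // Closes only the innermost open block.
    ToyDsl end() {
        if (!openBlocks.isEmpty()) {
            openBlocks.removeLast();
        }
        return this;
    }

    // Returns the block path a step ended up in.
    String enclosingOf(String step) {
        return placement.get(step);
    }

    public static void main(String[] args) {
        ToyDsl withEnd = new ToyDsl()
            .open("split").step("inner")
            .open("routingSlip").end()   // closes routingSlip
            .end()                       // closes split
            .step("outer");

        ToyDsl withoutEnd = new ToyDsl()
            .open("split").step("inner")
            .open("routingSlip")         // never closed explicitly
            .end()                       // meant for split, closes routingSlip instead
            .step("outer");

        System.out.println("with end():    outer is in " + withEnd.enclosingOf("outer"));
        System.out.println("without end(): outer is in " + withoutEnd.enclosingOf("outer"));
    }
}
```

In this model the first chain places "outer" at route level, while the second leaves it inside "split", which matches what the question observed: the Outer processor running once per split part.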