When using the splitter and routing slip to route pieces of the body to different endpoints, I discovered that an .end() is needed after the routing slip to prevent steps defined outside the split block from being included in it.
The desired behavior was to split the body and route each part to a different endpoint using a routingSlip. Once the split block was complete, processing should continue with the exchange (and body) as it was before the split.
The test code has two routes that are identical except for an .end() right after the .routingSlip(). When the tests run, you can see that the route with the .end() produces 3 Inner processor messages and one Outer processor message. It also has the correct payload type after the split block completes. The other test, which uses the second route that does NOT have the .end() after the routingSlip(), produces 3 interleaved Inner and Outer processor messages.
While I may have missed something in the documentation, I couldn't find an example of using the splitter and routingSlip this way that would have warned me that the .end() is needed for the route to behave the way I intend. If this is not a bug, I would suggest documenting this issue more prominently. I might have found it sooner, but my original code involved a custom splitter, and it was not obvious that this was the problem rather than my own code.
I also don't know whether this issue applies to recipientList or dynamicRouter as well.
package org.apache.camel.processor.routingslip;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Demonstrates how the presence or absence of an {@code .end()} directly after
 * {@code routingSlip(...)} inside a {@code split(...)} block changes which steps
 * run per split part.
 *
 * <p>Two routes are defined that differ only in that one has an {@code .end()}
 * right after the routing slip and the other does not. Each test sends the same
 * three-element list to one of the routes and checks how many exchanges arrive
 * at {@code mock:end}.
 */
public class SpliterRoutingSlipTest extends CamelTestSupport {

    /** Root directory the file endpoints write to; removed once before the tests run. */
    private static final String TEST_DIR = "target/test";

    private static final String TEST_OUT_ENDPOINT_WEND = "file:" + TEST_DIR + "/Wend";
    private static final String TEST_OUT_ENDPOINT_WOEND = "file:" + TEST_DIR + "/WOend";

    private static final String TEST_ROUTE_ID_WEND = "splitBodyTestWEnd";
    private static final String TEST_ROUTE_ID_WOEND = "splitBodyTestWOEnd";

    private static final String TEST_IN_ENDPOINT_WEND = "direct:" + TEST_ROUTE_ID_WEND;
    private static final String TEST_IN_ENDPOINT_WOEND = "direct:" + TEST_ROUTE_ID_WOEND;

    /** Name of the header the routing slip reads its destination from. */
    private static final String TEST_ROUTING_SLIP_HEADER = "toEndpoint";

    /** Message body sent by both tests; the routes split it into its three elements. */
    private static final List<String> TEST_BODY = Arrays.asList(
            "This is line 1",
            "This is line 2",
            "This is line 3");

    /**
     * Deletes the output directory left behind by a previous run, if there is one.
     *
     * @throws IOException if the directory exists but cannot be deleted
     */
    @BeforeClass
    public static void init() throws IOException {
        File dirToRemove = new File(TEST_DIR);
        if (dirToRemove.exists()) {
            FileUtils.forceDelete(dirToRemove);
        }
    }

    /**
     * Test split and routing slip WITH an '.end()' after the routing slip.
     *
     * The result is that the Inner processor gets called for EACH iteration within the split
     * but the Outer processor only gets called after the split is complete AND the exchange
     * is the one from before being split. Accordingly, exactly one message is expected
     * at {@code mock:end}.
     *
     * This IS the desired behavior.
     *
     * @throws Exception if sending the message fails or the mock expectation is not met
     */
    @Test
    public void testSplitByBodyAndRouteWithOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(1);

        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WEND);

        assertMockEndpointsSatisfied();
    }

    /**
     * Test split and routing slip WITHOUT an '.end()' after the routing slip.
     *
     * The result is that the inner and outer processors BOTH get called for EACH iteration
     * within the split. Accordingly, three messages (one per split part) are expected
     * at {@code mock:end}.
     *
     * This is NOT the desired effect.
     *
     * @throws Exception if sending the message fails or the mock expectation is not met
     */
    @Test
    public void testSplitByBodyAndRouteWithIncorrectOuterPostProcessing() throws Exception {
        MockEndpoint end = getMockEndpoint("mock:end");
        end.expectedMessageCount(3);

        template.sendBodyAndHeader(TEST_IN_ENDPOINT_WOEND, TEST_BODY, TEST_ROUTING_SLIP_HEADER, TEST_OUT_ENDPOINT_WOEND);

        assertMockEndpointsSatisfied();
    }

    /**
     * Builds the two routes under test. They are identical apart from the route-specific
     * constants, the console banners, and the {@code .end()} directly after
     * {@code routingSlip(...)}, which only the first route has.
     *
     * @return a route builder that defines both routes
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // NOTE(review): exchanges enter through a direct: endpoint, so ${file:name}
                // presumably has no file header to resolve against -- confirm.

                // Route WITH .end() after the routing slip.
                from(TEST_IN_ENDPOINT_WEND).id(TEST_ROUTE_ID_WEND)
                    .split(body())
                        .process(dumpingProcessor("This is the INNER processor w/ end().", "\t"))
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        .end()
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(dumpingProcessor("This is the OUTER processor w/ end().", ""))
                    .to("mock:end")
                    .end()
                ;

                // Route WITHOUT .end() after the routing slip.
                from(TEST_IN_ENDPOINT_WOEND).id(TEST_ROUTE_ID_WOEND)
                    .split(body())
                        .process(dumpingProcessor("This is the INNER processor W/O end().", "\t"))
                        .setHeader(TEST_ROUTING_SLIP_HEADER, simple(TEST_OUT_ENDPOINT_WOEND))
                        .setHeader("tempFileName", simple("${file:name}.tmp"))
                        .log(LoggingLevel.INFO, "Destination endpoint for filename ${file:name} is ${header.toEndpoint}")
                        .routingSlip(header(TEST_ROUTING_SLIP_HEADER))
                        // .end()  <-- deliberately omitted: this is the only difference from the
                        //             route above and is what this test class demonstrates.
                        .log(LoggingLevel.INFO, "Sent body to ${header.toEndpoint}/${file:name}")
                    .end()
                    .process(dumpingProcessor("This is the OUTER processor W/O end().", ""))
                    .to("mock:end")
                    .end()
                ;
            }
        };
    }

    /**
     * Creates a processor that prints a banner followed by the exchange's in-message,
     * its body, and the body's class to standard output.
     *
     * @param banner first line printed each time the processor is invoked
     * @param indent prefix put in front of the three detail lines (a tab for the inner
     *               processors, empty for the outer ones)
     * @return a processor that only prints and does not modify the exchange
     */
    private static Processor dumpingProcessor(final String banner, final String indent) {
        return new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(banner);
                Message in = exchange.getIn();
                System.out.println(indent + "in=" + in);
                Object body = in.getBody();
                System.out.println(indent + "body=" + body);
                System.out.println(indent + "body.class=" + body.getClass());
            }
        };
    }
}