My understanding of a route (in Apache Camel verbiage) is that it represents a flow of data from one endpoint to another, and that it will stop at various processors along the way that perform EIP-type operations on the data.
If that's a fair assessment of a route, then I am modeling a problem that I believe requires several routes inside the same CamelContext (I'm using Spring):
- Route 1: Extracts data from Source-1, processes it, converts it to a List<SomePOJO>, and then sends it to an aggregator
- Route 2: Extracts data from Source-2, processes it, also converts it to a List<SomePOJO>, and then sends it to an aggregator
- Route 3: Contains an aggregator that waits until it receives a List<SomePOJO> from both Route 1 and Route 2, and then continues processing the aggregated list
Here's the thing: both List<SomePOJO>s need to arrive at the aggregator at the same time, or rather, the aggregator bean has to wait until it has received data from both routes before it can aggregate the two lists into a single List<SomePOJO> and send the aggregated list off to the rest of Route 3.
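To make the "wait until both lists have arrived" requirement concrete, here is a minimal plain-Java sketch of the semantics I'm after. It is not Camel code: `CountingListAggregator` is a hypothetical class, and its `completionSize` parameter only loosely echoes the Camel aggregator option of the same name. It groups incoming lists by a correlation key and releases the merged list once the expected number of contributions has arrived.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Camel: collects lists per correlation key
// and hands back the merged list once `completionSize` parts have arrived.
final class CountingListAggregator<K, T> {
    private final int completionSize;
    private final Map<K, List<T>> merged = new HashMap<>();
    private final Map<K, Integer> counts = new HashMap<>();

    CountingListAggregator(int completionSize) {
        if (completionSize < 1) {
            throw new IllegalArgumentException("completionSize must be at least 1");
        }
        this.completionSize = completionSize;
    }

    // Returns the merged list when the group is complete, otherwise empty.
    synchronized Optional<List<T>> add(K key, List<T> part) {
        merged.computeIfAbsent(key, k -> new ArrayList<>()).addAll(part);
        int seen = counts.merge(key, 1, Integer::sum);
        if (seen < completionSize) {
            return Optional.empty(); // still waiting for the other route(s)
        }
        counts.remove(key);
        return Optional.of(merged.remove(key));
    }
}
```

With `completionSize` set to 2 and both routes using the same key, the first `add` call returns an empty `Optional` and the second returns the combined list, which is the behaviour I want from Route 3.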
So far I have the following pseudo-coded <camelContext>:
<camelContext id="my-routes" xmlns="http://camel.apache.org/schema/spring">
    <!-- Route 1 -->
    <route id="route-1">
        <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />
        <!-- Extracts data from Source 1, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor1?method=process" />
        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>
    <!-- Route 2 -->
    <route id="route-2">
        <from uri="timer://runOnce?repeatCount=1&amp;delay=10" />
        <!-- Extracts data from Source 2, processes it, and then produces a List<SomePOJO>. -->
        <to uri="bean:extractor2?method=process" />
        <!-- Send to aggregator. -->
        <to uri="direct:aggregator" />
    </route>
    <!-- Route 3 -->
    <route id="route-3">
        <from uri="direct:aggregator" />
        <aggregate strategyRef="listAggregatorStrategy">
            <correlationExpression>
                <!-- Haven't figured this part out yet. -->
            </correlationExpression>
            <to uri="bean:lastProcessor?method=process" />
        </aggregate>
    </route>
</camelContext>
<bean id="listAggregatorStrategy" class="com.myapp.ListAggregatorStrategy" />
Then in Java:
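import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy; // Camel 2.x package; org.apache.camel.AggregationStrategy in Camel 3+

public class ListAggregatorStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // The first exchange of a group has nothing to merge with yet.
        if (oldExchange == null) {
            return newExchange;
        }
        List<SomePOJO> aggregateList = new ArrayList<SomePOJO>(oldExchange.getIn().getBody(List.class));
        aggregateList.addAll(newExchange.getIn().getBody(List.class));
        oldExchange.getIn().setBody(aggregateList);
        return oldExchange;
    }
}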
My questions
- Is my basic setup correct? In other words, am I using the direct:aggregator endpoint correctly to send data out of route-1 and route-2 and into route-3's aggregator?
- Will my aggregator work the way I am expecting it to here? Say the extractor1 bean in route-1 takes only 5 seconds to run, but the extractor2 bean in route-2 takes 2 minutes to run. At t=5, the aggregator should receive the data from extractor1 and start waiting (for 2 mins) until extractor2 finishes and gives it the rest of the data to aggregate. Yes?
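The timing expectation in the second question can be written down in plain Java, independent of Camel, as a rough sketch. Everything here is an illustrative assumption: `WaitForBothSketch`, `extract`, and `aggregate` are hypothetical names, the delays are in milliseconds rather than the seconds and minutes above, and `String` stands in for `SomePOJO`. It only shows the semantics I expect (the merged result becomes available once the slower extractor finishes), not how the Camel aggregator actually behaves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two extractor routes and the aggregator.
final class WaitForBothSketch {

    // Simulates an extractor bean: sleeps for the given delay, then returns its list.
    static CompletableFuture<List<String>> extract(long delayMillis, String... items) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return Arrays.asList(items);
        });
    }

    // Completes only after both inputs have completed, then merges the two lists.
    static CompletableFuture<List<String>> aggregate(
            CompletableFuture<List<String>> first,
            CompletableFuture<List<String>> second) {
        return first.thenCombine(second, (a, b) -> {
            List<String> merged = new ArrayList<>(a);
            merged.addAll(b);
            return merged;
        });
    }
}
```

Calling `WaitForBothSketch.aggregate(extract(50, "a1", "a2"), extract(300, "b1")).join()` blocks until the slower extractor is done and then yields the three items in route order. That is the behaviour I am hoping the aggregator in route-3 gives me.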