While trying to use an Apache Camel aggregation strategy, I am running into the issue described below.
My application: I have exposed a simple SOAP service that takes the names of the departments in an organisation (finance, sales, ...). Inside my Camel route, I route the request to the department-specific routes. In each of these routes, I query a table and get the employees for that department.
As the response of the SOAP service, I want to aggregate all the employees together and send them back.
Below is my camel Route:
<camelContext id="camel"
              xmlns="http://camel.apache.org/schema/spring">
    <camel:dataFormats>
        <!-- JAXB data format bound to the generated classes in org.example.departments. -->
        <camel:jaxb contextPath="org.example.departments" id="jaxb"/>
    </camel:dataFormats>

    <!-- Entry route: receives the SOAP request, unmarshals it and fans it out to
         the department routes named in the "departments" header. -->
    <route id="simple-route">
        <!-- <camel:to id="unmarshallDeps" uri="bean:departmentProcessor"></camel:to> -->
        <!-- The ampersand separating URI options must be written as &amp; inside an
             XML attribute; a bare ampersand makes the document not well-formed. -->
        <from id="_from1" uri="cxf:bean:departmentsEndpoint?dataFormat=PAYLOAD&amp;loggingFeatureEnabled=true"/>
        <camel:unmarshal id="_unmarshal1" ref="jaxb"/>
        <!-- depRouter.getDepartmentRoute computes the endpoint URI(s) to call. -->
        <camel:setHeader headerName="departments" id="_setHeader1">
            <camel:method method="getDepartmentRoute" ref="depRouter"/>
        </camel:setHeader>
        <camel:recipientList id="_recipientList1">
            <camel:header>departments</camel:header>
        </camel:recipientList>
        <camel:log id="_log1" message="Body in original cxf after aggregation ******** ${body} and exchange id is ${exchangeId}"/>
    </route>

    <!-- Department routes: each one queries its own table and hands the result
         rows to the shared aggregator route using a request/reply (InOut) call. -->
    <camel:route id="_route1">
        <camel:from id="_from2" uri="direct:finance"/>
        <camel:to id="_to1" uri="mySqlComponent:select id,Location,Head,email,create_date from finance"/>
        <camel:to id="_to2" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route2">
        <camel:from id="_from3" uri="direct:sales"/>
        <camel:to id="_to3" uri="mySqlComponent:select id,Location,Head,email,create_date from sales"/>
        <camel:to id="_to4" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route3">
        <camel:from id="_from4" uri="direct:hr"/>
        <camel:to id="_to5" uri="mySqlComponent:select id,Location,Head,email,create_date from hr"/>
        <camel:to id="_to6" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>

    <!-- Aggregator route: every exchange is correlated on the same constant key,
         so all department results end up in a single aggregation group. -->
    <camel:route id="_route4">
        <camel:from id="_from5" uri="seda:departmentAggregator"/>
        <!-- NOTE(review): completionSize is 2 while three department routes feed this
             aggregator; confirm that at most two departments are requested per call. -->
        <camel:aggregate completionSize="2" id="_aggregate1" strategyRef="myAggregator">
            <camel:correlationExpression>
                <camel:constant>Constant</camel:constant>
            </camel:correlationExpression>
            <!-- Steps nested inside the aggregate element run on the aggregated exchange. -->
            <camel:log message="Aggregated Body : ${body} and exchange id is ${exchangeId}"></camel:log>
            <!-- <camel:marshal id="_marshal1" ref="jaxb"/> -->
            <!-- <camel:setBody id="_setBody1">
            <camel:simple>${body}</camel:simple>
            </camel:setBody> -->
        </camel:aggregate>
        <!-- This step sits outside the aggregate element; as observed, it logs the
             latest incoming exchange rather than the aggregated one. -->
        <camel:log id="_log4" message="Body outside aggregate : ${body} and exchange id is ${exchangeId}"/>
    </camel:route>
</camelContext>
What I notice is that the body printed inside the aggregate block is indeed the aggregated body, containing all the employees, but the body printed outside the aggregate block is that of the latest exchange, not the aggregated exchange. Below is my aggregation strategy:
/**
 * Merges the department rows carried by {@code newExchange} into the running
 * {@code Departments} aggregate.
 *
 * <p>For the first exchange of a group ({@code oldExchange == null}) a fresh
 * {@code Departments} instance is created, so rows collected by an earlier
 * aggregation group can never leak into a new one even when this strategy
 * object is reused. For subsequent exchanges the aggregate already stored on
 * {@code oldExchange} is extended.
 *
 * @param oldExchange the exchange holding the aggregate built so far, or
 *                    {@code null} when {@code newExchange} is the first of its group
 * @param newExchange the incoming exchange; its body is read as a list of
 *                    column-name to value maps, and a missing or unconvertible
 *                    body is treated as an empty list
 * @return {@code newExchange} for the first exchange of a group, otherwise
 *         {@code oldExchange}; in both cases the body is the updated aggregate
 */
public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
{
    @SuppressWarnings("unchecked")
    ArrayList<Map<String, Object>> rows = newExchange.getIn().getBody(ArrayList.class);
    if (rows == null) {
        // No convertible body: aggregate nothing instead of failing with a NullPointerException.
        rows = new ArrayList<Map<String, Object>>();
    }
    System.out.println("Department Count is : " + rows.size());

    Departments aggregated;
    if (oldExchange == null) {
        // Start every aggregation group from an empty result.
        aggregated = new Departments();
    } else {
        System.out.println("New Exchange: Department Count is : " + rows.size());
        aggregated = oldExchange.getIn().getBody(Departments.class);
        if (aggregated == null) {
            // The previous exchange did not carry an aggregate; start a new one.
            aggregated = new Departments();
        }
        System.out.println("Aggregate Department count : " + aggregated.getDepartment().size());
    }

    for (Map<String, Object> row : rows) {
        aggregated.getDepartment().add(toDepartment(row));
    }

    // The incoming exchange always ends up carrying the aggregate as its body.
    newExchange.getIn().setBody(aggregated);
    if (oldExchange == null) {
        return newExchange;
    }
    oldExchange.getIn().setBody(aggregated);
    return oldExchange;
}

/** Builds a {@code Department} from the Location, Head and email columns of one result row. */
private Department toDepartment(Map<String, Object> row)
{
    Department department = new Department();
    department.setLocation((String) row.get("Location"));
    department.setHead((String) row.get("Head"));
    department.setEmail((String) row.get("email"));
    return department;
}
The aggregated exchange is only available inside the aggregate block, so if you need to use it somewhere else just send it to your preferred endpoint from within the block. - Erik Karlstrand