1
votes

While trying to use the Apache camel aggregation strategy, I am running into an issue as described below:

My application : Exposed a simple soap service that takes in the names of the departments in an organisation (finance, sales...). Inside my camel route, I route the request accordingly to the department specific routes. In these routes, I query a table, and get the employees for that department.

As a response for the soap service, I want to aggregate all the employees together and send.

Below is my camel Route:

 <camelContext id="camel" 
 xmlns="http://camel.apache.org/schema/spring">
    <camel:dataFormats>
        <camel:jaxb contextPath="org.example.departments" id="jaxb"/>
    </camel:dataFormats>
    <route id="simple-route">
        <!-- <camel:to id="unmarshallDeps" uri="bean:departmentProcessor"></camel:to> -->
        <from id="_from1" uri="cxf:bean:departmentsEndpoint?dataFormat=PAYLOAD&amp;loggingFeatureEnabled=true"/>
        <camel:unmarshal id="_unmarshal1" ref="jaxb"/>
        <camel:setHeader headerName="departments" id="_setHeader1">
            <camel:method method="getDepartmentRoute" ref="depRouter"/>
        </camel:setHeader>
        <camel:recipientList id="_recipientList1">
            <camel:header>departments</camel:header>
        </camel:recipientList>
        <camel:log id="_log1" message="Body in original cxf after aggregation ******** ${body} and exchange id is ${exchangeId}"/>
    </route>
    <camel:route id="_route1">
        <camel:from id="_from2" uri="direct:finance"/>
        <camel:to id="_to1" uri="mySqlComponent:select id,Location,Head,email,create_date from finance"/>
        <camel:to id="_to2" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route2">
        <camel:from id="_from3" uri="direct:sales"/>
        <camel:to id="_to3" uri="mySqlComponent:select id,Location,Head,email,create_date from sales"/>
        <camel:to id="_to4" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route3">
        <camel:from id="_from4" uri="direct:hr"/>
        <camel:to id="_to5" uri="mySqlComponent:select id,Location,Head,email,create_date from hr"/>
        <camel:to id="_to6" pattern="InOut" uri="seda:departmentAggregator"/>
    </camel:route>
    <camel:route id="_route4">
        <camel:from id="_from5" uri="seda:departmentAggregator"/>
        <camel:aggregate completionSize="2" id="_aggregate1" strategyRef="myAggregator">
            <camel:correlationExpression>
                <camel:constant>Constant</camel:constant>
            </camel:correlationExpression>
            <camel:log message="Aggregated Body : ${body} and exchange id is ${exchangeId}"></camel:log>
            <!-- <camel:marshal id="_marshal1" ref="jaxb"/> -->
            <!-- <camel:setBody id="_setBody1">
                <camel:simple>${body}</camel:simple>
            </camel:setBody> -->
        </camel:aggregate>
        <camel:log id="_log4" message="Body outside aggregate  : ${body} and exchange id is ${exchangeId}"/>
    </camel:route>
</camelContext>

Now, what I notice is that the body when printed inside the aggregate is indeed an aggregated body, containing all the employees, but when I print the body outside the aggregate, it prints the latest exchange and not the aggregated exchange. Below is my aggregation strategy:

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
{

    ArrayList<Map<String, Object>> depList = newExchange.getIn().getBody(ArrayList.class);
    int depCount = depList.size();

    System.out.println("Department Count is : " + depCount);

    if (oldExchange == null) {

        for (int i = 0; i < depCount; i++) {

            Map<String, Object> row = (Map) depList.get(i);
            Department newDepartment = new Department();
            newDepartment.setLocation((String) row.get("Location"));
            newDepartment.setHead((String) row.get("Head"));
            newDepartment.setEmail((String) row.get("email"));
            departments.getDepartment().add(newDepartment);
        }
        newExchange.getIn().setBody(departments);
        return newExchange;
    } else {
        System.out.println("New Exchange: Department Count is  : " + depCount);
        departments = oldExchange.getIn().getBody(Departments.class);
        System.out.println("Aggregate Department count : " + departments.getDepartment().size());

        for (int j = 0; j < depCount; j++) {

            Map<String, Object> row = (Map) newExchange.getIn().getBody(ArrayList.class).get(j);
            Department newDepartment = new Department();
            newDepartment.setLocation((String) row.get("Location"));
            newDepartment.setHead((String) row.get("Head"));
            newDepartment.setEmail((String) row.get("email"));
            departments.getDepartment().add(newDepartment);

        }
        newExchange.getIn().setBody(departments);
        oldExchange.getIn().setBody(departments);
    }

    // System.out.println("exchange is out capable ? : " +
    // newExchange.getPattern().isOutCapable());
    return oldExchange;
}
1
This is perfectly normal behavior. The aggregated exchange does not exist outside of the aggregate block, so if you need to use it somewhere else just send it to your preferred endpoint from within the block. - Erik Karlstrand
Thanks noMad... just starting out in Camel... i was assuming that the aggregated exchange would be available for the cxf route as well.. but as you point out, that is indeed not the case. - Sumit Baurai

1 Answers

0
votes

Aggregation Strategy is usually something that gets seamlessly embedded into EIPs; you don't have to call it manually, Camel does it for you.

recipientList is precisely one of those EIPs that accepts an aggregation strategy as a parameter. See example in Camel doc.

Thus your "seda:departmentAggregator" route is in fact not needed (along with all the calls towards it in your "direct:[department]" routes). Instead, its content should be +/- included into the recipientList definition.