I have a CSV I am processing formatted like so:
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01 specifies header row and 02 specifies a body row.
I need to take the header data and add it to the body messages so I end up sending messages like this:
H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5
I have tried to aggregate but that does not appear to be the right EIP in this scenario, as I am just combining the same message over and over again instead of multiple messages into one... At a fundamental level I need access to the header data in order to process the body (in truth it's just one field). I just don't know how to set a variable as headers and properties are cleared on each exchange. Any tips? Thanks in advance. Let me know if it would help to see the camel route as it stands.
Here is the camel route which may help:
from("direct:inventory")
.split(body().tokenize("\n")).streaming()
.throttle(100)
.choice()
.when(property("CamelSplitComplete").isEqualTo(true))
.log("Processed ${property.CamelSplitSize} updates")
.end()
.unmarshal(csv)
.log("${body}")
.aggregate(header("CamelFileLastModified"), new InventoryAggregationStrategy())
.completionPredicate(header("aggregationComplete").isEqualTo(true))
.to("freemarker://templates/inventory.ftl")
.unmarshal().string("UTF-8")
.unmarshal().json(JsonLibrary.Jackson)
.convertBodyTo(JsonObject.class)
.to("endpoint");