I've asked similar questions and had great responses, but this request seems different enough to ask separately.
The Camel Aggregator, as awesome as it is, is not going to cut it for me. I need to aggregate exchange data and, when I hit a certain size, forward it on to a queue; only at that point can I ACK the original source messages off the queue. The aggregator's persistence choices aren't really an option for environmental reasons: there is no RDBMS around, and the other options would mean locally managed state. If the route or the box went down I need to be able to carry on processing (thanks to ZooKeeper and Camel's integration with it!), and any messages left sitting in that local db would become a recovery job.
I'm basically thinking I need to implement a Processor or a bean (what are the subtle differences between the two?) that takes exchanges and puts them in a map. When I hit a certain size, I'd forward the joined exchange on to an endpoint and then somehow ack all the original messages.
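As a rough sketch of what I mean (the BatchingProcessor name, the BATCH_SIZE threshold and the direct:forwardBatch endpoint are just placeholders I've picked for illustration, not anything real):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.ProducerTemplate;

    // Rough sketch of the accumulating processor described above.
    public class BatchingProcessor implements Processor {

        private static final int BATCH_SIZE = 100; // placeholder threshold

        private final ProducerTemplate template;
        private final List<Exchange> pending = new ArrayList<>();

        public BatchingProcessor(ProducerTemplate template) {
            this.template = template;
        }

        @Override
        public synchronized void process(Exchange exchange) throws Exception {
            // Hold on to the incoming exchange rather than letting it complete.
            pending.add(exchange);

            if (pending.size() < BATCH_SIZE) {
                return;
            }

            // Join the buffered bodies into one payload and forward it on.
            List<Object> bodies = new ArrayList<>();
            for (Exchange e : pending) {
                bodies.add(e.getIn().getBody());
            }
            template.sendBody("direct:forwardBatch", bodies);

            // TODO: this is the open question - at this point I need a way to
            // ack (or reject) every original message that fed the batch.
            pending.clear();
        }
    }

It's synchronized because several consumer threads could hit it at once. The TODO is exactly my problem: once process() returns, the original exchange carries on through the route, which presumably means it gets acked unless I intervene somehow.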
What I want to know is: what API do I use to take control of the exchange, effectively stopping it without acking, and to pull out whatever I need so that I can ack it later?
Can anyone provide some guidance and point me at the relevant methods on the objects of interest?
I have a nice simple idea for this. I was going to extend the Rabbit* classes, specifically RabbitConsumer.doHandleDelivery, and have that do my noddy aggregation. Once the aggregation is complete it would call

    Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);

and then, depending on the result of

    consumer.getProcessor().process(exchange);

it would ack or reject all the messages. On the face of it I'd say it would work quite well, although I'd need some synchronisation in the RabbitConsumer.
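To make the ack/reject step concrete, this is the kind of helper I imagine delegating to from the override. BatchAckHandler, BATCH_SIZE and the way the Channel gets passed in are my own placeholders; the only real APIs it leans on are Camel's Processor/Exchange and the RabbitMQ client's basicAck/basicNack with the multiple flag, which ack or reject everything up to a given delivery tag:

    import java.io.IOException;

    import com.rabbitmq.client.Channel;

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;

    // Hypothetical helper the doHandleDelivery override would delegate to.
    public class BatchAckHandler {

        private static final int BATCH_SIZE = 100; // placeholder threshold

        private int buffered;

        public synchronized void onDelivery(Channel channel, Processor processor,
                                            Exchange exchange, long deliveryTag) throws IOException {
            buffered++;
            if (buffered < BATCH_SIZE) {
                // Not enough messages yet: buffer and leave everything unacked.
                return;
            }

            boolean ok;
            try {
                // Run the joined exchange through the Camel route.
                processor.process(exchange);
                ok = exchange.getException() == null;
            } catch (Exception e) {
                ok = false;
            }
            buffered = 0;

            if (ok) {
                // multiple=true acks every unacked delivery up to and including this tag
                channel.basicAck(deliveryTag, true);
            } else {
                // multiple=true, requeue=true puts the whole batch back on the queue
                channel.basicNack(deliveryTag, true, true);
            }
        }
    }

The override itself would then just build the exchange with createRabbitExchange(envelope, properties, body) and hand it here along with envelope.getDeliveryTag() and the channel it was delivered on, which also keeps the synchronisation in one place.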