1
votes

My question is (1) is there a better strategy for solving my problem (2) is it possible to tweak/improve my solution so it works and doesn't split the aggregation in a reliable manner (3 the less important one) how can i debug it more intelligently? figuring out wtf the aggregator is doing is difficult because it only fails on giant batches that are hard to debug because of their size. answers to any of these would be very useful, most importantly the first two.

I think the problem is I'm not expressing to camel correctly that I need it to treat the CSV file coming in as a single lump and i dont want the aggregator to stop till all the records have been aggregated.

I'm writing a route to digest a million line CSV file, split then aggregate the data on some key primary fields, then write the aggregated records to a table

unforuntaely the primary constraints of the table are getting violated (which also correspond to the aggregation keys), implying that the aggregator is not waiting for the whole input to finish.

it works fine for small files of a few thousand records, but on the large sizes it will actually face in production, (1,000,000 records) it fails.

Firstly it fails with a JavaHeap memory error on the split after the CSV unmarshall. I fix this with .streaming(). This impacts the aggregator, where the aggregator 'completes' too early.

to illustrate:

A 1 
A 2 
B 2
--- aggregator split --- 
B 1
A 2

--> A(3),B(2) ... A(2),B(1) = constraint violation because 2 lots of A's etc.
when what I want is A(5),B(3)

with examples of 100, 1000 etc, records it works fine and correctly. but when it processes 1,000,000 records, which is the real-size it needs to handle, firstly the split() gets an OutOfJavaHeapSpace exception.

I felt that simply changing the heap-size would be a short-term solution and just pushing the problem back until the next upper-limit of records comes through, so I got around it by using the .streaming() on the split.

Unfortunately now, the aggregator is being drip-fed the records, not getting them in a big cludge and it seems to be completing early and doing another aggregation, which is violating my primary constraint

from( file://inbox )
.unmarshall().bindy().
.split().body().streaming()

.setHeader( "X" Expression building string of primary-key fields)
.aggregate( header("X") ... ).completionTimeout( 15000 )

etc.

I think part of the problem is that I'm depending on the streaming split not timeing out longer than a fixed amount of time, which just isn't foolproof - e.g. a system task might reasonably cause this, etc. Also everytime I increase this timeout it makes it longer and longer to debug and test this stuff.

PRobably a better solution would be to read the size of the CSV file that comes in and not allow the aggregator to complete until every record has been processed. I have no idea how I'd express this in camel however.

Very possibly I just have a fundamental stategy misunderstanding of how I should be approaching / describing this problem. There may be a much better (simpler) approach that I dont know.

there's also such a large amount of records going in, I can't realistically debug them by hand to get an idea of what's happening (I'm also breaking the timeout on the aggregator when I do, I suspect)

1

1 Answers

3
votes

You can split the file first one a line by line case, and then convert each line to CSV. Then you can run the splitter in streaming mode, and therefore have low memory consumption, and be able to read a file with a million records.

There is some blog links from this page http://camel.apache.org/articles about splitting big files in Camel. They cover XML though, but would be related to splitting big CSV files as well.