I'm using GPars to process a 250M-row MySQL table in parallel. I create eight GPars threads and eight independent database connections, and divide the data so that each thread operates independently on different ranges of rows...sort of a cheap MapReduce concept. At the core, the logic is like this:
withExistingPool(pool) {
    connection_array.collectParallel {
        // Figure out which connection this thread can use.
        // The index into the array tells us which thread we
        // are, and therefore where to read data.
        int i
        for (i = 0; i < connection_array.size(); i++) {
            if (it == connection_array[i])
                break
        }

        // Each thread runs the same query, with LIMIT controlling
        // the range of rows it reads. If we have 8 threads reading
        // 40000 rows per call to this routine, each thread reads
        // 5000 rows (thread-0 reads rows 0-4999, thread-1 reads
        // rows 5000-9999, and so forth).
        def startrow = lastrow + (i * MAX_ROWS)
        def rows = it.rows("SELECT * ... LIMIT $startrow, $MAX_ROWS")

        // Add our rows to the result set we will return to the caller
        // (needs to be serialized, since many threads can be here).
        lock.lock()
        if (!result)
            result = rows
        else
            result += rows
        lock.unlock()
    }
}
}
The code works great initially, giving me over 10,000 rows per second when it starts. But after a few million rows, it begins to slow down. By the time we're 25 million rows in, instead of 10,000 rows per second, we're only getting 1,000 rows per second. If we kill the app and restart it from the point we left off, it goes back to 10K rows per second again for a while, but it always slows down as processing continues.
There's plenty of processing power available - this is an 8-way system and the database is over a network, so there's a fair bit of wait time no matter what. The processors generally run no more than 25-30% busy while this is running. There also don't seem to be any memory leaks - we monitor memory statistics and don't see any change once processing is underway. The MySQL server doesn't seem to be stressed (it runs about 30% busy initially, decreasing as the app slows down).
Are there any tricks to help this sort of thing run more consistently with large numbers of iterations?
Use (0..<connection_array.size()).collectParallel instead of your array, then you won't need to search for the current index – tim_yates

Try results = connection_array.collectParallel() and get rid of your last if block... Or try keeping it and change to eachParallel – tim_yates
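A minimal sketch of what that first suggestion might look like, assuming the same connection_array, pool, lastrow and MAX_ROWS from the question (the SELECT is elided exactly as in the original). Iterating over the index range gives each thread its index directly, and collectParallel already gathers each closure's return value, so the manual lock/result accumulation can go:

    // Sketch only - assumes connection_array, pool, lastrow and
    // MAX_ROWS as defined in the question.
    def resultLists = withExistingPool(pool) {
        (0..<connection_array.size()).collectParallel { i ->
            def startrow = lastrow + (i * MAX_ROWS)
            // Each index owns its own connection, so no two
            // threads ever share one.
            connection_array[i].rows("SELECT * ... LIMIT $startrow, $MAX_ROWS")
        }
    }
    // collectParallel returns one row list per thread, in index
    // order; flatten them into a single result list.
    def result = resultLists.flatten()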