Suppose I have a Source of different fruits, and I want to insert their counts into a database.
I can do something like this:
Flow[Fruits]
  .map { item =>
    insertItemToDatabase(item)
  }
But that is obviously slow: why do a database insert for every single item when I can group them up? So I came up with a better solution:
Flow[Fruits]
  .grouped(10000)
  .map { items =>
    insertItemsToDatabase(items)
  }
But that means I have to hold 10 000 elements ([banana, orange, orange, orange, banana, ...])
in memory until they are flushed to the database. Isn't this inefficient? Perhaps I can do something like this:
Flow[Fruits]
  .grouped(100)
  .map { items =>
    consolidate(items) // this will return Map[String, Int]
  }
  .grouped(100)
  // here I have Seq[Map[String, Int]]
  .map { mapOfItems =>
    insertMapToDatabase(mapOfItems)
  }
From my understanding, this should also process 10 000 elements at once, but shouldn't take up as much memory (provided the elements repeat often). But each distinct key is still held up to 100 times in memory. Sure, I could chain .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()
... but isn't there a better way? Perhaps something like this:
Flow[Fruits]
  .map { item =>
    addToMap(item)
    if (myMap.size == 10000) {
      insertToDatabase(myMap)
      clearMyMap()
    }
  }
But doesn't that break the concept of Akka streams, namely the independence (and therefore concurrency) of the processing stages?
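Isolated from the stream, the mutable-map idea above amounts to a small stateful batcher. Here is a plain-Scala sketch of that logic (`CountBatcher` and `flush` are hypothetical names; `flush` stands in for `insertMapToDatabase`, and this is not Akka API):

```scala
import scala.collection.mutable

// Accumulate counts and emit the whole map every `batchSize` elements.
final class CountBatcher(batchSize: Int, flush: Map[String, Int] => Unit) {
  private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
  private var seen = 0

  def add(item: String): Unit = {
    counts(item) += 1
    seen += 1
    if (seen == batchSize) {
      flush(counts.toMap) // one database write per batch
      counts.clear()
      seen = 0
    }
  }
}
```

If the state is confined to a single stage like this, it need not block the rest of the pipeline, which is essentially what the built-in grouping stages do for you.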
groupedWithin
. It takes two parameters: a maximum number of elements and a time window. For example, .groupedWithin(5000, 1.second)
will emit a batch of 5000 elements if that many accumulate within 1 second, or else whatever number of elements has accumulated when the second elapses. – alifirat
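The size-or-time contract that groupedWithin provides can be sketched outside a running stream like this (a hypothetical helper, not the Akka implementation; the real stage also fires on a timer even when no new element arrives, while this sketch only checks when an element is offered):

```scala
import scala.concurrent.duration._

// A batch closes when either maxElements arrive or maxDuration has
// elapsed since the batch was opened. The clock is injectable so the
// logic can be exercised without real time passing.
final class SizeOrTimeBatcher[A](
    maxElements: Int,
    maxDuration: FiniteDuration,
    nowMillis: () => Long) {

  private var buffer = Vector.empty[A]
  private var openedAt = 0L

  // Returns the completed batch when one closes, otherwise None.
  def offer(a: A): Option[Vector[A]] = {
    if (buffer.isEmpty) openedAt = nowMillis()
    buffer :+= a
    if (buffer.size >= maxElements ||
        nowMillis() - openedAt >= maxDuration.toMillis) {
      val out = buffer
      buffer = Vector.empty
      Some(out)
    } else None
  }
}
```

In the stream itself you would simply follow `.groupedWithin(5000, 1.second)` with the consolidating `.map`, and get bounded memory without any shared mutable state.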