4
votes

Suppose I have a Source of different fruits, and I want to insert their count to a database.

I can do something like this:

Flow[Fruits]
.map { item =>
    insertItemToDatabase(item)
}

But that is obviously slow – why insert to a database with every item, when I can group them up? So I came up with a better solution:

Flow[Fruits]
.grouped(10000)
.map { items =>
    insertItemsToDatabase(items)
}

But that means that I have to hold 10 000 elements [banana, orange, orange, orange, banana, ...] in memory until they are flushed to database. Isn't this inefficient? Perhaps I can do something like this:

Flow[Fruits]
.grouped(100)
.map { items =>
    consolidate(items)  // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
    insertMapToDatabase(mapOfItems)
}

From my understanding, this should also process 10 000 elements at once, but shouldn't take up as much memory (providing the elements are repeated often). But each key is still repeated 100 times in memory. Sure I can do .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()... But isn't there a better way? Perhaps something like this:

Flow[Fruits]
.map { item =>
    addToMap(item)
    if(myMap.length == 10000) {
        insertToDatabase(myMap)
        clearMyMap()
    }
}

But doesn't it break the concept of Akka streams, namely independency (and therefore concurrency) of processing stages?

1
Have a look to the function groupedWithin. It takes two parameters : a max bound of elements and a time rate. For example .groupedWithnin(5000, 1.seconds) will give 5000 elements to process if you reached it before 1 seconds or it will give the number of elements accumulated in 1 second.alifirat
Thanks @alifirat for your suggestion, but that is just a different way to group. What I need is a different way to process the data I have, both memory friendly and database friendly.Honza Zíka

1 Answers

2
votes

If the cardinality of the Fruit set is low then you can keep a singular Map with all of the counts and then flush that to the database after streaming through all of the Fruit values.

First, construct a Flow that will keep the running count:

type Count = Int

type FruitCount = Map[Fruit, Count]

val zeroCount : FruitCount = 
  Map.empty[Fruit, Count] withDefaultValue 0

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
  (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1)

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
  Flow[Fruit].scan(zeroCount)(appendFruitToCount)

Now create a Sink that will receive the last FruitCount and materialize the stream:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount]

val fruitSource : Source[Fruit, NotUsed] = ???

val lastFruitCountFut : Future[Option[FruitCount]] = 
  fruitSource
    .via(fruitCountFlow)
    .to(lastFruitCountSink)
    .run()

The lastFruitCountFut can then be used to send values to the database:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) =>
  insertItemsToDatabase( Iterator.fill(count)(fruit) )
}))

An Iterator is used because it is the most memory efficient collection for constructing a TraversableOnce of Fruit items.

This solution will only keep 1 Map in memory which will have 1 key for each distinct Fruit type & 1 Integer for each key.