I need to do something really similar to this https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

My problem is that I have an unknown number of groups, and if the parallelism of the mapAsync is less than the number of groups I get an error in the last sink:

Tearing down SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) due to upstream error (akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)

I tried to put a buffer in the middle, as suggested in the Akka Streams cookbook http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

but with the same result

I wonder if using mapAsync serves any purpose in the first place? What happens if you just use map instead? - jrudolph
With map, the groups are not consumed in parallel / asynchronously, and parallel consumption is the behavior I want. - Sammyrulez
I think that's a misconception. All of the groups are represented by a Source[Something] (after groupBy you have a Source[Source[Something]], right?). So, the only thing you need to do inside of the map (foreach should work as well) would be to run the subflows which is an asynchronous operation. The subflows would then run on their own and your map element would be free to accept the next Source[Something]. - jrudolph

1 Answer

Expanding on jrudolph's comment which is completely correct...

You do not need a mapAsync in this instance. As a basic example, suppose you have a source of tuples

import akka.stream.scaladsl.{Source, Sink}

def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))

val originalSource = Source(data)

You can then perform a groupBy to create a Source of Sources

def getID(tuple : (String, Int)) = tuple._1

//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID

Each one of the grouped Sources can be processed in parallel with just a map, no need for anything fancy. Here is an example of each grouping being summed in an independent stream:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

def getValues(tuple : (String, Int)) = tuple._2

//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)

//a Source of (String, Future[Int])
val sumSource  = 
  groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

Now all of the "foo" numbers are being summed in parallel with all of the "bar" numbers.
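
To actually read the sums back, one option is to collect the per-group Futures and combine them once the outer stream completes. The following is only a sketch, assuming the Akka Streams 1.0 API used above (where groupBy emits (key, Source) pairs); the names sums and result, and the 5 second timeout, are illustrative and not part of the original code. The Await at the end is there only so the script can print something.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system = ActorSystem("groupSums")
implicit val materializer = ActorMaterializer()
import system.dispatcher

val data = List(("foo", 1), ("foo", 2), ("bar", 1), ("foo", 3), ("bar", 2))

// the same reusable summing sink as in the answer
val sumSink = Sink.fold[Int, Int](0)(_ + _)

// assumption: Akka Streams 1.0, where groupBy yields (key, Source) pairs
val sums: Future[Map[String, Int]] =
  Source(data)
    .groupBy(_._1)
    // start each group's stream right away; only the Future travels downstream
    .map { case (id, src) => src.map(_._2).runWith(sumSink).map(sum => id -> sum) }
    // collecting the Futures does not wait on them, so no group is held back
    .runWith(Sink.fold[Vector[Future[(String, Int)]], Future[(String, Int)]](Vector.empty)(_ :+ _))
    .flatMap(futures => Future.sequence(futures))
    .map(_.toMap)

// blocking here is for demonstration only
val result = Await.result(sums, 5.seconds)
println(result)
```

Because each substream is materialized inside the plain map, every group starts consuming as soon as it is emitted, regardless of how many groups there turn out to be.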

mapAsync is used when you have an encapsulated function that returns a Future[T] and you want to emit a T instead, which is not the case in your question. Further, mapAsync involves waiting for results, which is not reactive...
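
For contrast, here is a minimal sketch of the kind of case mapAsync is meant for. The describe function is hypothetical, standing in for any call that returns a Future (a remote lookup, for example); the names described and result are likewise only illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system = ActorSystem("mapAsyncSketch")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// hypothetical asynchronous call: takes an Int, returns a Future[String]
def describe(n: Int): Future[String] = Future(s"item-$n")

// mapAsync unwraps each Future[String] and emits plain Strings,
// keeping the order of the upstream elements
val described: Future[Vector[String]] =
  Source(List(1, 2, 3))
    .mapAsync(parallelism = 2)(describe)
    .runWith(Sink.fold[Vector[String], String](Vector.empty)(_ :+ _))

// blocking here is for demonstration only
val result = Await.result(described, 5.seconds)
println(result)
```

Here the elements themselves are the inputs to an asynchronous function. In the question the elements are already Sources, so running them inside a plain map is enough.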