I am trying to figure out how to perform the following stateful operation with akka-stream:

Let's say I am emitting through the stream a sequence of elements e, each of which contains an arbitrary number of elements a.

I would like to limit the number of elements e passed downstream in one group, based on the total number of elements a that those elements e contain.

For example, with a limit of 4 elements a per group:

Incoming stream

--> e1(a1e1)

--> e2(a1e2, a2e2)

--> e3(a1e3)

--> e4(a1e4, a2e4)

--> e5(a1e5, a2e5)

Would emit

group1 [e1, e2, e3]

group2 [e4, e5]
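
As a rough sketch, and ignoring the time window for now, the grouping rule in this example could be written in plain Scala as follows. The names `E`, `groupByWeight` and `incoming` are hypothetical and exist only for illustration; the limit of 4 is inferred from the example above. This is not an akka-stream solution, just a way to pin down the expected semantics.

```scala
// Hypothetical model of the question's elements: each e carries some a's.
final case class E(name: String, as: Seq[String])

object WeightedGroupingSketch {

  // The incoming elements from the example above.
  val incoming: Seq[E] = Seq(
    E("e1", Seq("a1e1")),
    E("e2", Seq("a1e2", "a2e2")),
    E("e3", Seq("a1e3")),
    E("e4", Seq("a1e4", "a2e4")),
    E("e5", Seq("a1e5", "a2e5"))
  )

  // Greedily packs elements into groups whose total number of a's stays
  // within maxWeight. An element heavier than maxWeight ends up alone in
  // its own group (an assumption; the question does not cover this case).
  def groupByWeight(es: Seq[E], maxWeight: Int): Vector[Vector[E]] = {
    val (done, current, _) =
      es.foldLeft((Vector.empty[Vector[E]], Vector.empty[E], 0)) {
        case ((groups, group, weight), e) =>
          val cost = e.as.size
          if (group.nonEmpty && weight + cost > maxWeight)
            (groups :+ group, Vector(e), cost) // close the group, start a new one
          else
            (groups, group :+ e, weight + cost) // still fits into the open group
      }
    // Flush the last, possibly partial, group.
    if (current.isEmpty) done else done :+ current
  }

  def main(args: Array[String]): Unit =
    groupByWeight(incoming, maxWeight = 4)
      .map(_.map(_.name))
      .foreach(println)
}
```

With a limit of 4, this produces the two groups listed above: e1, e2, e3 and then e4, e5.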

Ultimately, this should also be time-bounded, as in groupedWithin: if a certain amount of time passes, just emit whatever has accumulated.

It sounds like statefulMapConcat might be the thing to look at, but I am not sure.

If any akka-stream expert could help here, that would be awesome.

1 Answer

From the description, I assume you want to control how fast elements are emitted downstream, and that each element has a different processing cost.

There are a few options out of the box that control the speed of a stream.

  1. You probably want to use throttle, which controls the throughput of the stream.

    throttle - Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element.

    import java.time.LocalDateTime

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.Random
    
    object ThrottleExample extends App {
    
        implicit val sys: ActorSystem = ActorSystem()
        implicit val mat: ActorMaterializer = ActorMaterializer()
    
        case class E(as: Seq[Int])
    
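        val f = Source(1 to 20)
            // each E carries between 0 and 6 inner elements
            .map(_ => Random.nextInt(7))
            .map(len => E((1 to len).map(_ => 1)))
            // allow a total cost of 5 per second; the cost of an E is the number of inner elements
            .throttle(5, 1.second, _.as.size)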
            .runForeach(e => {
                println(s"${LocalDateTime.now()} -> $e")
            })
    
        f.onComplete(_ => {
            mat.shutdown()
            sys.terminate()
        })
    }
    
  2. Another option is to combine a grouping flow with a simple throttle, e.g. groupedWeightedWithin (collects elements into a batch until a maximum batch cost is reached or a time window elapses) or batchWeighted (aggregates elements into batches only while the downstream is slower).

    groupedWeightedWithin - Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first.


    batchWeighted - Allow for a slower downstream by passing incoming elements and a summary into an aggregate function as long as there is backpressure and a maximum weight batched elements is not yet reached.

    import java.time.LocalDateTime
    
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source
    
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.Random
    
    object GroupedWithinExample extends App {
    
        implicit val sys: ActorSystem = ActorSystem()
        implicit val mat: ActorMaterializer = ActorMaterializer()
    
        case class E(as: Seq[Int])
    
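        val f = Source(1 to 20)
            // each E carries between 0 and 4 inner elements
            .map(_ => Random.nextInt(5))
            .map(len => E((1 to len).map(_ => 1)))
            // close a group before its total weight would exceed 7, or after 1 second, whichever comes first
            .groupedWeightedWithin(7, 1.second)(_.as.length)
            // pass at most one group per second downstream
            .throttle(1, 1.second)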
            .runForeach(e => {
                println(s"${LocalDateTime.now()} -> $e")
            })
    
        f.onComplete(_ => {
            mat.shutdown()
            sys.terminate()
        })
    }
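
To tie this back to the question, here is a minimal sketch that feeds the question's five elements through groupedWeightedWithin with a maximum weight of 4. It assumes Akka 2.5.x or 2.6.x on the classpath (ActorMaterializer is deprecated in 2.6 but still available); the object name, the `name` field, and the `groupNames` helper are hypothetical and only there for illustration. Because the source is in memory and much faster than the 1-second window, the groups should normally come out as e1, e2, e3 and then e4, e5, although the timer could in principle cut a group short.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object QuestionGroupingExample {

  // Hypothetical model of the question's elements: each e carries some a's.
  final case class E(name: String, as: Seq[String])

  val incoming: List[E] = List(
    E("e1", Seq("a1e1")),
    E("e2", Seq("a1e2", "a2e2")),
    E("e3", Seq("a1e3")),
    E("e4", Seq("a1e4", "a2e4")),
    E("e5", Seq("a1e5", "a2e5"))
  )

  // Runs the stream and returns the names of the elements in each group.
  def groupNames(): Seq[Seq[String]] = {
    implicit val sys: ActorSystem = ActorSystem("grouping-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val groups = Source(incoming)
        // weight of an e = number of a's it contains; a group is emitted when
        // the weight limit of 4 is reached or 1 second has passed
        .groupedWeightedWithin(4L, 1.second)(_.as.size.toLong)
        .map(_.map(_.name))
        .runWith(Sink.seq)
      // Blocking is acceptable in a small demo, not in production code.
      Await.result(groups, 5.seconds)
    } finally {
      sys.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    groupNames().foreach(println)
}
```

Adding a throttle after groupedWeightedWithin, as in the example above, would additionally cap how often a group is passed downstream.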