2 votes

One of the stages of my computation graph is a flow of type Flow[Seq[Request], Seq[Response], NotUsed]. Obviously, this stage should assign a response to every request, and emit the seq once all of the requests are resolved.

Now, the underlying API has a harsh rate-limiting policy, so I can only fire a single request per second. If I had a Flow of single Requests, I could zip this stream with one that emits a single element per second (How to limit an Akka Stream to execute and send down one message only once per second?), but I don't see a similar solution in this case.

Is there a nice way to express this? The idea that comes to mind is to use the low-level Graph DSL with a one-second tick stream as state, and to process the sequences of requests against it, but I doubt that will turn out good-looking.

Have you considered flow.throttle? - Viktor Klang
Yes, but as I have a Seq[Request], I need to wait between each request in this Seq. So I need some kind of inner throttle as well - roman-roman
flatMapConcat(seq => Source(seq).throttle(…).grouped(seq.size))? - Viktor Klang
@ViktorKlang as far as I understand, this will throttle the flow within each group, but not between the groups themselves, as for each seq we create a new independent throttle. - roman-roman
What does your test show? - Viktor Klang
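For reference, the suggestion from the comments can be sketched as follows (a minimal, illustrative sketch; the object and system names are assumptions). It also makes the caveat raised above visible: the throttle is created anew for every group, so pacing holds within each Seq but is not maintained across group boundaries.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

object FlatMapConcatThrottle extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Each Seq becomes its own throttled sub-stream, regrouped at the end.
  // Note: a fresh throttle per group means no pacing between groups.
  val groups = Await.result(
    Source(List(Seq(1, 2, 3), Seq(4, 5)))
      .flatMapConcat(seq =>
        Source(seq)
          .throttle(1, 100.millis, 1, ThrottleMode.shaping)
          .grouped(seq.size))
      .runWith(Sink.seq),
    10.seconds)

  println(groups) // the two groups come out intact, each internally throttled
  system.terminate()
}
```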

2 Answers

2 votes

As Viktor said, you should probably use the built-in throttle. But in case you want to do it yourself, it may look like this:

private def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Emits one element per `rate`; zipping the main stream with it
  // lets through at most one message per tick
  val ticker = Source.tick(rate, rate, ())

  val zip = builder.add(Zip[T, Unit])
  val map = Flow[(T, Unit)].map { case (value, _) => value }
  val messageExtractor = builder.add(map)

  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(zip.in0, messageExtractor.out)
})

// And it will be used in your flow as follows
// .via(throttleFlow(FiniteDuration(200, MILLISECONDS)))

Also, since you're limiting access to some API, you may want to limit calls to it in a centralized fashion. Say multiple places in your project make calls to the same external API, but because the calls come from the same IP, throttling should apply to all of them together. For such a case, consider using MergeHub.source for your (supposedly) akka-http flow. Each caller will create and execute a new graph to make a call.
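A minimal sketch of that centralized variant (the `callApi` function and the buffer size are assumptions, not part of the original answer): MergeHub materializes a Sink that many independent streams can attach to, and a single throttle after the hub paces all of them together.

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{MergeHub, Sink, Source}

object CentralThrottle extends App {
  implicit val system: ActorSystem = ActorSystem("central")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Hypothetical stand-in for the real API call
  def callApi(request: String): Unit = println(s"calling $request")

  // One shared entry point: every stream attached to this Sink goes
  // through the same throttle, so the rate limit holds globally.
  val centralSink: Sink[String, NotUsed] =
    MergeHub.source[String](perProducerBufferSize = 16)
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .to(Sink.foreach(callApi))
      .run()

  // Independent callers simply attach to the shared sink:
  Source(List("req-1", "req-2")).runWith(centralSink)
  Source.single("req-3").runWith(centralSink)
}
```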

2 votes

Here is what I'm ending up using:

  case class FlowItem[I](i: I, requests: Seq[HttpRequest], responses: Seq[String]) {
    // NB: responses are prepended, so they accumulate in reverse request order
    def withResponse(resp: String) = copy(responses = resp +: responses)
    def extractNextRequest = (requests.head, copy(requests = requests.tail))
  }


 def apiFlow[I, O](requestPer: FiniteDuration,
                    buildRequests: I => Seq[HttpRequest],
                    buildOut: (I, Seq[String]) => O
                   )(implicit system: ActorSystem, materializer: ActorMaterializer) = {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val in: FlowShape[I, FlowItem[I]] =
        b.add(Flow[I].map(i => FlowItem(i, buildRequests(i), Seq.empty)))

      val merge: MergePreferredShape[FlowItem[I]] =
        b.add(MergePreferred[FlowItem[I]](1))

      val throttle: FlowShape[FlowItem[I], FlowItem[I]] =
        b.add(Flow[FlowItem[I]].throttle(1, requestPer, 1, ThrottleMode.shaping))

      val prepareRequest: FlowShape[FlowItem[I], (HttpRequest, FlowItem[I])] =
        b.add(Flow[FlowItem[I]].map(_.extractNextRequest))

      val log =
        b.add(Flow[(HttpRequest, FlowItem[I])].map { r => Console.println(s"request to ${r._1.uri}"); r})

      val pool: FlowShape[(HttpRequest, FlowItem[I]), (Try[HttpResponse], FlowItem[I])] =
        b.add(Http(system).superPool[FlowItem[I]]())

      // NB: needs an implicit ExecutionContext in scope (e.g. import system.dispatcher);
      // only successful 200 responses are handled, anything else fails the stream
      val transformResponse: FlowShape[(Try[HttpResponse], FlowItem[I]), FlowItem[I]] =
        b.add(Flow[(Try[HttpResponse], FlowItem[I])].mapAsync(1) {
          case (Success(HttpResponse(StatusCodes.OK, headers, entity, _)), flowItem) =>
            entity.toStrict(1.second).map(resp => flowItem.withResponse(resp.data.utf8String))
        })

      val split: UniformFanOutShape[FlowItem[I], FlowItem[I]] =
        b.add(Partition[FlowItem[I]](2, fi => if (fi.requests.isEmpty) 0 else 1))


      val out: FlowShape[FlowItem[I], O] =
        b.add(Flow[FlowItem[I]].map(fi => buildOut(fi.i, fi.responses)))

        in ~> merge ~> throttle ~> prepareRequest ~> log ~> pool ~> transformResponse ~> split ~> out
              merge.preferred   <~                                                       split

      FlowShape(in.in, out.out)
    }
  }

The idea is to pass each element through the throttle as many times as it has requests, storing the remaining (not yet performed) requests alongside the message. The split stage checks whether any requests are left; if so, the element is routed back into the merge for another pass.
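The feedback pattern itself (MergePreferred plus Partition) can be seen in isolation in the following minimal sketch, with the throttle and HTTP pool replaced by a stub that just decrements a per-element counter; all names here are illustrative:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, MergePreferred, Partition, Sink, Source}

object FeedbackLoopSketch extends App {
  implicit val system: ActorSystem = ActorSystem("loop")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Each element is (value, remainingRequests); it cycles through the loop
  // until remainingRequests reaches zero, then leaves via Partition port 0.
  val loop = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val merge = b.add(MergePreferred[(Int, Int)](1))
    // Stand-in for throttle + HTTP call: "perform" one request
    val work  = b.add(Flow[(Int, Int)].map { case (v, n) => (v, n - 1) })
    val split = b.add(Partition[(Int, Int)](2, { case (_, n) => if (n == 0) 0 else 1 }))

    merge           ~> work ~> split
    merge.preferred <~         split.out(1)

    FlowShape(merge.in(0), split.out(0))
  })

  val result = Await.result(
    Source(List((1, 3), (2, 2))).via(loop).map(_._1).runWith(Sink.seq),
    10.seconds)
  println(result) // each element appears once, after all its "requests" are done
  system.terminate()
}
```

Note that the feedback edge goes into the preferred port of the merge, so elements already in flight take priority over fresh input; this keeps the number of items circulating in the cycle bounded.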