I'd like to implement a Flow to handle paginated results (e.g., the underlying service returns some results, but also indicates that more results can be fetched by making another request, passing in e.g. a cursor).
Things I've done so far:
I have implemented the following flow and test, but the flow doesn't complete.
```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, UnzipWith, Zip}

object AdditionalRequestsFlow {

  private def keepRequest[Request, Response](
      flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val in    = builder.add(Flow[Request])
      val bcast = builder.add(Broadcast[Request](2))
      val merge = builder.add(Zip[Request, Response]())

      in ~> bcast ~> merge.in0
            bcast ~> flow ~> merge.in1

      FlowShape(in.in, merge.out)
    })
  }

  def flow[Request, Response, Output](
      inputFlow: Flow[Request, Response, NotUsed],
      anotherRequest: (Request, Response) => Option[Request],
      extractOutput: Response => Output,
      mergeOutput: (Output, Output) => Output
  ): Flow[Request, Output, NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val start      = b.add(Flow[Request])
      val merge      = b.add(Merge[Request](2))
      val underlying = b.add(keepRequest(inputFlow))
      val unOption   = b.add(Flow[Option[Request]].mapConcat(_.toList))
      val unzip      = b.add(UnzipWith[(Request, Response), Response, Option[Request]] {
        case (req, res) => (res, anotherRequest(req, res))
      })
      // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work
      val finish     = b.add(Flow[Response].map(extractOutput))

      start ~> merge ~> underlying ~> unzip.in
                                      unzip.out0 ~> finish
               merge <~ unOption  <~  unzip.out1

      FlowShape(start.in, finish.out)
    })
  }
}
```
The test:
```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.FlatSpec
import org.scalatest.Matchers._
import cats.syntax.option._
import org.scalatest.concurrent.ScalaFutures.whenReady

class AdditionalRequestsFlowSpec extends FlatSpec {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
  case class Response(values: List[Int], nextOffset: Option[Int])

  private val flow: Flow[Request, Response, NotUsed] = {
    Flow[Request]
      .map { request =>
        val start = request.offset.getOrElse(0)
        val end = Math.min(request.max, start + request.batchSize)
        val nextOffset = if (end == request.max) None else Some(end)
        val result = Response((start until end).toList, nextOffset)
        result
      }
  }

  "AdditionalRequestsFlow" should "collect additional responses" in {
    def anotherRequest(request: Request, response: Response): Option[Request] = {
      response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) }
    }

    def extract(x: Response): List[Int] = x.values
    def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b

    val requests =
      Request(max = 35, batchSize = 10) ::
      Request(max = 5, batchSize = 10) ::
      Request(max = 100, batchSize = 1) ::
      Nil

    val expected = requests.map { x => (0 until x.max).toList }

    val future = Source(requests)
      .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
      .runWith(Sink.seq)

    whenReady(future) { x =>
      x shouldEqual expected
    }
  }
}
```
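To make the intended pagination concrete, this is the request/response sequence the stub `flow` above produces for the first test input, worked out by hand:

```scala
// Request(max = 35, batchSize = 10, offset = None)     -> Response(List(0, ..., 9),   nextOffset = Some(10))
// Request(max = 35, batchSize = 10, offset = Some(10)) -> Response(List(10, ..., 19), nextOffset = Some(20))
// Request(max = 35, batchSize = 10, offset = Some(20)) -> Response(List(20, ..., 29), nextOffset = Some(30))
// Request(max = 35, batchSize = 10, offset = Some(30)) -> Response(List(30, ..., 34), nextOffset = None)
//
// After extract + merge, the single output for that input should be (0 until 35).toList.
```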
Implemented the same flow in a terrible, blocking way to illustrate what I'm trying to achieve:
```scala
import scala.concurrent.Await
import scala.concurrent.duration._

def uglyHackFlow[Request, Response, Output](
    inputFlow: Flow[Request, Response, NotUsed],
    anotherRequest: (Request, Response) => Option[Request],
    extractOutput: Response => Output,
    mergeOutput: (Output, Output) => Output
): Flow[Request, Output, NotUsed] = {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  Flow[Request]
    .map { x =>
      def grab(request: Request): Output = {
        val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :(
        val another = anotherRequest(request, response)
        val output = extractOutput(response)

        another.map { another =>
          mergeOutput(output, grab(another))
        } getOrElse output
      }

      grab(x)
    }
}
```
This works (but we should not be materializing anything / `Await`-ing at this point).
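For comparison, here is roughly what I imagine a non-blocking version of the same recursion would look like (my own sketch, chaining `Future`s instead of `Await`-ing; it still materializes the inner flow once per request, which is the part I would like to avoid):

```scala
import scala.concurrent.{ExecutionContext, Future}
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}

// Sketch only: same recursion as uglyHackFlow, but composed on Futures
// instead of blocking with Await.
def lessUglyFlow[Request, Response, Output](
    inputFlow: Flow[Request, Response, NotUsed],
    anotherRequest: (Request, Response) => Option[Request],
    extractOutput: Response => Output,
    mergeOutput: (Output, Output) => Output
)(implicit mat: Materializer, ec: ExecutionContext): Flow[Request, Output, NotUsed] = {

  def grab(request: Request): Future[Output] =
    Source.single(request).via(inputFlow).runWith(Sink.head).flatMap { response =>
      val output = extractOutput(response)
      anotherRequest(request, response) match {
        case Some(next) => grab(next).map(merged => mergeOutput(output, merged))
        case None       => Future.successful(output)
      }
    }

  Flow[Request].mapAsync(parallelism = 1)(grab)
}
```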
Reviewed http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks, which I believe contains the answer; however, I cannot seem to find it there. In my case I would expect the cycle to contain at most one element at any given time, so neither buffer overflow nor complete starvation should occur - but evidently one of them does.
Tried to debug the stream using `.withAttributes(Attributes(LogLevels(...)))`, however it doesn't result in any output despite seemingly correctly configured loggers.
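For completeness, this is roughly the shape of the logging I attached (stage name and log levels here are placeholders rather than the exact values I used; it reuses the definitions from the test above):

```scala
import akka.event.Logging
import akka.stream.Attributes

// Roughly what I attached while debugging; no log output appeared.
Source(requests)
  .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
  .log("additional-requests")
  .withAttributes(Attributes.logLevels(
    onElement = Logging.InfoLevel,
    onFinish  = Logging.InfoLevel,
    onFailure = Logging.ErrorLevel))
  .runWith(Sink.seq)
```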
I'm looking for hints on how to fix the `flow` method while keeping the same signature and semantics (so that the test passes).
Or perhaps I'm doing something completely off-base here (e.g., there is an existing feature in, say, `akka-stream-contrib` which solves this)?
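For context, if the underlying service were exposed as a plain `Request => Future[Response]` function instead of a `Flow`, I believe pagination could be written with `Source.unfoldAsync` along these lines (a sketch of mine, names invented); what I'm after is essentially a `Flow`-shaped equivalent:

```scala
import scala.concurrent.{ExecutionContext, Future}
import akka.NotUsed
import akka.stream.scaladsl.Source

// Hypothetical sketch: emit pages by repeatedly calling the service
// until anotherRequest yields None.
def pages[Request, Response](
    first: Request,
    call: Request => Future[Response],
    anotherRequest: (Request, Response) => Option[Request]
)(implicit ec: ExecutionContext): Source[Response, NotUsed] =
  Source.unfoldAsync[Option[Request], Response](Option(first)) {
    case Some(request) =>
      call(request).map(response => Some((anotherRequest(request, response), response)))
    case None =>
      Future.successful(None)
  }
```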
In the concrete use case the responses contain `<entry/>` tags and then a `<link href="..." rel="next"/>` for the next URL to invoke to get the next response with more entries. I've currently connected my `uglyHackFlow` and it works fine for this use case. – John M