1
votes

I am trying to build an Akka Streams Source that gets its data from API calls returning a Future (it is a scrolling API, which fetches results incrementally). To build such a Source, I am using GraphStage.

I have modified the NumbersSource example, which simply pushes one Int at a time. The only change I made was to replace that Int with getvalue(): Future[Int] (to simulate the API call):

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  // simple example of future API call
  private def getvalue(): Future[Int] = Future.successful(Random.nextInt())


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          // Future API call
          getvalue().onComplete {
            case Success(value) =>
              println("Pushing value received..") // this is currently being printed just once
              push(out, value)
            case Failure(exception) =>
          }
        }
      })
    }
}

// Using the Source and Running the stream

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val done: Future[Done] = mySource.runForeach{
    num => println(s"Received: $num") // This is currently not printed
  }
  done.onComplete(_ => system.terminate())

The above code doesn't work. The println statement inside setHandler is executed just once and nothing is pushed downstream.

How should such Future calls be handled? Thanks.

UPDATE

I tried to use getAsyncCallback by making the following changes:

class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Int] { (_) =>
          completeStage()
        }
        futureNum.foreach(callback.invoke)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          val value: Int = ??? // How to get this value ??
          push(out, value)
        }
      })
    }
}

// Using the Source and Running the Stream

  def random(): Future[Int] = Future.successful(Random.nextInt())

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val done: Future[Done] = mySource.runForeach{
    num => println(s"Received: $num") // This is currently not printed
  }
  done.onComplete(_ => system.terminate())

But now I am stuck on how to get the value computed by the Future. In a GraphStage with a FlowShape, I could use:

val value = grab(in) // where in is Inlet of a Flow

But what I have is a GraphStage with a SourceShape, so there is no inlet to grab from, and I have no idea how to get the Int value of the completed Future above.

2
Try changing the source to a lazySource and see what happens. Do you need to create a GraphStageLogic? I think you could create a Source from a Future: doc.akka.io/docs/akka/current/stream/operators/Source/… and use statefulMapConcat for the counter. - Emiliano Martinez
I tried: lazy val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph). But I got the same results, so it didn't work. Yes, I need GraphStageLogic because the Future API call is a scrolling API. It requires passing the scroller state from one response to the next in order to fetch results incrementally. I also need some buffering and error handling. From my understanding, this can't be built using a Source from a Future. It could be achieved using Source.actorPublisher, but that is now deprecated and the docs suggest using GraphStage, which is what I am trying. - oblivion
Why is this question being voted for closure? All I am asking is how to use a Future inside a GraphStage[SourceShape[T]]. If that's not possible, then that should be stated explicitly. - oblivion
If the downstream has not already pulled when the future completes and the async callback gets the value (you are currently ignoring the value with the (_)), you'll have to hold on to the value in a field until there is a pull. Note that, as others have said, there are already a few operators for interacting with futures, so unless this is just a learning exercise I'd recommend using one of them (mapAsync, Source.future, unfoldAsync, etc.). - johanandren
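
A minimal sketch of the getAsyncCallback approach from the last comment, assuming one API call per downstream pull. The names FutureNumbersSource and fetch are hypothetical and stand in for the real scrolling call. Because the call is only started in onPull, the outlet should still be waiting when the Future completes, so no field is needed to park the value; a stage that starts the call earlier (for example in preStart) would need one, as the comment notes.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical stage: `fetch` is a placeholder for the Future-returning API call.
class FutureNumbersSource(fetch: () => Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("FutureNumbersSource.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // The callback body runs inside the stage, so push and failStage are safe here.
      private val onResult = getAsyncCallback[Try[Int]] {
        case Success(value) => push(out, value)
        case Failure(ex)    => failStage(ex)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          // The Future completes on another thread, so the result is handed
          // back to the stage through the async callback instead of pushed directly.
          fetch().onComplete(onResult.invoke)(materializer.executionContext)
      })
    }
}
```

It can be run like the source in the question, for example Source.fromGraph(new FutureNumbersSource(() => Future.successful(Random.nextInt()))).take(5).runForeach(println). Calling push directly from onComplete, as in the original attempt, touches the stage from a thread it does not own, which is what the callback avoids.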

2 Answers

0
votes

I'm not sure if I understand correctly, but if you are trying to implement an infinite source out of elements computed in Futures then there is really no need to do it with your own GraphStage. You can do it simply as below:

Source.repeat(())
    .mapAsync(parallelism) { _ => Future.successful(Random.nextInt()) }

The Source.repeat(()) is simply an infinite source of some arbitrary values (of type Unit in this case, but you can change () to whatever you want since it's ignored here). mapAsync then is used to integrate the asynchronous computations into the flow.
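
Since the API in the question is a scrolling one, each call needs the scroll state returned by the previous call, which Source.repeat plus mapAsync does not carry along. Source.unfoldAsync, mentioned in the comments, threads such state through. Below is a rough sketch under stated assumptions: Page, Scroll, fetchPage and scrollSource are hypothetical names, and fetchPage fakes three pages instead of calling a real API.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical response type: one page of items plus the id for the next page, if any.
final case class Page(items: List[Int], nextScrollId: Option[String])

// State carried from one call to the next.
sealed trait Scroll
case object Start extends Scroll
final case class Continue(scrollId: String) extends Scroll
case object Finished extends Scroll

// Stand-in for the real API call (assumption): serves three fixed pages.
def fetchPage(scrollId: Option[String]): Future[Page] = Future.successful {
  scrollId match {
    case None      => Page(List(1, 2), Some("a"))
    case Some("a") => Page(List(3, 4), Some("b"))
    case _         => Page(List(5), None)
  }
}

def scrollSource(implicit ec: ExecutionContext): Source[Int, NotUsed] =
  Source
    .unfoldAsync[Scroll, List[Int]](Start) {
      case Finished => Future.successful(None) // no more pages: complete the stream
      case state =>
        val id = state match {
          case Continue(s) => Some(s)
          case _           => None
        }
        fetchPage(id).map { page =>
          val next: Scroll = page.nextScrollId match {
            case Some(s) => Continue(s)
            case None    => Finished
          }
          Some((next, page.items)) // emit this page, remember where to continue
        }
    }
    .mapConcat(items => items) // flatten pages into single elements
```

unfoldAsync only asks for the next page when downstream demands more elements, so backpressure is preserved, and a failed Future fails the stream.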

0
votes

I agree with the other answer: try to avoid writing your own GraphStage. After some experimentation, this is what seems to work for me:

type Data = Int

trait DbResponse {
  // this is just a callback for a compact solution
  def nextPage: Option[() => Future[DbResponse]] 
  def data: List[Data]
}

def createSource(dbCall: DbResponse): Source[Data, NotUsed] = {
  val thisPageSource = Source.apply(dbCall.data)
  val nextPageSource = dbCall.nextPage match {
    case Some(dbCallBack) => Source.lazySource(() => Source.future(dbCallBack()).flatMapConcat(createSource))
    case None             => Source.empty
  }
  thisPageSource.concat(nextPageSource)
}

val dataSource: Source[Data, NotUsed] = Source
   .future(???: Future[DbResponse]) // the first db call
   .flatMapConcat(createSource)

I tried it out and it works almost perfectly. The second page is requested immediately, and I couldn't find out why, but the rest works as expected (with backpressure and so on).