I am trying to build an Akka Streams Source that receives data by making Future-returning API calls (the API is a scrolling one, which fetches results incrementally). To build such a Source, I am using GraphStage.
I have modified the NumbersSource example, which simply pushes one Int at a time. The only change I made was to replace that Int with getvalue(): Future[Int] (to simulate the API call):
class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  // simple example of future API call
  private def getvalue(): Future[Int] = Future.successful(Random.nextInt())

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          // Future API call
          getvalue().onComplete {
            case Success(value) =>
              println("Pushing value received..") // this is currently being printed just once
              push(out, value)
            case Failure(exception) =>
              // failure is ignored in this example
          }
        }
      })
    }
}
// Using the Source and running the stream
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach { num =>
  println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())
The above code doesn't work. The println statement inside setHandler is executed just once and nothing is pushed downstream.
How should such Future calls be handled? Thanks.
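For anyone comparing notes: a likely cause is that push is called from the Future's completion thread, outside the stage's own callbacks, which GraphStageLogic does not support. A minimal sketch of one way around it, assuming Akka 2.6 or later, hands the result back to the stage through getAsyncCallback. The names FutureNumbersSource and fetch are made up for illustration and are not part of Akka.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical source: every downstream pull triggers one call to `fetch`.
class FutureNumbersSource(fetch: () => Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("FutureNumbersSource.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // The handler body runs inside the stage, so push and failStage are safe here.
      private val onResult = getAsyncCallback[Try[Int]] {
        case Success(value) => push(out, value)
        case Failure(ex)    => failStage(ex)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          // The future completes on another thread; invoke() is the thread-safe
          // way to get its result back into the stage.
          fetch().onComplete(onResult.invoke)(materializer.executionContext)
      })
    }
}
```

Used as Source.fromGraph(new FutureNumbersSource(() => Future.successful(Random.nextInt()))).take(5).runForeach(println), this should emit five values. A new call only starts on the next pull, so at most one request is in flight at a time.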
UPDATE
I tried to use getAsyncCallback, with the following changes:
class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      override def preStart(): Unit = {
        val callback = getAsyncCallback[Int] { (_) =>
          completeStage()
        }
        futureNum.foreach(callback.invoke)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          val value: Int = ??? // How to get this value ??
          push(out, value)
        }
      })
    }
}
// Using the Source and running the stream
def random(): Future[Int] = Future.successful(Random.nextInt())
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach { num =>
  println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())
But now I am stuck on how to grab the value computed by the Future. In the case of a GraphStage that is a Flow, I could use:
val value = grab(in) // where in is the Inlet of the Flow
But what I have is a GraphStage that is a Source, so I have no idea how to grab the Int value of the computed Future above.
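Since a source stage has no inlet to grab from, one option is to let the async callback receive the value and keep it in a field until downstream asks for it. The following is only a sketch under that assumption (Akka 2.6 or later). SingleFutureSource, pending, and emitAndFinish are illustrative names, not Akka API.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical source that emits the single value of an already-started Future.
class SingleFutureSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("SingleFutureSource.out")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Holds the value when the future finishes before downstream has pulled.
      private var pending: Option[Int] = None

      private def emitAndFinish(value: Int): Unit = {
        push(out, value)
        completeStage()
      }

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Try[Int]] {
          case Success(value) =>
            // Push right away if there is demand, otherwise park the value.
            if (isAvailable(out)) emitAndFinish(value) else pending = Some(value)
          case Failure(ex) => failStage(ex)
        }
        futureNum.onComplete(callback.invoke)(materializer.executionContext)
      }

      setHandler(out, new OutHandler {
        // If nothing has arrived yet, do nothing: the callback pushes later.
        override def onPull(): Unit = pending.foreach(emitAndFinish)
      })
    }
}
```

The two branches cover both orderings: the value arriving before the pull (stored in pending) and the pull arriving before the value (pushed directly from the callback).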
lazy val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph). But I got the same results, so it didn't work. Yes, I need GraphStageLogic because the Future API call is a scrolling API: it requires passing the scroller state from one response to the next to fetch results incrementally. I also need some buffering and error handling. From my understanding, this can't be built using a Source from a Future. It could be achieved using Source.actorPublisher, but that's now deprecated and the docs suggest using GraphStage, which is what I am trying. – oblivion
Future inside GraphStage[SourceShape[T]]. If it's not possible to use, then that should be explicitly suggested. – oblivion
(_)) you'll have to hang on to the value in a field until there is a pull. Note that, as others have also said, there are already a few operators for interacting with futures, so unless this is just a learning exercise I'd recommend using one of them (mapAsync, Source.future, unfoldAsync, etc.). – johanandren
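To make the unfoldAsync suggestion concrete, here is a rough sketch of how scroller state could be threaded from one response to the next without a custom stage. The Page type, fetchPage, and the scroll-id scheme are invented stand-ins for the real scrolling API, which the question does not show.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical response shape: a batch of items plus the id for the next scroll, if any.
final case class Page(items: List[Int], nextScrollId: Option[String])

object ScrollExample {
  // Stand-in for the real API call: serves three pages, then reports no next id.
  def fetchPage(scrollId: String): Future[Page] = {
    val n = scrollId.toInt
    val next = if (n < 2) Some((n + 1).toString) else None
    Future.successful(Page(List(n * 10, n * 10 + 1), next))
  }

  // State is the next scroll id to request; None means the scroll is exhausted.
  def scrollSource(implicit ec: ExecutionContext): Source[Int, NotUsed] =
    Source
      .unfoldAsync[Option[String], Page](Some("0")) {
        case None     => Future.successful(None) // completes the stream
        case Some(id) => fetchPage(id).map(page => Some((page.nextScrollId, page)))
      }
      .mapConcat(_.items) // flatten each page into individual elements
}
```

unfoldAsync only asks for the next page when downstream signals demand, and a failed Future fails the stream. Buffering could then be layered on with the standard buffer operator rather than built into the stage.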