What I am trying to achieve is implementing something like a synchronized feedback loop with akka streams.

Suppose you have a Flow[Int].filter(_ % 5 == 0). If you broadcast a stream of Ints to this flow and zip its output with the original elements directly behind it, you get something like

(0,0)
(5,1)
(10,2)

Is there a way to emit an Option[Int] that indicates whether or not the flow emitted an element after I pushed the next one through it?

(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...

I thought about implementing my own DetachedStages right in front of and behind the Flow to hold state: whenever the flow pulled on the stage in front, I would know it needs the next element, and if the stage behind had not received an element by then, the result was None.

Unfortunately, the results are off by many positions.

Side notes

The filter Flow is just an example. It can be a really long flow in which I can't make every stage emit an Option, so I really have to know whether the flow pushed the next element or instead requested the next one from upstream.

I also played around with conflate and expand, but these were even worse, with larger position offsets in the results.

One thing I changed in the configuration was the initial and max buffer size for the flow, so that I can be sure the signalled demand really comes after the element I pushed through it.

It would be nice to get some suggestions on how to solve this problem!

1 Answer

I can't produce exactly what you are looking for. But I can finagle a Future of what you are looking for, e.g.:

(Future(Some(0)), 0)
(Future(None)   , 1)
(Future(None)   , 2)
...

Expanding on your example, if given a Flow that cannot be changed:

val flow = Flow[Int].filter(_ % 5 == 0)

Then this flow can be evaluated on a single input and the result converted to an Option:

import scala.concurrent.{ExecutionContext, Future}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Flow, Source, Sink}

def evalFlow(in : Int, flow : Flow[Int, Int, _])(implicit mat : Materializer, ec : ExecutionContext) : Future[Option[Int]] = {
  val fut : Future[Int] =
    Source.single(in)
          .via(flow)
          .runWith(Sink.head) // Fails with NoSuchElementException if the flow emits nothing

  fut.map[Option[Int]](Some(_))           //     val => Some(val)
     .fallbackTo(Future.successful(None)) // failure => None
}

This function returns a Future[Option[Int]]. We can then use the evaluation to simply combine the result with the input:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  (evalFlow(in, flow), in)//(Future[Option[Int]], Int)

And, finally, the evalAndCombine function can be placed after your Source of Ints:

import akka.actor.ActorSystem

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

val exampleSource = Source(1 to 6)

val tupleSource = exampleSource map evalAndCombine(flow)

Similarly, if you want a Future[(Option[Int], Int)] instead of (Future[Option[Int]], Int), e.g. :

Future[(Some(0), 0)]
Future[(None   , 1)]
...

Then slightly modify the combine function:

def evalAndCombine(flow : Flow[Int, Int, _])(in : Int)(implicit mat : Materializer, ec : ExecutionContext) =
  evalFlow(in, flow) map (option => (option, in))//Future[(Option[Int], Int)]
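
If the Future should not be visible downstream at all, the same idea can be combined with mapAsync, which waits for each Future and emits its result in the original order. The following is only a minimal sketch under some assumptions: the object name FeedbackSketch and its run method are made up for illustration, Sink.headOption and Sink.seq are assumed to be available (they are in Akka 2.4 and later), and ActorMaterializer is kept to match the code above even though newer Akka versions can derive the materializer from an implicit ActorSystem.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical wrapper object, named only for this sketch.
object FeedbackSketch {
  // The flow from the question, treated as a black box.
  val flow = Flow[Int].filter(_ % 5 == 0)

  def run(): Seq[(Option[Int], Int)] = {
    implicit val system: ActorSystem = ActorSystem("feedback-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Materialize the flow once per element; headOption yields None
    // when the flow completes without emitting anything.
    def evalFlow(in: Int): Future[Option[Int]] =
      Source.single(in).via(flow).runWith(Sink.headOption[Int])

    // mapAsync preserves upstream order, so each result lines up with its input.
    val done = Source(0 to 6)
      .mapAsync(1)(in => evalFlow(in).map(out => (out, in)))
      .runWith(Sink.seq)

    try Await.result(done, 10.seconds)
    finally system.terminate()
  }
}
```

Calling FeedbackSketch.run() should give (Some(0),0), (None,1), (None,2), (None,3), (None,4), (Some(5),5), (None,6). Note that the flow is materialized anew for every element, so this only behaves like the original stream when the flow keeps no state between elements.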