What I am trying to achieve is implementing something like a synchronized feedback loop with akka streams.
Let's say you've got a Flow[Int].filter(_ % 5 == 0). When you broadcast a stream of Ints to this flow and zip the tuples directly behind it, you get something like
(0,0)
(5,1)
(10,2)
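To make the mismatch concrete, here is a rough model of that pairing using plain Scala collections rather than Akka Streams. The object name ZipModel and the input range 0 to 10 are made up for illustration; it only mimics the positional zip, not buffering or backpressure.

```scala
// Plain-collections model of "filter one branch, then zip with the other".
// This is NOT Akka Streams code; it only shows how the positions drift.
object ZipModel {
  val inputs: Vector[Int] = (0 to 10).toVector

  // Branch 1: the filtering flow drops everything not divisible by 5.
  val filtered: Vector[Int] = inputs.filter(_ % 5 == 0)

  // Zip pairs elements by arrival position, so the n-th surviving element
  // is paired with the n-th input, not with the input that produced it.
  val pairs: Vector[(Int, Int)] = filtered.zip(inputs)

  def main(args: Array[String]): Unit =
    pairs.foreach(println)
}
```

The zip has no way of knowing which input a filtered element came from, which is exactly the information I am missing.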
Is there a way to emit an Option[Int] that indicates whether or not the flow emitted an element after I pushed the next one through it?
(Some(0),0)
(None, 1)
(None, 2)
(None, 3)
(None, 4)
(Some(5), 5)
(None, 6)
...
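Written as a specification in plain Scala collections, the output I am after would look like the sketch below. The object name DesiredOutput is hypothetical, and this is not a solution: it only works because the predicate is known here, whereas the real flow is opaque to me.

```scala
// Specification of the desired result, modelled with plain collections.
// It cheats by applying the predicate directly, which is not possible
// when the flow is an arbitrary chain of stages.
object DesiredOutput {
  val inputs: Vector[Int] = (0 to 6).toVector

  // Every input yields exactly one pair: Some(x) if the flow would have
  // emitted for it, None if the flow would have swallowed it.
  val pairs: Vector[(Option[Int], Int)] =
    inputs.map(i => (Option(i).filter(_ % 5 == 0), i))

  def main(args: Array[String]): Unit =
    pairs.foreach(println)
}
```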
I thought about implementing my own DetachedStages right in front of and behind the Flow to hold state: whenever the flow pulled on the stage in front of it, I would know it needs the next element, and if the stage behind it had not received an element by then, the result would be None.
Unfortunately, the results are wrong: they are off by many positions.
side notes
The filter Flow is just an example; it can be a really long flow in which I can't make every stage emit an Option, so I really have to know whether the flow pushed the next element or instead requested the next one from upstream.
I also played around with conflate and expand, but these were even worse in terms of position offsets in the results.
One thing I changed in the configuration was the initial and max buffer for the flow, so that I can be sure the indicated demand really comes after the element I pushed through it.
It would be nice to get some suggestions on how to solve this problem!