My data from an unbounded stream source looks something like this:
value1,
value3,
...,
START,
value155,
...,
value202,
END,
...,
value234,
value235,
...,
START,
value298,
...,
value310,
END,
...,
value377,
...
Based on the approach in "Akka-Streams collecting data (Source -> Flow -> Flow (collect) -> Sink)", I came up with the following Akka Streams code to accumulate the messages between a fixed "start key" and "end key" (START and END above, lowercase "start" and "end" in the code):
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbounded, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
Source(list)
  .scan(Seq.empty[String]) { (coll, s) =>
    if (s.equals("start") || coll.head.equals("start"))
      coll :+ s
    else
      Seq.empty[String] // return empty Seq unless new element == "start"
                        // or first element of Seq == "start"
  }
  .filter(_.last.equals("end"))
  .to(Sink.foreach(println)).run()
Alas, nothing gets past the filter at all! No output.
Replacing coll.head.equals and _.last.equals with .contains returns a result, but of course it is not correct: once "start" has been accumulated the condition stays true, so the collection is never reset and "end" remains in every later accumulation.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end", "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
val source = Source(list) // actual Source is unbounded, has many more items between "start" and "end"; also the cycle of "start" and "end" repeats
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
Source(list)
  .scan(Seq.empty[String]) { (coll, s) =>
    if (s.equals("start") || coll.contains("start"))
      coll :+ s
    else
      Seq.empty[String]
  }
  .filter(_.contains("end"))
  .to(Sink.foreach(println)).run()
As expected, the result is wrong; the output begins:
List(start, d4, d5, d6, d7, end)
List(start, d4, d5, d6, d7, end, d9)
List(start, d4, d5, d6, d7, end, d9, d10)
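A likely explanation for both attempts (worth double-checking against your Akka version): Akka's scan emits its zero element (here Seq.empty) downstream before it has seen any input, so the first thing filter evaluates is Seq.empty.last, which throws NoSuchElementException; the first real element "d1" would hit the same problem through coll.head. The exception fails the stream, and because .to(Sink.foreach(println)).run() keeps the source's materialized value instead of the sink's Future[Done], the failure typically goes unnoticed, which looks like "no output". The .contains variant does not throw, but it never resets after "end". So no forced materialization should be needed. Below is a minimal sketch in plain Scala (no Akka) of a step function that avoids both problems; ScanFix, step and completeBlocks are hypothetical names, and List.scanLeft stands in for Flow.scan because it also emits the zero element first.

```scala
object ScanFix {
  // Hypothetical step function usable with scanLeft or Akka's Flow.scan.
  // headOption/lastOption never throw on the empty accumulator.
  def step(coll: Vector[String], s: String): Vector[String] =
    if (s == "start") Vector(s) // open (or re-open) a block
    else if (coll.headOption.contains("start") && !coll.lastOption.contains("end"))
      coll :+ s // inside an open block: keep accumulating
    else Vector.empty // outside a block, or right after "end": reset

  // Like Flow.scan, scanLeft emits the zero element first;
  // only accumulators whose last element is "end" are complete blocks.
  def completeBlocks(xs: List[String]): List[Vector[String]] =
    xs.scanLeft(Vector.empty[String])(step).filter(_.lastOption.contains("end"))

  def main(args: Array[String]): Unit = {
    val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end",
      "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")
    completeBlocks(list).foreach(println)
    // prints:
    // Vector(start, d4, d5, d6, d7, end)
    // Vector(start, d11, d12, d13, d14, end)
  }
}
```

In the Akka pipeline the same function should drop in as Source(list).scan(Vector.empty[String])(ScanFix.step).filter(_.lastOption.contains("end")). Vector is used instead of the default Seq (a List) so that :+ stays cheap on long blocks.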
Any suggestions on how to solve this? I suspect that some "materialization" needs to be forced along the way, or that I am running into a lazy-evaluation/actor/async issue I am not aware of. Thanks in advance!
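One common alternative to scan for this kind of "collect between markers" task is statefulMapConcat, which keeps mutable state per materialization and emits zero or more elements for each input, so incomplete blocks are simply never emitted. The sketch below assumes Akka 2.6 or later (where an implicit ActorSystem also provides the Materializer; with Akka 2.5 keep the ActorMaterializer from the question). BetweenMarkersStateful and between are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

object BetweenMarkersStateful {
  // Hypothetical helper: emits every complete startKey..endKey block,
  // markers included; elements outside a block are dropped.
  def between(startKey: String, endKey: String): Flow[String, Vector[String], NotUsed] =
    Flow[String].statefulMapConcat[Vector[String]] { () =>
      // Created once per materialization, so each run starts with fresh state.
      // Some(buffer) while inside a block, None while outside.
      var current: Option[Vector[String]] = None

      element =>
        (current, element) match {
          case (_, `startKey`) => // a start key (re)opens a block
            current = Some(Vector(element))
            Nil
          case (Some(buffer), `endKey`) => // block complete: emit it and reset
            current = None
            List(buffer :+ element)
          case (Some(buffer), _) => // inside a block: keep accumulating
            current = Some(buffer :+ element)
            Nil
          case (None, _) => // outside a block: drop the element
            Nil
        }
    }

  def main(args: Array[String]): Unit = {
    // Akka 2.6+: the implicit ActorSystem also provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("collection-accumulator")
    import system.dispatcher

    val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end",
      "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")

    Source(list)
      .via(between("start", "end"))
      .runWith(Sink.foreach(println)) // keep the sink's Future[Done] ...
      .onComplete(_ => system.terminate()) // ... so completion (or failure) is observed
  }
}
```

With the example list this should print Vector(start, d4, d5, d6, d7, end) and Vector(start, d11, d12, d13, d14, end). Because the state is reset on every end key, the flow keeps working for an unbounded source with repeating cycles, and a second start key before an end key restarts the block.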
(At the time of writing, https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html has a ready-made ScalaFiddle for quickly playing around with Akka Streams.)
Edit:
To clarify "unbounded": the stream of messages is not only unbounded, the "START"/"END" cycle also repeats. I have updated the example accordingly.
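Since the START/END cycle repeats, another option is to model each cycle as its own substream: splitAfter starts a new substream after every end key, dropWhile discards the noise before the start key, and fold collects the rest when the substream completes. This is a sketch assuming Akka 2.6 or later; BetweenMarkersSubstreams and between are hypothetical names. Note one behavioural difference from a version that restarts on every start key: if a second start key appears before the end key, this flow keeps everything from the first one.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

object BetweenMarkersSubstreams {
  // Hypothetical helper: one substream per cycle, each closed right after endKey.
  def between(startKey: String, endKey: String): Flow[String, Vector[String], NotUsed] =
    Flow[String]
      .splitAfter(_ == endKey) // endKey is the last element of its substream
      .dropWhile(_ != startKey) // drop the elements before startKey in each cycle
      .fold(Vector.empty[String])(_ :+ _) // emitted once the substream completes
      .concatSubstreams // only one substream is open at a time, so order is kept
      .filter(_.lastOption.contains(endKey)) // drop empty or unterminated trailing cycles

  def main(args: Array[String]): Unit = {
    // Akka 2.6+: the implicit ActorSystem also provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("collection-accumulator")
    import system.dispatcher

    val list = List("d1", "d2", "d3", "start", "d4", "d5", "d6", "d7", "end",
      "d9", "d10", "start", "d11", "d12", "d13", "d14", "end", "d15")

    Source(list)
      .via(between("start", "end"))
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())
  }
}
```

For the example list this should again yield the two blocks start..end; the trailing "d15" forms a substream whose fold result is empty and is filtered out.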