I have started using Akka Streams and Op-Rabbit and am a bit confused.
I need to split the stream based on a predicate and then merge the branches back together, much like I have done when creating graphs with Partition and Merge.
I have been able to do things like this using the GraphDSL.Builder, but I can't seem to get it to work with AckedSource/AckedFlow/AckedSink.
The graph would look like:
                             | --> flow1 --> |
    source --> partition --> |               | --> flow3 --> sink
                             | --> flow2 --> |
I'm not sure if splitWhen is what I should use because I always need exactly 2 flows.
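For reference, the shape I have in mind can be sketched with plain Akka Streams (no Acked types) roughly as below. This is only an illustration under assumptions: elements are plain strings rather than AckTup values, the names PartitionMergeSketch, run, flow1, flow2 and flow3 are made up to match the diagram, the length < 10 predicate is the one from the comment in my sample, and an Akka version (2.6 or later) where the materializer is derived from the implicit ActorSystem is assumed (older versions would need an explicit ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.Future

object PartitionMergeSketch {
  // Hypothetical stand-ins for flow1/flow2/flow3 in the diagram.
  private val flow1 = Flow[String].map(s => s"flow1: $s")
  private val flow2 = Flow[String].map(s => s"flow2: $s")
  private val flow3 = Flow[String].map(identity)

  // Runs the partition/merge graph over `inputs` and collects the results.
  // Merge does not guarantee ordering between the two branches.
  def run(inputs: List[String])(implicit system: ActorSystem): Future[Seq[String]] = {
    val graph = GraphDSL.create(Sink.seq[String]) { implicit builder => sink =>
      // The implicits must be in scope inside the builder block for ~> to resolve.
      import GraphDSL.Implicits._

      // Exactly two outputs: 0 for short strings, 1 for everything else.
      val partition = builder.add(Partition[String](2, s => if (s.length < 10) 0 else 1))
      val merge     = builder.add(Merge[String](2))

      Source(inputs)   ~> partition.in
      partition.out(0) ~> flow1 ~> merge.in(0)
      partition.out(1) ~> flow2 ~> merge.in(1)
      merge.out        ~> flow3 ~> sink.in

      ClosedShape
    }
    RunnableGraph.fromGraph(graph).run()
  }
}
```

What I can't work out is how to express the same wiring when the source, flows and sink are the Acked variants.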
This is a sample that does not do the partitioning and does not use the GraphDSL.Builder:
    def splitExample(source: AckedSource[String, SubscriptionRef],
                     queueName: String)
                    (implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
      val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
        .map[AckTup[String]](tup => {
          val (p, m) = tup
          (p, new String(m.data))
        })
      val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
        .map[AckTup[String]](tup => {
          val (p, s) = tup
          println(s"flow1 processing $s")
          tup
        })
      val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
        .map[AckTup[String]](tup => {
          val (p, s) = tup
          println(s"flow2 processing $s")
          tup
        })
      source
        .map(Message.queue(_, queueName))
        .via(AckedFlow(toStringFlow))
        // partition if string.length < 10
        .via(AckedFlow(printFlow1))
        .via(AckedFlow(printFlow2))
        .to(AckedSink.ack)
    }
This is the code that I can't seem to get working:
    import GraphDSL.Implicits._
    def buildModelAcked(source: AckedSource[String, SubscriptionRef], queueName: String)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
      import GraphDSL.Implicits._
      GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
        import GraphDSL.Implicits._
        source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
        // source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
        ClosedShape
      }
    }
The compiler can't resolve the ~> operator.
So my questions are:
Is there an example project that uses the scala dsl to build graphs of Acked/Source/Flow/Sink?
Is there an example project that partitions and merges that is similar to what I am trying to do here?
To use ~> and other fancy operators, you need to import GraphDSL.Implicits._ inside the body of the DSL building function. – Vladimir Matveev