0
votes

I have started using Akka Streams and Op-Rabbit and am a bit confused.

I need to split the stream based on a predicate and then combine them much like I have done when creating graphs and using the Partition and Merge.

I have been able to do things like this using the GraphDSL.Builder, but can't seem to get it to work with AckedSource/Flow/Sink

the graph would look like:

                        | --> flow1 --> |
source--> partition --> |               | --> flow3 --> sink
                        | --> flow2 --> |

I'm not sure if splitWhen is what I should use because I always need exactly 2 flows.

This is a sample that does not do the partitioning and does not use the GraphDSL.Builder:

def splitExample(source: AckedSource[String, SubscriptionRef],
                 queueName: String)
                (implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
  val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
    .map[AckTup[String]](tup => {
      val (p,m) = tup
      (p, new String(m.data))
    })

  val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
    .map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"flow1 processing $s")
      tup
     })

  val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
    .map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"flow2 processing $s")
      tup
    })

  source
    .map(Message.queue(_, queueName))
    .via(AckedFlow(toStringFlow))
    // partition if string.length < 10
    .via(AckedFlow(printFlow1))
    .via(AckedFlow(printFlow2))
    .to(AckedSink.ack)
}

This is the code that I can't seem to get working:

import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
    import GraphDSL.Implicits._
    GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
    import GraphDSL.Implicits._
    source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
//      source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
    ClosedShape

}}

The compiler can't resolve the ~> operator

So my questions are:

  1. Is there an example project that uses the scala dsl to build graphs of Acked/Source/Flow/Sink?

  2. Is there an example project that partitions and merges that is similar to what I am trying to do here?

2
Note that in order to get ~> and other fancy operators, you need to import GraphDSL.Implicits._ inside the body of the DSL building function.Vladimir Matveev
Thanks, I have updated the code to have the import, but alas it still does not resolve the ` ~> ` operatorDoug Anderson

2 Answers

1
votes

Keep in mind the following definitions when dealing the acked-stream.

  1. AckedSource[Out, Mat] is a wrapper for Source[AckTup[Out], Mat]]
  2. AckedFlow[In, Out, Mat] is a wrapper for Flow[AckTup[In], AckTup[Out], Mat]
  3. AckedSink[In, Mat] is a wrapper for Sink[AckTup[In], Mat]
  4. AckTup[T] is an alias for (Promise[Unit], T)
  5. the classic flow combinators will operate on the T part of the AckTup
  6. the .acked combinator will complete the Promise[Unit] of an AckedFlow

The GraphDSL edge operator (~>) will work against a bunch of Akka predefined shapes (see the code for GraphDSL.Implicits), but it won't work against custom shapes defined by the acked-stream lib.

You got 2 ways out:

  1. you define your own ~> implicit operator, along the lines of the ones in GraphDSL.Implicits
  2. you unwrap the acked stages to obtain standard stages. You are able to access the wrapped stage using .wrappedRepr - available on AckedSource, AckedFlow and AckedSink.
0
votes

Based on Stefano Bonetti's excellent direction, here is a possible solution:

graph:    
                        |--> short --|
  rabbitMq --> before --|            |--> after
                        |--> long  --|

solution:

val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
  val (p,m) = tup
  (p, new String(m.data))
})

val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
  val (p, s) = tup
  println(s"short: $s")
  tup
})
val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
  val (p, s) = tup
  println(s"long: $s")
  tup
})
val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
  val (p, s) = tup
  println(s"all $s")
  tup
})

def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
                    , queueName: String
                    , splitLength: Int)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
 GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
   val toShort = 0
   val toLong = 1

   // junctions
   val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
                                                           val (p, s) = tup
                                                           if (s.length < splitLength) toShort else toLong
                                                         }
   ))
   val merge = builder.add(Merge[AckTup[String]](2))

   //graph
   val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
   beforeSplit ~> split
   // must do short, then long since the split goes in that order
   split ~> AckedFlow(short).wrappedRepr ~> merge
   split ~> AckedFlow(long).wrappedRepr ~> merge
   // after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
   merge ~> AckedFlow(after).acked ~> s

  ClosedShape
}}

As Stefano Bonetti said, the key was to use the .wrappedRepr associated with the AckedFlow and then to use the .acked combinator as the last step.