I used the GraphDSL to create some stream processing jobs based on example code I saw. Everything runs fine, but I am having trouble understanding the notation (updated for 2.4):

def elements: Source[Foos, NotUsed] = ...
def logEveryNSink = // a sink that logs
def cleaner: Flow[Foos, Bars, NotUsed] = ...

def boolChecker(bar: Bars)(implicit ex: ExecutionContext): Future[Boolean] = ...

val mySink = Sink.foreach[Boolean](println(_))

val lastly = Flow[Bars].mapAsync(2)(x => boolChecker(x)).toMat(mySink)(Keep.right)

val materialized = RunnableGraph.fromGraph(
 GraphDSL.create(lastly) { implicit builder =>
  baz => {
   import GraphDSL.Implicits._
   val broadcast1 = builder.add(Broadcast[Foos](2))
   val broadcast2 = builder.add(Broadcast[Bars](2))
   elements ~> broadcast1 ~> logEveryNSink(1)
               broadcast1 ~> cleaner ~> broadcast2 ~> baz
                                        broadcast2 ~> logEveryNSink(1)
   ClosedShape
 }
}
).run()

I understand the implicit builder that is passed in, but I'm uncertain what baz represents in { implicit builder => baz => { .... Is it just an implicit name for the entire shape?

1 Answer

The GraphDSL.create method is heavily overloaded to accept varying numbers of input shapes (including zero). If you pass in no input shapes, then the signature of the buildBlock function argument (the body where you actually define how the graph is to be built) is as follows:

(Builder[NotUsed]) => S

So this is simply a Function1[Builder[NotUsed], S], that is, a function that takes an instance of Builder[NotUsed] and returns a Shape instance describing the final graph. NotUsed here plays the role of Unit: by not passing in any input shapes, you are saying that you do not care about the materialized value of the graph being produced.
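As a rough illustration of the zero-input case, here is a minimal sketch assuming akka-stream 2.4.2 or later is on the classpath. The object name NoInputGraphs and the toy source and sink are made up for the example; only the blueprint is built, nothing is run.

```scala
import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}

// Hypothetical example object; not part of the question's code.
object NoInputGraphs {
  // No graphs are passed to create(), so the block only receives the builder
  // and the resulting materialized value type is NotUsed.
  val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      // Wire a toy source straight into a toy sink.
      Source(1 to 3) ~> Sink.foreach[Int](println)
      // The block returns the shape of the finished graph.
      ClosedShape
    }
  )
}
```

Note that the sink's own materialized value is discarded here, which is exactly what the NotUsed in the type is telling you.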

If you do decide to pass in input shapes, then the signature of that buildBlock function changes a bit to accommodate them. In your case, you are passing in one input shape, so the signature of buildBlock changes to:

(Builder[Mat]) => Graph.Shape => S

Now, this is essentially a Function1[Builder[Mat], Function1[Graph.Shape, S]], or a function that takes a Builder[Mat] (where Mat is the materialized value type of the input shape) and returns a function that takes a Graph.Shape and returns an instance of S (which is a Shape).
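The nested Function1 form is ordinary Scala currying and can be seen without Akka at all. The following is only a sketch: FakeBuilder, FakeShape, and this create method are hypothetical stand-ins that mimic the shape of the signature, not Akka's real types.

```scala
object CurriedBlockSketch {
  // Hypothetical stand-ins, not Akka's real Builder or Shape.
  final case class FakeBuilder(name: String)
  final case class FakeShape(label: String)

  // Mimics a one-input create: buildBlock takes a builder and returns
  // another function, which takes the imported shape and returns the result.
  def create[S](input: String)(buildBlock: FakeBuilder => FakeShape => S): S =
    buildBlock(FakeBuilder("b"))(FakeShape(input))

  // `builder => baz => ...` is a function literal whose body is itself
  // a function literal, hence the two arrows.
  val result: String = create("lastly") { builder => baz =>
    s"${builder.name} saw ${baz.label}"
  }
}
```

Here CurriedBlockSketch.result evaluates to "b saw lastly": the first parameter is bound to the builder and the second to the value derived from the argument passed to create.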

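Long story short, if you pass in shapes, then you also need to bind them as parameters of the graph-building function, but as a second, curried function (hence the additional =>). In your example, baz is the shape of lastly (a SinkShape[Bars]) as imported into the builder, which is why you can wire broadcast2 ~> baz.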
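For comparison, a minimal sketch of the one-input case, again assuming akka-stream 2.4.2 or later (where Sink.foreach materializes a Future[Done]). The names OneInputGraph, printSink, and sinkShape are invented for the example.

```scala
import akka.Done
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Future

// Hypothetical example object; not part of the question's code.
object OneInputGraph {
  val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  // Passing printSink to create means its materialized value (Future[Done])
  // becomes the materialized value of the whole graph.
  val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(
    GraphDSL.create(printSink) { implicit builder => sinkShape =>
      import GraphDSL.Implicits._
      // sinkShape is a SinkShape[Int]: printSink as imported into this builder.
      Source(1 to 3) ~> sinkShape
      ClosedShape
    }
  )
}
```

The second parameter plays the same role as baz in the question: it is the handle you wire to inside the block, while the graph you passed in determines what run() gives back.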