I'm trying to integrate an Akka Streams based flow into my Play 2.5 app. The idea is that you can stream in a photo and have it written to disk as the raw file, a thumbnailed version, and a watermarked version.

I got this running using a graph something like this:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
                                    .map(_.result().toArray)

def toByteArray = Flow[ByteString].map(b => b.toArray)

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
  import GraphDSL.Implicits._
  val streamFan = builder.add(Broadcast[ByteString](3))
  val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
  val output = builder.add(Flow[ByteString].map(x => Success(Done)))

  val rawFileSink = FileIO.toFile(file)
  val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
  val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

  streamFan.out(0) ~> rawFileSink
  streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
  streamFan.out(2) ~> output.in

  byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
  byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink

  FlowShape(streamFan.in, output.out)
})

graph

}

Then I wire it into my Play controller using an accumulator like this:

val sink = Sink.head[Try[Done]]

val photoStorageParser = BodyParser { req =>
     Accumulator(sink).through(graph).map(Right.apply)
}

The problem is that my two processed file sinks aren't completing: both processed files end up with zero size, while the raw file is fine. My theory is that the accumulator only waits on one of the outputs of my fan-out, so when the input stream completes and byteAccumulator emits the complete file, Play has already got the materialized value from the output before the processing finishes.

So, my questions are:
Am I on the right track with this as far as my approach goes?
What is the expected behaviour for running a graph like this?
How can I bring all my sinks together to form one final sink?

I also think that the reason is that the flows are not merged after the processing. Have you tried Sink.combine (doc.akka.io/docs/akka/2.4.4/scala/stream/…)? - devkat
Yeah, I gave Sink.combine a go, but that combines multiple sinks to send to, like a fan-out. I think I'm looking for a fan-in, but it seems you can only do that with sources, not sinks! - Tompey
This seems to be a similar example: doc.akka.io/docs/akka/2.4.4/scala/stream/…. Maybe you have to return a SinkShape instead of a FlowShape to declare that your stream is finished? - devkat
I have a use case where I download .gz files and then extract them. Each download should go to a separate file named after the downloaded filename. Any suggestions on how I can write to a Sink whose name depends on the input, and then extract the files? As usual, the Akka Streams documentation provides no examples of combine. - Abhijit Sarkar

1 Answer

Ok, after a little help (Andreas was on the right track), I've arrived at this solution which does the trick:

val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) {
  implicit builder => (rawSink, thumbSink, waterSink) => {
    import GraphDSL.Implicits._ // brings the ~> operator into scope
    val streamFan = builder.add(Broadcast[ByteString](2))
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))

    streamFan.out(0) ~> rawSink
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink

    SinkShape(streamFan.in)
  }
})

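// Needs an implicit ExecutionContext in scope for Future.sequence and map.
// Named theSink so that it matches the body parser below.
val theSink = graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done)))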
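
One caveat about the mapMaterializedValue step: the Try it produces is never a Failure, because it maps every outcome to Success(Done). Below is a rough sketch of folding the three write results into a meaningful Try[Done]. It assumes Akka 2.4.x, where the FileIO sinks materialize a Future[IOResult] and IOResult carries a status: Try[Done] field; WriteResults and allWritten are hypothetical names invented for this example, not part of Akka or Play.

```scala
import akka.Done
import akka.stream.IOResult
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

object WriteResults {
  // Hypothetical helper: collapses several file-write results into one Try[Done].
  def allWritten(results: Seq[Future[IOResult]])
                (implicit ec: ExecutionContext): Future[Try[Done]] =
    Future.sequence(results)
      .map { rs =>
        // Assumption: Akka 2.4.x reports write errors in IOResult.status.
        // Return the first failed status if there is one, otherwise success.
        rs.map(_.status).find(_.isFailure).getOrElse(Success(Done))
      }
      // A failed Future (rather than a failed status) also becomes a Failure.
      .recover { case NonFatal(e) => Failure(e) }
}
```

It could then replace the lambda above, for example: graph.mapMaterializedValue(fs => WriteResults.allWritten(Seq(fs._1, fs._2, fs._3))).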

After which it's dead easy to call this from Play:

val photoStorageParser = BodyParser { req =>
  Accumulator(theSink).map(Right.apply)
}

def createImage(path: String) = Action(photoStorageParser) { req =>
  Created
}
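
For anyone who wants to try the pattern without Play or the image processing, here is a minimal sketch of the same idea: pass the sinks into GraphDSL.create so their materialized values are exposed, then combine the resulting futures. It assumes Akka Streams 2.4.x on the classpath. FanOutSinkDemo, collect and upper are placeholders made up for this example; the in-memory Sink.fold sinks stand in for the file sinks, and upper stands in for the slow processing flows.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SinkShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FanOutSinkDemo extends App {
  implicit val system = ActorSystem("demo")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext for Future.sequence

  // Hypothetical stand-in for a file sink: collects every element it receives.
  def collect = Sink.fold[Vector[String], String](Vector.empty)(_ :+ _)
  // Hypothetical stand-in for the thumbnail/watermark processing.
  val upper = Flow[String].map(_.toUpperCase)

  // Passing the sinks to GraphDSL.create exposes both materialized futures.
  val combined: Sink[String, Future[Seq[Vector[String]]]] =
    Sink.fromGraph(GraphDSL.create(collect, collect)((_, _)) {
      implicit builder => (rawSink, processedSink) =>
        import GraphDSL.Implicits._
        val fan = builder.add(Broadcast[String](2))
        fan.out(0) ~> rawSink
        fan.out(1) ~> upper ~> processedSink
        SinkShape(fan.in)
    }).mapMaterializedValue { case (raw, processed) =>
      // The combined future completes only after BOTH branches have completed.
      Future.sequence(Seq(raw, processed))
    }

  val result = Await.result(Source(List("a", "b")).runWith(combined), 5.seconds)
  println(result)
  system.terminate()
}
```

Running it yields both collected vectors, the raw elements and the upper-cased ones, which shows that the single materialized future waits for every branch rather than just the first one to finish.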