
How can I frame a Flow<ByteString, ByteString, NotUsed> by size? All the examples I have found assume there is some delimiter, which is not my case; I just need to frame by length/size.


2 Answers


Framing via Framing.delimiter does require a designated delimiter, and there doesn't seem to be any built-in stream operator that frames simply by a fixed chunk size. One of the challenges in writing a custom framing/chunking stage is properly handling the last, possibly incomplete, chunk.

One solution is to write a custom GraphStage modeled on the "chunking" example in the Akka Streams cookbook:

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

class Chunking(val chunkSize: Int) extends GraphStage[FlowShape[ByteString, ByteString]] {
  require(chunkSize > 0, "chunk size must be positive")

  val in = Inlet[ByteString]("Chunking.in")
  val out = Outlet[ByteString]("Chunking.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Bytes received from upstream but not yet pushed downstream
    private var buffer = ByteString.empty

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        buffer ++= grab(in)
        emitChunk()
      }
      override def onUpstreamFinish(): Unit = {
        if (buffer.isEmpty) completeStage()
        // Upstream may finish while downstream is already waiting; no further
        // onPull will arrive in that case, so flush from the buffer right away.
        else if (isAvailable(out)) emitChunk()
      }
    })

    setHandler(out, new OutHandler {
      // Serve from the buffer first; pull upstream only when no full chunk is buffered
      override def onPull(): Unit = emitChunk()
    })

    private def emitChunk(): Unit = {
      if (buffer.length >= chunkSize || (isClosed(in) && buffer.nonEmpty)) {
        // A full chunk, or the (possibly shorter) remainder once upstream has finished
        val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
        buffer = nextBuffer
        push(out, chunk)
      } else if (isClosed(in)) completeStage()
      else pull(in)
    }
  }
}

Note that emitChunk() does the fixed-size splitting, while onUpstreamFinish() is needed to flush whatever is left in the internal buffer (the final, possibly shorter chunk) once upstream completes.
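The effect of that final flush can be seen without Akka. The sketch below is a hypothetical, simplified model of fixed-size framing, not the stage itself: FramingModel.frame is an illustrative name, Vector[Byte] stands in for ByteString, an in-memory Seq plays the role of upstream, and the flushOnFinish flag toggles the work that onUpstreamFinish() contributes.

```scala
object FramingModel {
  // Hypothetical, Akka-free model: Vector[Byte] stands in for ByteString and
  // `upstream` stands in for the elements pushed by the previous stage.
  def frame(upstream: Seq[Vector[Byte]], chunkSize: Int, flushOnFinish: Boolean): Vector[Vector[Byte]] = {
    require(chunkSize > 0, "chunk size must be positive")
    var buffer = Vector.empty[Byte]
    val out = Vector.newBuilder[Vector[Byte]]

    for (elem <- upstream) {
      buffer ++= elem // like onPush(): append to the internal buffer
      while (buffer.length >= chunkSize) { // emit every full chunk now available
        val (chunk, rest) = buffer.splitAt(chunkSize)
        out += chunk
        buffer = rest
      }
    }

    // like onUpstreamFinish(): leftover bytes become a final, shorter chunk
    if (flushOnFinish && buffer.nonEmpty) out += buffer
    out.result()
  }
}
```

For example, feeding the two elements "Hello, " and "world!" (13 bytes) with a chunk size of 4 gives "Hell", "o, w", "orld" plus a final "!" when flushing, and silently loses the "!" without the flush.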

Test run with a sample text file, /path/to/file, containing the following:

Millions of people worldwide are in for a disastrous future of hunger, drought and disease, according to a draft report from the United Nations' Intergovernmental Panel on Climate Change, which was leaked to the media this week.

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import java.nio.file.Paths

implicit val system = ActorSystem("system")
implicit val executionContext = system.dispatcher

val chunkSize = 32

FileIO.fromPath(Paths.get("/path/to/file")).
  via(new Chunking(chunkSize)).
  map(_.utf8String).
  runWith(Sink.seq)

// res1: scala.concurrent.Future[Seq[String]] = Future(Success(Vector(
//   "Millions of people worldwide are",
//   " in for a disastrous future of h",
//   "unger, drought and disease, acco",
//   "rding to a draft report from the",
//   " United Nations' Intergovernment",
//   "al Panel on Climate Change, whic",
//   "h was leaked to the media this w",
//   "eek."
// )))

Something like this (in Scala; disclaimer: only mentally compiled), using statefulMapConcat, which allows:

  • emitting zero or more frames per input element
  • maintaining state from element to element of what's yet to be emitted
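import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

def fixedSizeFraming(frameSize: Int): Flow[ByteString, ByteString, NotUsed] = {
  require(frameSize > 0, "frame size must be positive")

  // Caveat: bytes still in carry when upstream completes are never emitted,
  // because statefulMapConcat has no completion callback to flush them.
  Flow[ByteString].statefulMapConcat { () =>
    // Bytes received so far that do not yet make up a full frame
    var carry: ByteString = ByteString.empty

    { in =>
      val len = carry.length + in.length

      if (len < frameSize) {
        // Not enough for a frame yet: append to carry and emit nothing
        carry = carry ++ in
        Nil
      } else if (len == frameSize) {
        // Exactly one frame: build it *before* resetting carry
        val frame = carry ++ in
        carry = ByteString.empty
        List(frame)
      } else if (carry.isEmpty) {
        // Emit as many whole frames as in holds and keep the rest
        val (emit, suffix) = in.splitAt((len / frameSize) * frameSize)
        carry = suffix
        emit.grouped(frameSize).toList
      } else {
        // Complete the pending frame first, then frame the remainder of in
        val (appendToCarry, inn) = in.splitAt(frameSize - carry.length)
        val first = carry ++ appendToCarry
        val (emit, suffix) = inn.splitAt((inn.length / frameSize) * frameSize)
        carry = suffix
        first :: emit.grouped(frameSize).toList
      }
    }
  }
}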

If you are using Java, note that carry ++ in can be written as carry.concat(in). To get around Java's restriction that lambdas may only capture effectively final local variables, it may be useful to keep the carry in a one-element array (e.g. ByteString[] carry = { ByteString.emptyByteString() };).
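The two properties listed above (zero or more frames per element, and state carried from element to element) can be tried out without Akka. The following is a hypothetical sketch: StatefulFramingModel and its statefulMapConcat method are illustrative stand-ins, not Akka's API, Vector[Byte] replaces ByteString, and the carry logic is a simplified equivalent of the branching version above. Like the real operator (which, as far as I know, calls the factory once per materialization and offers no completion hook), the stand-in never flushes what is left in carry.

```scala
object StatefulFramingModel {
  // Hypothetical stand-in for Flow.statefulMapConcat (not Akka's API): the factory is
  // invoked once, then each element's zero-or-more outputs are flattened in order.
  def statefulMapConcat[A, B](input: Seq[A])(factory: () => A => Iterable[B]): Vector[B] = {
    val f = factory()
    input.flatMap(a => f(a)).toVector
  }

  // Simplified carry-based framing on Vector[Byte] instead of ByteString
  def frames(input: Seq[Vector[Byte]], frameSize: Int): Vector[Vector[Byte]] = {
    require(frameSize > 0, "frame size must be positive")
    statefulMapConcat(input) { () =>
      var carry = Vector.empty[Byte] // state kept between elements

      (in: Vector[Byte]) => {
        val buf = carry ++ in
        val whole = (buf.length / frameSize) * frameSize
        val (emit, rest) = buf.splitAt(whole)
        carry = rest // incomplete tail waits for the next element
        emit.grouped(frameSize).toList // zero or more frames for this element
      }
    }
  }
}
```

With inputs of 3, 2 and 4 bytes and a frame size of 4, the first element emits nothing, the next two emit one frame each, and the ninth byte stays in carry and is never emitted.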