How can I frame a Flow<ByteString, ByteString, NotUsed> by size? All the examples I have found assume that there is some delimiter, which is not my case; I just need to frame by length/size.
2 Answers
Framing via Framing.delimiter does require a designated delimiter, and there doesn't seem to be any built-in stream operator that frames simply by a fixed chunk size. One of the challenges in writing a custom framing/chunking stage is handling the last chunk properly: it may be shorter than the chunk size and has to be flushed when upstream completes.
One solution is to assemble a custom GraphStage like the "chunking" example in the Akka Streams cookbook:
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

class Chunking(val chunkSize: Int) extends GraphStage[FlowShape[ByteString, ByteString]] {
  require(chunkSize > 0, "chunk size must be positive")

  val in = Inlet[ByteString]("Chunking.in")
  val out = Outlet[ByteString]("Chunking.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Bytes received from upstream but not yet pushed downstream
    private var buffer = ByteString.empty

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        buffer ++= grab(in)
        emitChunk()
      }

      override def onUpstreamFinish(): Unit = {
        // If downstream is already waiting, no further onPull will arrive, so flush now;
        // otherwise the remaining bytes are flushed from onPull.
        if (buffer.isEmpty) completeStage()
        else if (isAvailable(out)) emitChunk()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = emitChunk()
    })

    // Push one chunk if a full chunk is buffered (or whatever is left once upstream has
    // finished); otherwise ask upstream for more bytes, or complete if nothing is left.
    private def emitChunk(): Unit = {
      if (buffer.length >= chunkSize || (isClosed(in) && buffer.nonEmpty)) {
        val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
        buffer = nextBuffer
        push(out, chunk)
      } else if (isClosed(in)) completeStage()
      else pull(in)
    }
  }
}
Note that emitChunk() does the fixed-size chunking, while onUpstreamFinish() is needed to flush the last (possibly shorter) chunk left in the internal buffer once upstream completes.
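To test a stage like this, one can check the property it is meant to guarantee: every frame except the last has exactly chunkSize bytes, and the last one is non-empty and no longer than chunkSize. The helper below is a hypothetical, Akka-free sketch of that check (FramingCheck and isFixedSizeFraming are made-up names); you would apply it to the lengths of the frames collected with Sink.seq.

```scala
object FramingCheck {
  /** Hypothetical helper: true if every frame except the last is exactly `frameSize`
    * bytes long and the last frame is non-empty and at most `frameSize` bytes. */
  def isFixedSizeFraming(frameLengths: Seq[Int], frameSize: Int): Boolean =
    frameLengths.isEmpty || {
      val last = frameLengths.last
      frameLengths.init.forall(_ == frameSize) && last > 0 && last <= frameSize
    }
}
```

A stage that simply pushes whatever happens to be buffered would fail this check whenever upstream element sizes are not multiples of the chunk size, which is exactly the case a fixed-size framer has to handle.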
Test run with a sample text file "/path/to/file" with the following content:
Millions of people worldwide are in for a disastrous future of hunger, drought and disease, according to a draft report from the United Nations' Intergovernmental Panel on Climate Change, which was leaked to the media this week.
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import java.nio.file.Paths
implicit val system = ActorSystem("system")
implicit val executionContext = system.dispatcher
val chunkSize = 32
FileIO.fromPath(Paths.get("/path/to/file"))
  .via(new Chunking(chunkSize))
  .map(_.utf8String)
  .runWith(Sink.seq)
// res1: scala.concurrent.Future[Seq[String]] = Future(Success(Vector(
// "Millions of people worldwide are",
// " in for a disastrous future of h",
// "unger, drought and disease, acco",
// "rding to a draft report from the",
// " United Nations' Intergovernment",
// "al Panel on Climate Change, whic",
// "h was leaked to the media this w",
// "eek."
// )))
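The expected output above can be cross-checked without Akka using an in-memory reference model that concatenates all upstream chunks and cuts them into fixed-size frames. This is a hypothetical sketch (FixedSizeFramingModel is a made-up name); it holds the whole input in memory, so it is only meant as a test oracle for small inputs, not as a replacement for a streaming stage.

```scala
// Hypothetical reference model (not part of Akka): frames the *whole* input at once.
object FixedSizeFramingModel {
  /** Concatenates all upstream chunks and cuts them into frames of `frameSize` bytes;
    * the last frame holds whatever is left over and may be shorter. */
  def frames(chunks: Seq[Array[Byte]], frameSize: Int): Vector[Vector[Byte]] = {
    require(frameSize > 0, "frame size must be positive")
    chunks.iterator
      .flatMap(_.iterator)   // forget the upstream chunk boundaries
      .grouped(frameSize)    // fixed-size groups; the final group may be shorter
      .map(_.toVector)
      .toVector
  }
}
```

Feeding it the sample sentence split into arbitrary pieces (for example `text.getBytes("UTF-8").grouped(10).toList`) with a frame size of 32 should give the same eight frames as the stream run, since frame boundaries must not depend on how upstream happens to chunk the bytes.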
Something like the following (in Scala; disclaimer: only mentally compiled), using statefulMapConcat, which allows:
- emitting zero or more frames per input element
- maintaining state from element to element of what's yet to be emitted
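import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

val frameSize: Int = ??? // placeholder: set your frame size in bytes
require(frameSize > 0, "frame size must be positive")

val framing: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString].statefulMapConcat { () =>
    // Bytes received but not yet emitted (always fewer than frameSize between elements).
    // Note: there is no end-of-stream callback here, so a trailing partial frame is dropped.
    var carry: ByteString = ByteString.empty

    // Per-element function: returns zero or more complete frames
    (in: ByteString) => {
      val len = carry.length + in.length
      if (len < frameSize) {
        // append to carry and emit nothing
        carry = carry ++ in
        Nil
      } else if (len == frameSize) {
        // exactly one frame: build it before clearing carry
        val frame = carry ++ in
        carry = ByteString.empty
        List(frame)
      } else if (carry.isEmpty) {
        val frames = len / frameSize
        val (emit, suffix) = in.splitAt(frames * frameSize)
        carry = suffix
        // statefulMapConcat expects an immutable.Iterable, so materialize the Iterator
        emit.grouped(frameSize).toList
      } else {
        // top up carry to one full frame, then frame the rest of `in`
        val (appendToCarry, inn) = in.splitAt(frameSize - carry.length)
        val first = carry ++ appendToCarry
        val frames = inn.length / frameSize
        val (emit, suffix) = inn.splitAt(frames * frameSize)
        carry = suffix
        first :: emit.grouped(frameSize).toList
      }
    }
  }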
If you're using Java, note that carry ++ in can be written as carry.concat(in). Because Java lambdas can only capture effectively final local variables, it may be useful to hold the carry in a 1-element array, e.g. ByteString[] carry = { ByteString.emptyByteString() };.
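One caveat of the statefulMapConcat approach: its function is only invoked per element, so whatever is left in carry when upstream completes is never emitted, whereas the GraphStage in the other answer flushes its buffer in onUpstreamFinish(). The following hypothetical, Akka-free sketch (CarryFramer, onElement and finish are made-up names) models the same carry idea in a simplified concatenate-then-split form, rather than the branch-by-branch version, and adds an explicit finish() step to show what an end-of-stream flush has to do.

```scala
/** Hypothetical pure-Scala model of the carry logic, with an explicit end-of-stream flush. */
final class CarryFramer(frameSize: Int) {
  require(frameSize > 0, "frame size must be positive")

  // Bytes received but not yet emitted; shorter than frameSize between calls
  private var carry: Vector[Byte] = Vector.empty

  /** Per-element step: returns zero or more complete frames. */
  def onElement(in: Seq[Byte]): List[Vector[Byte]] = {
    val all = carry ++ in
    val fullLength = (all.length / frameSize) * frameSize
    val (emit, rest) = all.splitAt(fullLength)
    carry = rest
    emit.grouped(frameSize).toList
  }

  /** End of stream: emits the final, possibly shorter, frame if any bytes are left. */
  def finish(): List[Vector[Byte]] = {
    val last = carry
    carry = Vector.empty
    if (last.isEmpty) Nil else List(last)
  }
}
```

For example, with a frame size of 4, feeding the pieces "He", "llo, wor" and "ld!!" yields "Hell", "o, w" and "orld" from onElement, and the trailing "!!" only from finish().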