Let's assume I have an Enumerator[Array[Byte]] producing byte-array chunks of variable size (say, a POST body accepted in a controller).

I know this stream actually contains packets of data, where each packet is:

  • 4 bytes representing an integer that gives the size of the packet body in bytes
  • packet body of that size

Each packet can have a different body size, and packet boundaries do not have to line up with chunk boundaries.

How do we implement an Enumeratee that transforms the initial stream into a stream of byte arrays where each array is a packet body?

Simplified example with ints (first comes int denoting packet body size):

List(1, 2, 3), List(4, 5), List(6), List(2, 8), List(9) -> List(2), List(4, 5, 6), List(8, 9)
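
To pin down the expected output, here is a minimal non-streaming sketch of the regrouping in plain Scala. It is only a reference for the behaviour, not the Enumeratee being asked for: it flattens everything into memory first, and the names Regroup and regroup are made up for illustration. Dropping a truncated final packet is also an assumption.

```scala
object Regroup {
  // Regroups length-prefixed data that is spread across arbitrary chunks.
  // Each packet is one Int giving the body size, followed by that many Ints.
  def regroup(chunks: List[List[Int]]): List[List[Int]] = {
    @annotation.tailrec
    def loop(rest: List[Int], acc: List[List[Int]]): List[List[Int]] =
      rest match {
        case Nil => acc.reverse
        case size :: tail =>
          val (body, remaining) = tail.splitAt(size)
          // Assumption: a truncated final packet is silently dropped
          if (body.length < size) acc.reverse
          else loop(remaining, body :: acc)
      }
    // Chunk boundaries carry no meaning, so flatten them away first
    loop(chunks.flatten, Nil)
  }
}
```

Applied to the five chunks above, Regroup.regroup should return the three packet bodies shown on the right-hand side.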

I was able to come up with an enumeratee that regroups the stream into fixed-size chunks, but that is not enough. gist.github.com/chernetsov/404409350b0011bb4de7 - Michael Chernetsov

1 Answer

I met the exact same problem when trying to parse a JPG header from an HTTP response stream.

First, you need to turn your flow of chunks into a flow of individual bytes, using Enumeratee.mapConcat. Then you must group those bytes into segments whose length is given by the first four bytes of each segment. For this there is Enumeratee.grouped, which takes an Iteratee[From, To] as its argument and applies it repeatedly to the incoming flow of From, emitting one To each time the iteratee completes.

This is what I came up with:

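import play.api.libs.iteratee._
// The for comprehension and the enumeratees below need an implicit ExecutionContext (Play 2.1+)
import play.api.libs.concurrent.Execution.Implicits.defaultContext

// Reads one packet: a 4-byte length prefix followed by that many body bytes
val takeSegment: Iteratee[Byte, List[Byte]] = for {
  // find segment length (the first four bytes of the segment, assumed unsigned and big-endian)
  size <- Enumeratee.take[Byte](4) &>> Iteratee.fold[Byte, Int](0) {
            case (acc, b) => acc * 256 + (b & 0xFF)
          }
  // fetch the rest of the segment
  rest <- Enumeratee.take[Byte](size) &>> Iteratee.getChunks[Byte]
} yield rest

// inputStream is the Enumerator[Array[Byte]] from the question
val segmentStream: Enumerator[List[Byte]] =
  inputStream &>
    // mapConcat takes an argument of type From => Seq[To]
    Enumeratee.mapConcat[Array[Byte]](bytes => bytes.toSeq) &>
    Enumeratee.grouped[Byte](takeSegment)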

Of course, if your integers are encoded differently (I assumed unsigned big-endian, but your case may differ), you need to change the size line of the for comprehension accordingly.
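
As a rough illustration of what changing the size line can mean, here is a sketch of decoding the 4-byte prefix in both byte orders using only the standard library. The object and method names (SizeDecoding, bigEndian, littleEndian, viaByteBuffer) are made up for this example and are not part of Play.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object SizeDecoding {
  // Big-endian: most significant byte first, same arithmetic as the fold in takeSegment
  def bigEndian(bytes: Seq[Byte]): Int =
    bytes.foldLeft(0)((acc, b) => acc * 256 + (b & 0xFF))

  // Little-endian: least significant byte first, so fold from the right instead
  def littleEndian(bytes: Seq[Byte]): Int =
    bytes.foldRight(0)((b, acc) => acc * 256 + (b & 0xFF))

  // The same decoding via java.nio; expects exactly four bytes
  def viaByteBuffer(bytes: Array[Byte], order: ByteOrder): Int =
    ByteBuffer.wrap(bytes).order(order).getInt
}
```

For a little-endian prefix, one option would be to collect the four bytes with Iteratee.getChunks[Byte] and map the resulting list through a function like littleEndian, since a left-to-right fold no longer fits. Note that all of these yield a signed Int, so a prefix of 2^31 or more comes out negative.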