
Surprise, surprise: I'm having a few problems with Iteratees and error handling.

The problem;

Read some bytes from an InputStream (from the network; it must be an InputStream), do some chunking/grouping on this InputStream (for work distribution), then transform each chunk into a case class DataBlock(blockNum: Int, data: ByteString) for sending to actors (the Array[Byte] is converted to a CompactByteString).

The flow;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

The code;

class IterateeTest {
  val actor: ActorRef = myDataBlockRxActor(...)
  val is = new InputStream(fromBytes...)
  val source = Enumerator.fromStream(is)
  val chunker = Traversable.takeUpTo[Array[Byte]](1000)
  val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
    (bNum, bytes) => actor ! DataBlock(bNum, CompactByteString(bytes)); bNum + 1
  }
  val fut = source &> chunker |>> transform
}

case class DataBlock(blockNum: Int, data: CompactByteString)
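
One possible way to express the Group stage so that it keeps emitting blocks of up to 1000 bytes is Enumeratee.grouped. This is only a sketch, assuming Play's iteratee library is on the classpath; GroupingSketch and blocksOf are illustrative names that are not part of the code above.

```scala
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee, Traversable}
import scala.concurrent.ExecutionContext

object GroupingSketch {
  // Hypothetical helper: regroups incoming byte arrays into blocks of at most
  // `size` bytes. The inner iteratee takes up to `size` bytes and concatenates
  // them; `grouped` restarts it for every block, carrying over leftover bytes.
  // The implicit ExecutionContext is accepted in case the combinators need one.
  def blocksOf(size: Int)(implicit ec: ExecutionContext): Enumeratee[Array[Byte], Array[Byte]] =
    Enumeratee.grouped(
      Traversable.takeUpTo[Array[Byte]](size) &>> Iteratee.consume[Array[Byte]]()
    )
}
```

It would slot into the pipeline in place of chunker, for example source &> GroupingSketch.blocksOf(1000) |>> transform. The final block may be shorter than 1000 bytes.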

The question;

My current Iteratee code works well. However, I want to be able to handle failures on either side;

  1. When the InputStream read method fails, I want to know how many bytes/blocks have been processed successfully and resume reading the stream from that point. When read in the Enumerator throws an error, fut just returns the exception; there is no state, so I don't know which block I am up to unless I pass it to the receiving actor (which I don't want to do).
  2. If the output side fails, or can no longer receive DataBlock messages because the Actor's buffer is full, hold off reading from the input stream.

How should I do this?

Would I be better off trying this with reactive-streams/Akka-stream (experimental) or scalaz iteratees rather than Play's iteratees, given that I need well-defined error handling?

Is there a way to do #1 without using a shared var? - NightWolf

1 Answer


(1) can be implemented with an Enumeratee.

val (counter, getCount) = {
  var count = 0
  (Enumeratee.map[Array[Byte]] { x => count += 1; x },
   () => count)
}
val fut = source &> counter &> chunker |>> transform

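You can then call getCount inside a recover (or similar) on fut to find out how far the stream got before the failure. Note that, placed before chunker, this counts the chunks emitted by the Enumerator; place counter after chunker to count blocks instead, or add x.length to count bytes.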
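
As a rough sketch of how the counter and the recover could be packaged so the mutable state stays local to one run: the version below counts bytes rather than chunks and returns the count alongside either the result or the failure. It assumes Play's iteratee library and an implicit ExecutionContext; CountingSketch, Outcome, Completed, Failed and runCounted are hypothetical names, and using an AtomicLong is my choice rather than anything the library requires.

```scala
import java.util.concurrent.atomic.AtomicLong
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import scala.concurrent.{ExecutionContext, Future}

object CountingSketch {
  // Result of one run: the iteratee's value, or the failure, each paired with
  // the number of bytes that had been passed downstream at that point.
  sealed trait Outcome[+A]
  case class Completed[A](result: A, bytesSeen: Long) extends Outcome[A]
  case class Failed(cause: Throwable, bytesSeen: Long) extends Outcome[Nothing]

  def runCounted[A](source: Enumerator[Array[Byte]], sink: Iteratee[Array[Byte], A])
                   (implicit ec: ExecutionContext): Future[Outcome[A]] = {
    // Still mutable state, but created per run and never exposed to callers.
    val seen = new AtomicLong(0)
    val counter = Enumeratee.map[Array[Byte]] { bytes =>
      seen.addAndGet(bytes.length)
      bytes
    }
    (source &> counter |>>> sink)
      .map[Outcome[A]](result => Completed(result, seen.get))
      .recover { case e => Failed(e, seen.get) }
  }
}
```

On a Failed outcome, bytesSeen is the offset from which a retry could resume, provided the underlying source can be reopened or skipped forward to that position.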

You get (2) for free with Play Iteratees. No further reads will happen until the Iteratee is ready for more data, and if it fails no more reads will occur. The InputStream is automatically closed when the Enumerator is done, whether from failure or normal termination.
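
One caveat: a plain actor ! msg returns immediately, so an Iteratee that only does a tell looks ready again straight away and the actor's mailbox can still grow. A sketch of one way to tie the Iteratee's readiness to the actor, assuming the actor replies to every DataBlock (the reply itself is a hypothetical protocol, as are the names BackpressureSketch and sendWithAck and the 5 second timeout), is Iteratee.foldM combined with Akka's ask pattern:

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.{CompactByteString, Timeout}
import play.api.libs.iteratee.Iteratee
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

case class DataBlock(blockNum: Int, data: CompactByteString)

object BackpressureSketch {
  // Sends each block with `ask` and only asks for more input once the actor
  // has replied. Assumes the actor answers every DataBlock with some message.
  def sendWithAck(actor: ActorRef)(implicit ec: ExecutionContext): Iteratee[Array[Byte], Int] = {
    implicit val timeout: Timeout = Timeout(5.seconds) // arbitrary value
    Iteratee.foldM[Array[Byte], Int](0) { (blockNum, bytes) =>
      // The Future completes when the reply arrives; if the ask times out,
      // the Future fails and the Iteratee stops with that error.
      (actor ? DataBlock(blockNum, CompactByteString(bytes))).map(_ => blockNum + 1)
    }
  }
}
```

Used as source &> chunker |>> BackpressureSketch.sendWithAck(actor), the next read from the InputStream should not happen until the previous block has been acknowledged.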