3
votes

I'm trying to implement an infinite stream with a filter operation. I want to make it not crash with a stack overflow error by using lazy evaluation for the tail.

abstract class MyStream[+A] {
  def head: A
  def tail: MyStream[A]

  def #::[B >: A](element: B): MyStream[B] // prepend operator

  def filter(predicate: A => Boolean): MyStream[A]
}

class FiniteStream[+A](val head: A, val tail: MyStream[A]) extends MyStream[A] {    
  override def #::[B >: A](element: B): MyStream[B] = new FiniteStream[B](element, this)

  override def filter(predicate: A => Boolean): MyStream[A] = {
    lazy val filteredTail = tail.filter(predicate)
    if (predicate(head)) head #:: filteredTail
    else filteredTail
  }
}

class InfiniteStream[+A](override val head: A, generator: A => A) extends MyStream[A] {
  override def tail: MyStream[A] = {
    lazy val tail = new InfiniteStream[A](generator(head), generator)
    tail
  }

  override def #::[B >: A](element: B): MyStream[B] =
    new FiniteStream[B](element, this)

  override def filter(predicate: A => Boolean): MyStream[A] = {
    lazy val filteredTail = tail.filter(predicate)
    if (predicate(head)) head #:: filteredTail
    else filteredTail
  }
}

object MyStream {
    def from[A](start: A)(generator: A => A): MyStream[A] = new InfiniteStream[A](start, generator)
}

val stream: MyStream[Int] = MyStream.from(1)((n: Int) => n + 1)
val filtered = stream.filter(_ % 2 == 0)

But this program does indeed crash with a stack overflow error. It seems like my lazy evaluation strategy does not work. The tail is still being evaluated. Why?

1
FiniteStream? Where/what is that? - jwvh
Sorry, forgot to include it. It's there now - Sahand

1 Answer

6
votes

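The problem is in InfiniteStream.filter: it declares the filtered tail as a lazy val but then reads it immediately, which forces the value. When the predicate holds, head #:: filteredTail calls #:: on filteredTail (an operator ending in a colon is a method of its right operand). When it fails, filteredTail is returned directly. Either way tail.filter(predicate) runs at once, so filter recurses down the infinite stream and blows the stack.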

A lazy val delays evaluating its initializer until the value is first accessed. So you need to delay the access to tail.filter(predicate) until the user of the stream actually asks for the tail.
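To make the difference concrete, here is a minimal sketch. The names LazyDemo, expensive, log, deferred, and unused are made up for illustration and are not part of the question's code. It shows that declaring a lazy val does nothing by itself, and that the initializer runs on the first read.

```scala
import scala.collection.mutable.ListBuffer

object LazyDemo {
  // Records the order in which things happen.
  val log = ListBuffer.empty[String]

  def expensive(): Int = { log += "evaluated"; 42 }

  // The lazy val is declared first, but `expensive()` only runs at the read.
  def deferred(): Int = {
    lazy val x = expensive()
    log += "declared"
    x // first access: the initializer runs here
  }

  // The lazy val is never read, so `expensive()` never runs.
  def unused(): Unit = {
    lazy val x = expensive()
    log += "skipped"
  }
}
```

Both branches of the if in InfiniteStream.filter read filteredTail, so that method behaves like deferred() here, not like unused().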

The easiest and most functional way to achieve this is to implement filter as a view. That is, filter returns a new stream that only filters the tail on demand, and the stream's own filter method only has to construct that view.

For example:

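import scala.annotation.tailrec

// The private constructor guarantees that stream.head already satisfies the predicate.
// Assumes MyStream also declares `def isEmpty: Boolean`, which the question's version lacks.
class FilterStream[+A] private (predicate: A => Boolean, stream: MyStream[A]) extends MyStream[A] {
  override def isEmpty: Boolean = stream.isEmpty
  override def head: A = stream.head
  // The search for the next match only runs when the tail is requested.
  override def tail: MyStream[A] = FilterStream(predicate, stream.tail)

  override def #::[B >: A](element: B): MyStream[B] = new FiniteStream[B](element, this)
  override def filter(p: A => Boolean): MyStream[A] = FilterStream(p, this)
}

object FilterStream {
  def apply[A](predicate: A => Boolean, stream: MyStream[A]): MyStream[A] = {
    new FilterStream(predicate, dropWhile[A](!predicate(_), stream))
  }

  // Drops leading elements while the predicate holds; tail-recursive, so it uses constant stack.
  @tailrec
  def dropWhile[A](predicate: A => Boolean, stream: MyStream[A]): MyStream[A] = {
    if (stream.isEmpty || !predicate(stream.head)) stream
    else dropWhile(predicate, stream.tail)
  }
}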
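The following is a self-contained sketch of how the view could be wired into the stream from the question. It is trimmed down for illustration: the prepend operator and the empty check are left out, filter is given a default implementation in the base class, and the take helper and the FilterViewSketch wrapper are assumptions added only so the result can be inspected.

```scala
import scala.annotation.tailrec

object FilterViewSketch {
  abstract class MyStream[+A] {
    def head: A
    def tail: MyStream[A]

    // Building the view is all filter has to do.
    def filter(predicate: A => Boolean): MyStream[A] = FilterStream(predicate, this)

    // Hypothetical helper (not in the question): collects the first n elements.
    def take(n: Int): List[A] = {
      @tailrec
      def loop(s: MyStream[A], left: Int, acc: List[A]): List[A] =
        if (left <= 0) acc.reverse
        else if (left == 1) (s.head :: acc).reverse // do not touch the tail past the last element
        else loop(s.tail, left - 1, s.head :: acc)
      loop(this, n, List.empty[A])
    }
  }

  class InfiniteStream[+A](val head: A, generator: A => A) extends MyStream[A] {
    // A def, so the next cell is only built when the tail is requested.
    def tail: MyStream[A] = new InfiniteStream[A](generator(head), generator)
  }

  class FilterStream[+A] private (predicate: A => Boolean, stream: MyStream[A]) extends MyStream[A] {
    def head: A = stream.head
    def tail: MyStream[A] = FilterStream(predicate, stream.tail)
  }

  object FilterStream {
    // Skips ahead to the first match, then wraps the rest in a view.
    def apply[A](predicate: A => Boolean, stream: MyStream[A]): MyStream[A] =
      new FilterStream(predicate, dropWhile[A](a => !predicate(a), stream))

    // A loop after tail-call elimination, so the stack depth stays constant.
    @tailrec
    def dropWhile[A](predicate: A => Boolean, stream: MyStream[A]): MyStream[A] =
      if (!predicate(stream.head)) stream
      else dropWhile(predicate, stream.tail)
  }

  val evens: MyStream[Int] = new InfiniteStream[Int](1, _ + 1).filter(_ % 2 == 0)
}
```

With this wiring, FilterViewSketch.evens.take(3) gives List(2, 4, 6). Finding the first match is still done eagerly, but as a loop rather than nested calls. Note that a predicate that never matches makes that loop run forever on an infinite stream.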

Finally, you should consider implementing an empty stream with its own type and object. There are many reasons to do so, one of them being that you can then terminate an infinite stream if your generator decides it wants to.

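object Nil extends MyStream[Nothing] {
  def isEmpty: Boolean = true
  override def head: Nothing = throw new NoSuchElementException("head of empty stream")
  override def tail: MyStream[Nothing] = throw new NoSuchElementException("tail of empty stream")
  override def #::[B >: Nothing](element: B): MyStream[B] = new FiniteStream[B](element, this)
  override def filter(predicate: Nothing => Boolean): MyStream[Nothing] = this
}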
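As a sketch of the terminating-generator idea, assume the generator returns Option[A], with None meaning "stop". That signature and the names TerminatingSketch, Empty, Cell, unfold, and toList are illustrative assumptions, not part of the question. The empty object is called Empty here only to avoid shadowing Scala's own Nil inside the example.

```scala
import scala.annotation.tailrec

object TerminatingSketch {
  sealed abstract class MyStream[+A] {
    def isEmpty: Boolean
    def head: A
    def tail: MyStream[A]

    // Hypothetical helper: only safe on streams that are known to terminate.
    def toList: List[A] = {
      @tailrec
      def loop(s: MyStream[A], acc: List[A]): List[A] =
        if (s.isEmpty) acc.reverse else loop(s.tail, s.head :: acc)
      loop(this, List.empty[A])
    }
  }

  object Empty extends MyStream[Nothing] {
    val isEmpty = true
    def head: Nothing = throw new NoSuchElementException("head of empty stream")
    def tail: MyStream[Nothing] = throw new NoSuchElementException("tail of empty stream")
  }

  class Cell[+A](val head: A, next: A => Option[A]) extends MyStream[A] {
    val isEmpty = false
    // The generator decides here whether the stream continues.
    def tail: MyStream[A] = next(head) match {
      case Some(a) => new Cell[A](a, next)
      case None    => Empty
    }
  }

  def unfold[A](start: A)(next: A => Option[A]): MyStream[A] = new Cell[A](start, next)
}
```

For instance, TerminatingSketch.unfold(1)(n => if (n < 5) Some(n + 1) else None).toList produces List(1, 2, 3, 4, 5).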

head and tail are inherently unsafe methods, because they throw on an empty stream. Another improvement would be to use case classes to expose the shape of your stream, so that users pattern match on it instead of calling unsafe methods like head and tail.
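One possible shape for such a design is sketched below. All names (ShapeSketch, Cons, Empty, next, headOption, take) are assumptions made for this example. The tail is stored as a () => MyStream[A] thunk because case-class parameters cannot be by-name, which is one way, not the only way, to keep it lazy.

```scala
import scala.annotation.tailrec

object ShapeSketch {
  sealed trait MyStream[+A]
  case object Empty extends MyStream[Nothing]
  // `next` is a thunk, so the rest of the stream is only built when it is called.
  final case class Cons[+A](head: A, next: () => MyStream[A]) extends MyStream[A]

  def from[A](start: A)(generator: A => A): MyStream[A] =
    Cons(start, () => from(generator(start))(generator))

  // Total: the empty case is handled by the match instead of throwing.
  def headOption[A](stream: MyStream[A]): Option[A] = stream match {
    case Cons(h, _) => Some(h)
    case Empty      => None
  }

  // Collects at most n elements without forcing the tail after the last one.
  def take[A](stream: MyStream[A], n: Int): List[A] = {
    @tailrec
    def loop(s: MyStream[A], left: Int, acc: List[A]): List[A] = s match {
      case Cons(h, next) if left > 1  => loop(next(), left - 1, h :: acc)
      case Cons(h, _)    if left == 1 => (h :: acc).reverse
      case _                          => acc.reverse
    }
    loop(stream, n, List.empty[A])
  }
}
```

Because the trait is sealed, the compiler can warn about a match that forgets the Empty case, which is the protection that head and tail cannot offer.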