3

I'm trying to implement an infinite stream with a filter operation. I want to make it not crash with a stack overflow error by using lazy evaluation for the tail.

abstract class MyStream[+A] { def head: A def tail: MyStream[A] def #::[B >: A](element: B): MyStream[B] // prepend operator def filter(predicate: A => Boolean): MyStream[A] } class FiniteStream[+A](val head: A, val tail: MyStream[A]) extends MyStream[A] { override def #::[B >: A](element: B): MyStream[B] = new FiniteStream[B](element, this) override def filter(predicate: A => Boolean): MyStream[A] = { lazy val filteredTail = tail.filter(predicate) if (predicate(head)) filteredTail else filteredTail } } class InfiniteStream[+A](override val head: A, generator: A => A) extends MyStream[A] { override def tail: MyStream[A] = { lazy val tail = new InfiniteStream[A](generator(head), generator) tail } override def #::[B >: A](element: B): MyStream[B] = new FiniteStream[B](element, this) override def filter(predicate: A => Boolean): MyStream[A] = { lazy val filteredTail = tail.filter(predicate) if (predicate(head)) head #:: filteredTail else filteredTail } } object MyStream { def from[A](start: A)(generator: A => A): MyStream[A] = new InfiniteStream[A](start, generator) } val stream: MyStream[Int] = MyStream.from(1)((n: Int) => n + 1) val filtered = stream.filter(_ % 2 == 0) 

But this program does indeed crash with a stack overflow error. It seems like my lazy evaluation strategy does not work. The tail is still being evaluated. Why?

2
  • 1
    FiniteStream? Where/what is that? Commented Apr 12, 2019 at 10:49
  • Sorry, forgot to include it. It's there now Commented Apr 12, 2019 at 10:55

1 Answer 1

6

The problem is caused by InfiniteStream.filter, it creates the tail filter as a lazy value but then accesses it immediately which forces the value to be evaluated. This causes the whole stream to be evaluated as recursive calls blowing up the stack.

A lazy val delays the execution of the expression used to construct a variable until it is accessed. So you need to delay access to tail.filter(predicate) until the user of the stream accesses the tail.

The easiest and more functional way to achieve this would be to implement filter with a view. That is filter returns a new stream that only filters the tail on demand.

EG

class FilterStream[+A] private (predicate: predicate: A => Boolean, stream: MyStream) extends MyStream[A] { override def head: A = stream.head override def tail: MyStream[A] = FilterStream.dropWhile(!predicate(_), stream) } object FilterStream { def apply[A](predicate: predicate: A => Boolean, stream: MyStream[A]): MyStream[A] = { new FilterStream(predicate, dropWhile(!predicate(_), stream)) } @tailrec def dropWhile[A](predicate: predicate: A => Boolean, stream: MyStream[A]): MyStream[A] = { if (stream.isEmpty || predicate(stream.head)) stream else dropWhile(predicate, stream.tail) } } 

Finally you should consider implementing an empty stream with its own type and object for many reasons but also so that you can terminate an infinite stream if your generator decides it wants to.

object Nil extends MyStream[Nothing] { override def head: A = throw NoSuchElement override def tail: MyStream[A] = throw NoSuchElement } 

Head and tail are always unsafe methods, another improvement would be to use case classes to expose the shape of your stream, then users would pattern match on the stream. This would protect your users from having to use unsafe methods like head and tail.

Sign up to request clarification or add additional context in comments.

1 Comment

Thank you for your answer. I believe you have an unbalanced parenthesis in the else clause of the dropWhile method.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.