Tuesday, November 19, 2013

Building an Iteratee, a step at a time.

After my excellent experience with the Cooking with for Comprehension, I’m back for the next logical step, Iteratees. The previous work focused on how for-comprehension works when composing generators. In a comment from pagoda_5b mentioned that I have implemented a Reader Monad. It makes sense since the same input is woven through all generator, each one generating a different output.

While I would love to continue using the cooking sample, I could not find a easy way to extend this and still make sense. So we will start our first step with a generalized version of the reader monad of the previous post. Since we are going to extend this monad to create our iteratee, I'll name the reader monad as a Consumer. Below we have a reader monad that takes a single input of type I and output a value of type T.  
trait Consumer[I, T] {
    self =>
    def consume(input: I): T

    def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): S = f(self.consume(input)).consume(input)
      }
    }

    def map[S](f: T => S): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): S = f(self.consume(input))
      }
    }
  }

Step 1 - Basic structure


In order to move from our reader monad to an iteratee we need a good use case. Reading a message is a good example. Let’s assume we have a message composed of several lines, like the one below:
# Header
. Body
! Trailer

Our reader monad will give the same input to all generators. While we could make each once receive the entire message and read only the part it wants, this makes the handling of irrelevant lines my problem. I want to read a line at a time and have each generator read its line then pass control to the next one. I’d also like to express this via for comprehensions, as shown below:
  val readMessage = for {
    head <- readHeader
    body <- readBody
    tail <- readTrailer
  } yield (head, body, tail)

At each step I want to feed a line to my consumer and get back the consumer that will handle the next line. As a first step (step 1 on github), a consumer must be able to tell when it’s done or to pass control to the next consumer down the line. We create a ConsumerState with Done and Continue to represent these two possible outcomes:
  sealed trait ConsumerState[I, T]

  case class Done[I, T](value: T) extends ConsumerState[I, T]

  case class Continue[I, T](next: Consumer[I, T]) extends ConsumerState[I, T]

In our initial case each consumer looks at one line. We start with a very accepting consumer that only strips the line prefix and is done:
  val readHeader, readBody, readTrailer = new Consumer[String, String] {
    def consume(input: String): ConsumerState[String, String] = Done(input.substring(2))
  }

For the this initial step, all the readers have the same behavior.

Now how do we link them together? Again flatMap and map come to the rescue. In this case we have to handle the different consumer state.
    // Our consume function
    def consume(input: I): ConsumerState[I, T]

    def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
          case Done(value) => Continue(f(value))
          case Continue(nextCook) => Continue(nextCook flatMap f)
        }
      }
    }

    def map[S](f: T => S): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
          case Done(value) => Done(f(value))
          case Continue(nextCook) => Continue(nextCook map f)
        }
      }
    }

Notice that flatMap goes from a Done to a Continue state. So once one consumer is done, flatMap hands control over to the next consumer. All while aligning the output types, as in the reader monad case.

If we feed the first line of the input to a one of our consumers, we get a Done. However if we feed the first line to the composed consumer we get a Continue:
scala> readHeader.consume(msg(0))
ConsumerState[String,String]: Done(Header)
scala> readMessage.consume(msg(0))
ConsumerState[String,(String, String, String)]: Continue(iteratee.IterateeStep1$Consumer$$anon$4@1ac0bc3a)

So we now need something that loops through the input feeding each line to the next consumer. We do this with the following function:
    def consumeAll(inputs: List[I]): Option[T] = {
      self.consume(inputs.head) match {
        case Done(value) => Some(value)
        case Continue(next) => next.consumeAll(inputs.tail)
      }
    }

And using this function we can read the entire message with a single command, and get the weaving of all the consumers:
scala> readMessage.consumeAll(msg)
Option[(String, String, String)]: Some((Header,Body,Trailer))

But our consumer is really not picky, and will accept anything as an input:
scala> readMessage.consumeAll(msg.reverse)
Option[(String, String, String)]: Some((Trailer,Body,Header))

Step 2: Handling errors

This is clearly not what we want. So we move to the next step (see step 2 on github). We add the ability to handle errors. We do this by adding an Error state to our consumer state. We use Throwable to get the line in which the error occurred in an output message.
  sealed trait ConsumerState[I, T]

  case class Done[I, T](value: T) extends ConsumerState[I, T]

  case class Continue[I, T](next: Consumer[I, T]) extends ConsumerState[I, T]

  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]


This new state is simply passed on in the map and flatMap functions.

    def flatMap[S](f: T => Consumer[I, S])(implicit manifest: Manifest[S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(guestCount: I): ConsumerState[I, S] = self.consume(guestCount) match {
          case Error(e) => Error(e)
          case Done(value) => Continue(f(value))
          case Continue(nextCook) => Continue(nextCook flatMap f)
        }
      }
    }

    def map[S](f: T => S)(implicit manifest: Manifest[S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(guestCount: I): ConsumerState[I, S] = self.consume(guestCount) match {
          case Done(value) => Done(f(value))
          case Error(e) => Error(e)
          case Continue(nextCook) => Continue(nextCook map f)
        }
      }
    }

And our consumer can now validate our input as shown below  (see step 2 on github for the all the readers):
  val readHeader = new Consumer[String, String] {
    def consume(input: String): ConsumerState[String, String] =
      if (input.startsWith("# ")) Done(input.substring(2))
      else Error(new Exception("Not header line"))
  }

The consumeAll has to be adjusted to handle the possible error outcome. In this case we use Scala’s Either. Left being the error state, and Right being a good computation. 
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      self.consume(inputs.head) match {
        case Done(value) => Right(value)
        case Error(error) => Left(error)
        case Continue(next) => next.consumeAll(inputs.tail)
      }
    }

Our message reader now requires the correct content. So feeding a good input produces a Right answer:
scala> readHeader.consume(msg(0))
iteratee.IterateeStep2.ConsumerState[String,String]: Done(Header)
//Read full message
scala> readMessage.consumeAll(msg)
Either[Throwable,(String, String, String)]: Right((Header,Body,Trailer))

While feeding an incorrect message will produce a Left with the error messages:
scala> readHeader.consume(msg(1))
iteratee.IterateeStep2.ConsumerState[String,String]: Error(java.lang.Exception: Not header line)

This is pretty cool already.

Step 3: Repeating parts

The next step (see step in github) is to handle multiple body lines. For example the message:
# Header
. Body 1
. Body 2
. Body 3
! Trailer

To read this we need to change our body consumer to handle and accumulate multiple lines. We must accumulate the results and we read them and produce the accumulated data as output. This is a way to do this:
  val readBody = recursiveBody(Nil)

  def recursiveBody(accumulated: List[String]): Consumer[String, List[String]] = new Consumer[String, List[String]] {
    def consume(input: String): ConsumerState[String, List[String]] =
      if (input.startsWith(". ")) Continue(recursiveBody(input.substring(2) :: accumulated))
      else Done(accumulated)
  }

When we run this with consumeAll from above, we get an exception NoSuchElementException. This is cause by consuming the entire input list. What is happening is that when recursiveBody fails to match a body line, it consumes the input. The next call to consumeAll gets an empty list. Our iteratee is losing the trailer line.

The first thing to do is make consumeAll handle an empty list. This version will produce an error on an empty list:
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      if (inputs.isEmpty) {
        Left(new RuntimeException("Premature end of stream"))
      } else {
        self.consume(inputs.head) match {
          case Done(value) => Right(value)
          case Error(error) => Left(error)
          case Continue(next) => next.consumeAll(inputs.tail)
        }
      }
    }

However we still get a premature end of stream, since we haven't really fixed the issue of consuming the input. The only way around this is having two trailer lines:
// New consumeAll empty list handling
scala> readMessage.consumeAll(message)
Either[Throwable,(String, List[String], String)]: Left(java.lang.RuntimeException: Premature end of stream)
// Workaround adding two trailer
scala> readMessage.consumeAll(message ::: List("! Trailer 2"))
Either[Throwable,(String, List[String], String)]: Right((Header,List(Body 3, Body 2, Body 1),Trailer 2))

Step 4: Fixing repeat discarding unused input

This is clearly not what we want. To fix this, the consumer must be able to indicate it did not consume the input (see step 4 on github). This can be done by allowing Done and Continue to return the unconsumed input. We also introduce an Input trait to wrap the data being passed to the Iteratee. The Empty case indicates that there is no input, or that input was consume by the consumer.  The Chunk case is used to indicate that we have a bit of information to handle, or that was not consumed. We have to adjust the Done and Continue states to return an Input. This could be done with Option, but we will need to expand this soon.
  sealed trait ConsumerState[I, T]

  case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]

  case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]

  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]
// New Input types
  sealed trait Input[+I]

  case class Chunk[I](input: I) extends Input[I]

  case object Empty extends Input[Nothing]

We have to change consumeAll since it now has to handled unconsumed input. This is achieved this way:
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      if (inputs.isEmpty) {
        Left(new RuntimeException("Premature end of stream"))
      } else {
        self.consume(Chunk(inputs.head)) match {
          case Done(value, _) => Right(value)
          case Error(error) => Left(error)
          case Continue(next, Empty) => next.consumeAll(inputs.tail)
          case Continue(next, Chunk(c)) => next.consumeAll(c :: inputs.tail)
        }
      }
    }

If consumeAll get a Continue with Emtpy remainder, it knows the consumer has used it's input. Otherwise it passes the returned input to next one down the line. This is choice means the consumer can actually alter the input received by the next consumer. This is acceptable here, but we could be more strict.

Changes to map and flatMap are trivial (see step 4 on github). With these changes we can read inputs with no body and with several lines in the body.
scala> readMessage.consumeAll(msg)
Either[Throwable,(String, List[String], String)]: Right((Header,List(Body 1, Body 2, Body 3),Trailer))
//Read a message with not trailer still fails
scala> readMessage.consumeAll(msg.init)
Either[Throwable,(String, List[String], String)]: Left(java.lang.RuntimeException: Premature end of stream)
// Look no body
scala> readMessage.consumeAll(List("# a head", "! a tail"))
Either[Throwable,(String, List[String], String)]: Right((a head,List(),a tail))

The consumers have to handle the empty case, which normally means continuing to itself. This is important because you may see Empty in some cases. This allows us to stall the iteratee by feeding it empty.  I have seen post that this feature can be used for practical reasons, where data is not always available to feed to the iteratee. Here is an example of stalling the iteratee:
    val afterEmpty = readHeader.consume(Empty)
    afterEmpty match {
      case Continue(next, _) =>
        println("After Empty: " + next.consume(Chunk("# The Head")))
      case _ =>
        println("After Empty unexpected: " + afterEmpty)
    }


Step 5: Handling optional parts

One last step on our basic Iteratee is having the ability to handle premature end of input better (see step 5 on github). If our message does not have the all lines required, our current implementation will return:
Left(java.lang.RuntimeException: Premature end of stream)

One reason to do this, it that parts of the message may be optional, and not having them is acceptable. But to be sure these optional parts are not there, we need to inform the iteratee that the input stream is finished. To do this we change our Input trait to include an EOF input value:
  sealed trait Input[+I]

  case class Chunk[I](input: I) extends Input[I]

  case object Empty extends Input[Nothing]

  case object EOF extends Input[Nothing]

We have to change our consumeAll to propagate EOF. The implantation here will push a EOF if the list is empty, and if a Continue returns EOF as a remainder, we will truncate the input allowing an consumer to finish the stream. This change also cleans up the horrible if statement:
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      self.consume(if (inputs.isEmpty) EOF else Chunk(inputs.head)) match {
        case Done(value, _) => Right(value)
        case Error(error) => Left(error)
        case Continue(next, EOF) => next.consumeAll(Nil)
        case Continue(next, Empty) => next.consumeAll(inputs.tail)
        case Continue(next, Chunk(c)) => next.consumeAll(c :: inputs.tail)
      }
    }

With this change we can provide an incomplete message and get a meaningful message about missing trailer:
scala> readMessage.consumeAll(msg.init)
Either[Throwable,(String, List[String], String)]: Left(java.lang.Exception: Trailer expected)

The new EOF signal also allows us to handle an optional trailer in the message. To make the trailer optional we change the trailerReader to this:
  val readOptionalTrailer = new Consumer[String, Option[String]] {
    def consume(input: Input[String]): ConsumerState[String, Option[String]] =
      input match {
        case Chunk(c) if c.startsWith("! ") => Done(Some(c.substring(2)), Empty)
        case Chunk(c) => Error(new Exception("Not trailer line"))
        case Empty => Continue(this, Empty)
        case EOF => Done(None, EOF)
      }
  }

What this consumer does is return an Option, if we have an EOF our trailer option is None, otherwise we get its value. With this we can handle messages with many body lines and an options trailer, all under the same nice to read for comprehension.
  val readMessageWithOptional = for {
    head <- readHeader
    body <- readBody
    tail <- readOptionalTrailer
  } yield (head, body, tail)

Here are some example outputs:
// No trailer
scala> readMessageWithOptional.consumeAll(msg.init)
Either[Throwable,(String, List[String], Option[String])]: Right((Header,List(Body 1, Body 2, Body 3),None))
// Just a header
scala> readMessageWithOptional.consumeAll(msg.take(1))
Either[Throwable,(String, List[String], Option[String])]: Right((Header,List(),None))
// But we have to have a message :)
scala> readMessageWithOptional.consumeAll(Nil)
Either[Throwable,(String, List[String], Option[String])]: Left(java.lang.Exception: Expected header)

We now have a basic Iteratee implementation. There are some alternatives to how to get this implemented. This implementation does capture the basic behavior. It’s amazing how clear the for-comprehension states what the iteratee does. Consumers are a bit large, but they have a single responsibility, handling one part of the message. 

In the next post I’ll explore how to improve these consumers, making them more specific and combining them with simple functions.