Monday, June 10, 2019

Build an Iteratee, a step at a time: The clean up.

A bit like Sheldon Cooper, I need to knock three times, or in this case write the third part of Building an Iteratee, a step at a time. You can see the previous parts here and here. It’s been a very long time in the making. In this last part we look at how to clean up the code using combinators.

When we last looked, we had built an Iteratee that could read a relatively complex message, but a lot of the building blocks were really specific. They intermingled reading data with general concepts like optional or repeating elements. We now explore how to build the readers from reusable code.

Step 6: Removing duplication with Combinators

The first improvement is to handle the reading of lines using a simple reusable reader. Since the elements to be read are differentiated only by their prefix, we should create code that builds a reader based on the prefix. To make error reporting clear we will add a name to each reader.

  def matchLine(name: String, prefix: String) = new Consumer[String, String] {
    def consume(input: Input[String]): ConsumerState[String, String] =
      input match {
        case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
        case Chunk(c) => Error(new Exception(s"Not $name line"))
        case Empty => Continue(this, Empty)
        case EOF => Error(new Exception(s"$name expected"))
      }
  }


To create a reader for a given line you now use code like the following:

  val readHeader = matchLine("Header", "# ")
  val readAlternateHeader = matchLine("Header 2", "## ")
  val readSingleBody = matchLine("Body", ". ")
  val readSingleBodyHeader = matchLine("Body Header", "- ")
  val readTrailer = matchLine("Trailer", "! ")


This greatly simplifies the reader code and captures the general logic of each reader succinctly. However, we can no longer read optional or repeated elements.

Handling Optional Elements


To recover our ability to read optional elements we need to create a combinator for this. The idea is to handle the optionality of an element by wrapping a Consumer in the optional combinator.

def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] = new Consumer[I, Option[T]] {
  def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
    case Done(value, remainder) => Done(Some(value), remainder)
    case Error(e) => Done(None, input)
    case Continue(next, remainder) => Continue(optional(next), remainder)
  }
}

This code is really simple. It tries to consume the input. If it gets an Error, this means that it did not get the optional element, so it returns Done(None, input). This indicates the absence of the element and the fact that the step did not consume the input. Otherwise it returns the value read, wrapped in Some, and finishes its work.
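To see the three outcomes described above in isolation, here is a minimal self-contained sketch. It repeats the Input, ConsumerState, matchLine and optional definitions from this series so that it runs on its own; the wrapping object OptionalSketch and the value names present, absent and atEnd are only illustrative.

```scala
object OptionalSketch {
  sealed trait Input[+I]
  case class Chunk[I](input: I) extends Input[I]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait ConsumerState[I, T]
  case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
  case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]

  // Only consume is needed here; map, flatMap and consumeAll are left out of this sketch.
  trait Consumer[I, T] {
    def consume(input: Input[I]): ConsumerState[I, T]
  }

  def matchLine(name: String, prefix: String): Consumer[String, String] =
    new Consumer[String, String] {
      def consume(input: Input[String]): ConsumerState[String, String] = input match {
        case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
        case Chunk(_) => Error(new Exception(s"Not $name line"))
        case Empty => Continue(this, Empty)
        case EOF => Error(new Exception(s"$name expected"))
      }
    }

  def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] =
    new Consumer[I, Option[T]] {
      def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
        case Done(value, remainder) => Done(Some(value), remainder)
        case Error(_) => Done(None, input) // hand the untouched input back
        case Continue(next, remainder) => Continue(optional(next), remainder)
      }
    }

  val maybeTrailer = optional(matchLine("Trailer", "! "))

  // The line matches: the value is wrapped in Some and the input is used up.
  val present = maybeTrailer.consume(Chunk("! Trailer")) // Done(Some(Trailer),Empty)
  // The line does not match: None, and the line is returned as the remainder.
  val absent = maybeTrailer.consume(Chunk(". Body"))     // Done(None,Chunk(. Body))
  // End of input: the element is simply absent.
  val atEnd = maybeTrailer.consume(EOF)                  // Done(None,EOF)
}
```

The important detail is the absent case: because the remainder is the original Chunk, whatever reader comes next still gets a chance to look at that line.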

Handling Repeated Elements

Again, to be able to handle repeated elements we need to create a new combinator. This one is a bit more complex since it has to accumulate partial results:

  
def repeat[I, T](consumer: Consumer[I, T]) = repeatRecursive(consumer, Nil)

def repeatRecursive[I, T](consumer: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] =
      consumer.consume(input) match {
        case Error(e) => Done(accumulated.reverse, input)
        case Continue(step, remainder) => Continue(repeatRecursive(consumer, accumulated), remainder)
        case Done(value, remainder) => Continue(repeatRecursive(consumer, value :: accumulated), remainder)
        }
    }

The repeat combinator wraps our reader in a function that accumulates partial results. At each step the accumulator reader attempts to consume the input. If the value is consumed (meaning it gets a Done result) it stores the value and applies itself again. If consuming the input produces an error we assume that the repeat group has ended and return the accumulated value.

Alternative Headers


The last combinator in this step allows us to attempt two different Consumers at a given stage. Maybe we have two types of header that can appear and we want to read either one of them. The alternate combinator takes two consumers and tries the input on the first one. If that fails, it hands the same input to the second consumer.

def alternate[I, T](consumer1: Consumer[I, T], consumer2: Consumer[I, T]): Consumer[I, T] = new Consumer[I, T] {
  def consume(input: Input[I]): ConsumerState[I, T] =
    consumer1.consume(input) match {
      case done@Done(value, remainder) => done
      case Error(_) => Continue(consumer2, input)
      case Continue(next, remainder) => Continue(alternate(next, consumer2), remainder)
    }
 }

This makes alternate paths really straightforward, as shown below:

 val readMessageWithOptionalAlternateHeader = for {
   head <- alternate(readHeader, readAlternateHeader)
   body <- repeat(readSingleBody)
   tail <- optional(readTrailer)
 } yield (head, body, tail)

Putting it together 

The interesting part of this idea is that we can now use the combinators to create new behavior out of simple readers. This means changing the behavior is really easy and each part of code is focused on a single concern.

Our reader to get our message with optional trailer and several body parts can be written as follows:

 val readMessageWithOptionalTrailer = for {
    head <- readHeader
    body <- repeat(readSingleBody)
    tail <- optional(readTrailer)
 } yield (head, body, tail)

And it works as expected:

scala> readMessageWithOptionalTrailer.consumeAll(msg)
res3: Either[...] = Right((Header,List(Body 1, Body 2, Body 3),Some(Trailer)))

So let’s go crazy and attempt to read a more complex message with several body blocks containing a header and some lines. Here is an example of such a message:
 
# Header
- Body 1
. B1.c1
. B1.c2
- Body 2
. B2.c1
! Trailer

The lines that start with a dash are the separators between body blocks. So we now have a repeated block that itself contains a repeat group.

Our reader should be the following:

val readNestedBody = for {
    bodyHeader <- readSingleBodyHeader
    content <- repeat(readSingleBody)
 } yield (bodyHeader, content)

val readNestedMessage = for {
    head <- readHeader
    body <- repeat(readNestedBody)
    tail <- optional(readTrailer)
  } yield (head, body, tail)

This is really straightforward and easy to understand. The first iteratee, readNestedBody, reads a body block: a line starting with a dash followed by several lines starting with a period. The second iteratee, readNestedMessage, uses a repeat of the first one to read the message.

So let's see how it handles our nested message:

scala> readNestedMessage.consumeAll(nestedMessage)
res1: Either[...] = Right((Header,List(),None)) 

That's not right! You can see that the message was accepted (at least partially), but none of the body was captured. Why did this happen?

Step 7: Handling nested iteratees

The error we saw above happens because our repeat combinator assumes we have a single-step iteratee. When we feed readNestedBody to it, it does not accumulate the values. In this part we will fix this problem so that we can handle nested iteratees (code available here).

When one step ends and another begins, it is flatMap that passes control to the next step. Let’s recall flatMap:

def flatMap[S](f: T => Consumer[I, S])(implicit manifest: Manifest[S]): Consumer[I, S] = {
   new Consumer[I, S] {
     def consume(input: Input[I]): ConsumerState[I, S] =
       self.consume(input) match {
         case Error(e) => Error(e)
         case Done(value, remainder) => Continue(f(value), remainder)
         case Continue(nextCook, remainder) => Continue(nextCook flatMap f, remainder)
       }
     }
   }


Notice that when flatMap gets a Done, it passes the result to the next step (f) inside the Continue. But our repeat combinator assumes there is only one step. So when the nested iteratee hands it a Continue wrapping the next step, it ignores the next step provided by flatMap and restarts the reader from scratch.

To fix this issue we have to acknowledge the step provided in the Continue case and weave it into our logic, as follows:

def repeat[I, T](consumer: Consumer[I, T]) = 
   repeatRecursive(consumer, consumer, Nil)

def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] =
      step.consume(input) match {
        case Error(e) if groupStart == step && accumulated.nonEmpty => 
            Done (accumulated.reverse, input)
        case Error(e) => Error(e)
        case Continue(nextStep, remainder) => Continue(repeatRecursive(groupStart, nextStep, accumulated), remainder)
        case Done(value, remainder) => Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), remainder)
      }
  }

In this version repeatRecursive has two Consumer parameters. The first represents the iteratee that forms the repeat group. The second is the next step, passed to it by the flatMap. When flatMap passes a new Consumer we create a new repeatRecursive capturing the initial iteratee and the next step in the process.

Also notice that we have two cases for Error. The first applies when the error happens while the current step is the same as groupStart and we have accumulated some value. This means that the input that was not consumed was at the beginning of a group and the accumulator has successfully read something. In this case we simply return the accumulated value. If there is no accumulated value, or the error happens partway through a group, the nested Consumer failed to read its data. In this case the error is a real problem and should be passed on.

If consuming the input results in a Continue, it progresses to the next step as a new repeatRecursive. If it gets a Done, the nested Consumer has successfully read its data, so we accumulate the value and start a new repeatRecursive from the beginning of the original Consumer.

With this code we can now handle nested repeat groups. This approach has one shortcoming: if the message is just the header, it will fail to read the input, as shown below:

scala> readNestedMessage.consumeAll(nestedMessage)
res1: Either[...] = Right((Header,List((Body 1,List(B1.c1, B1.c2)), (Body 2,List(B2.c1))),Some(Trailer)))
#Fails to read
scala> readNestedMessage.consumeAll(nestedMessage.take(1))
res2: Either[...] = Left(java.lang.Exception: Body Header expected)

Interestingly, a message with just the header failed to be processed. It should be allowed. Next we explore how to fix this.
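For readers who want to run Step 7 end to end, the following is a minimal sketch that assembles the pieces shown in this series into one object. It drops the Manifest parameters and uses inputs.drop(1) instead of inputs.tail in consumeAll; both are simplifications made for this sketch, and the object name NestedRepeatSketch is only illustrative.

```scala
object NestedRepeatSketch {
  sealed trait Input[+I]
  case class Chunk[I](input: I) extends Input[I]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]
  sealed trait ConsumerState[I, T]
  case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
  case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]

  trait Consumer[I, T] { self =>
    def consume(input: Input[I]): ConsumerState[I, T]
    def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
      def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
        case Error(e) => Error(e)
        case Done(value, remainder) => Continue(f(value), remainder)
        case Continue(next, remainder) => Continue(next.flatMap(f), remainder)
      }
    }
    def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
      def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
        case Error(e) => Error(e)
        case Done(value, remainder) => Done(f(value), remainder)
        case Continue(next, remainder) => Continue(next.map(f), remainder)
      }
    }
    // Feeds one line at a time; an unconsumed Chunk is fed again to the next step.
    def consumeAll(inputs: List[I]): Either[Throwable, T] =
      self.consume(if (inputs.isEmpty) EOF else Chunk(inputs.head)) match {
        case Done(value, _) => Right(value)
        case Error(error) => Left(error)
        case Continue(next, EOF) => next.consumeAll(Nil)
        case Continue(next, Empty) => next.consumeAll(inputs.drop(1))
        case Continue(next, Chunk(c)) => next.consumeAll(c :: inputs.drop(1))
      }
  }

  def matchLine(name: String, prefix: String): Consumer[String, String] =
    new Consumer[String, String] {
      def consume(input: Input[String]): ConsumerState[String, String] = input match {
        case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
        case Chunk(_) => Error(new Exception(s"Not $name line"))
        case Empty => Continue(this, Empty)
        case EOF => Error(new Exception(s"$name expected"))
      }
    }

  def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] =
    new Consumer[I, Option[T]] {
      def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
        case Done(value, remainder) => Done(Some(value), remainder)
        case Error(_) => Done(None, input)
        case Continue(next, remainder) => Continue(optional(next), remainder)
      }
    }

  def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] =
    repeatRecursive(consumer, consumer, Nil)

  // Step 7 version: groupStart restarts a group, step is wherever we are inside it.
  def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T],
                            accumulated: List[T]): Consumer[I, List[T]] =
    new Consumer[I, List[T]] {
      def consume(input: Input[I]): ConsumerState[I, List[T]] = step.consume(input) match {
        // == is reference equality here: the consumers do not override equals.
        case Error(_) if groupStart == step && accumulated.nonEmpty =>
          Done(accumulated.reverse, input)
        case Error(e) => Error(e)
        case Continue(nextStep, remainder) =>
          Continue(repeatRecursive(groupStart, nextStep, accumulated), remainder)
        case Done(value, remainder) =>
          Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), remainder)
      }
    }

  val readHeader = matchLine("Header", "# ")
  val readSingleBody = matchLine("Body", ". ")
  val readSingleBodyHeader = matchLine("Body Header", "- ")
  val readTrailer = matchLine("Trailer", "! ")

  val readNestedBody = for {
    bodyHeader <- readSingleBodyHeader
    content <- repeat(readSingleBody)
  } yield (bodyHeader, content)

  val readNestedMessage = for {
    head <- readHeader
    body <- repeat(readNestedBody)
    tail <- optional(readTrailer)
  } yield (head, body, tail)

  val nestedMessage =
    List("# Header", "- Body 1", ". B1.c1", ". B1.c2", "- Body 2", ". B2.c1", "! Trailer")
  val full = readNestedMessage.consumeAll(nestedMessage)
  val headerOnly = readNestedMessage.consumeAll(nestedMessage.take(1))
}
```

Here full evaluates to the Right shown in the transcript above, and headerOnly to a Left carrying "Body Header expected". Note that the groupStart == step guard only works because each Consumer instance is compared by identity.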

Step 8: Allowing zero sized repeat groups


In the last example the Iteratee failed while reading a message composed only of the primary header. This should be allowed, since zero entries should be a valid repeat group. This happened because we explicitly protected against empty accumulated values in the previous step. We will fix this in this step (code here).

To fix this we need to mark the end of a repeat group with a special Error message. This is done using the alternate combinator and a custom Consumer, shown below:

def wrappedRepeatGroup[I, T](consumer: Consumer[I, T]) = alternate(consumer, new Consumer[I, T] {
    def consume(input: Input[I]) = Error(null)
  })

def repeat[I, T](consumer: Consumer[I, T]) = {
  val wrapped = wrappedRepeatGroup(consumer)
  repeatRecursive(wrapped, wrapped, Nil)
}

Now the repeatRecursive code has to be adjusted to recognize the special Error marker, as shown here:

 def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] = new Consumer[I, List[T]] {
     def consume(input: Input[I]): ConsumerState[I, List[T]] =
       step.consume(input) match {
         case Error(null) => Done(accumulated.reverse, input)
         case Error(e) => Error(e)
         case Continue(nextStep, remainder) => Continue(repeatRecursive(groupStart, nextStep, accumulated), remainder)
         case Done(value, remainder) => Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), remainder)
        }
} 

Now the code will allow zero or more in a repeat group.

scala> readNestedMessage.consumeAll(nestedMessage.take(1))
res1: Either[...] = Right((Header,List(),None))


It’s important to note that consumeAll will normally ignore leftover input. So processing may succeed with a lot of leftover input if you don’t check for it. For the purposes of this blog I did not bother checking. I leave checking for leftover input as an exercise for the reader.
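One possible way to approach that exercise is sketched below. It is not part of the original code: consumeAllStrict is a hypothetical standalone variant of consumeAll that reports an error when the consumer finishes while input lines are still unread. The types are repeated so the sketch runs on its own.

```scala
object LeftoverSketch {
  sealed trait Input[+I]
  case class Chunk[I](input: I) extends Input[I]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait ConsumerState[I, T]
  case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
  case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]

  trait Consumer[I, T] {
    def consume(input: Input[I]): ConsumerState[I, T]
  }

  def matchLine(name: String, prefix: String): Consumer[String, String] =
    new Consumer[String, String] {
      def consume(input: Input[String]): ConsumerState[String, String] = input match {
        case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
        case Chunk(_) => Error(new Exception(s"Not $name line"))
        case Empty => Continue(this, Empty)
        case EOF => Error(new Exception(s"$name expected"))
      }
    }

  // Hypothetical helper: like consumeAll, but a Done with unread lines becomes a Left.
  def consumeAllStrict[I, T](consumer: Consumer[I, T], inputs: List[I]): Either[Throwable, T] = {
    val current: Input[I] = if (inputs.isEmpty) EOF else Chunk(inputs.head)
    val rest = inputs.drop(1)
    consumer.consume(current) match {
      case Error(error) => Left(error)
      case Continue(next, EOF) => consumeAllStrict(next, Nil)
      case Continue(next, Empty) => consumeAllStrict(next, rest)
      case Continue(next, Chunk(c)) => consumeAllStrict(next, c :: rest)
      case Done(value, remainder) =>
        // A Chunk remainder means the current line was not consumed either.
        val leftover = remainder match {
          case Chunk(c) => c :: rest
          case _ => rest
        }
        if (leftover.isEmpty) Right(value)
        else Left(new Exception(s"${leftover.size} unread input line(s)"))
    }
  }

  val readHeader = matchLine("Header", "# ")
  val exact = consumeAllStrict(readHeader, List("# Header"))             // Right(Header)
  val tooMuch = consumeAllStrict(readHeader, List("# Header", ". Body")) // a Left
}
```

Whether leftover input should be an error or be returned alongside the value is a design choice; returning the unread lines together with the result would work just as well.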

Final Considerations


This is just a brief introduction to Iteratees and the use of combinators. I hope it sheds some light on why these functional approaches can provide clarity. The sample code here can be extended. I actually have a more complete and fully tested implementation in another project (code and tests).

It is worthwhile to take some of these functional concepts and tear them apart to see what makes them tick.

Tuesday, November 19, 2013

Building an Iteratee, a step at a time.

After my excellent experience with Cooking with For Comprehension, I’m back for the next logical step, Iteratees. The previous work focused on how for-comprehension works when composing generators. In a comment, pagoda_5b mentioned that I had implemented a Reader Monad. It makes sense, since the same input is woven through all the generators, each one generating a different output.

While I would love to continue using the cooking sample, I could not find an easy way to extend it and still make sense. So we will start our first step with a generalized version of the reader monad of the previous post. Since we are going to extend this monad to create our iteratee, I'll name the reader monad Consumer. Below we have a reader monad that takes a single input of type I and outputs a value of type T.
trait Consumer[I, T] {
    self =>
    def consume(input: I): T

    def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): S = f(self.consume(input)).consume(input)
      }
    }

    def map[S](f: T => S): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): S = f(self.consume(input))
      }
    }
  }

Step 1 - Basic structure


In order to move from our reader monad to an iteratee we need a good use case. Reading a message is a good example. Let’s assume we have a message composed of several lines, like the one below:
# Header
. Body
! Trailer

Our reader monad will give the same input to all generators. While we could make each one receive the entire message and read only the part it wants, this makes the handling of irrelevant lines my problem. I want to read a line at a time and have each generator read its line then pass control to the next one. I’d also like to express this via for-comprehensions, as shown below:
  val readMessage = for {
    head <- readHeader
    body <- readBody
    tail <- readTrailer
  } yield (head, body, tail)

At each step I want to feed a line to my consumer and get back the consumer that will handle the next line. As a first step (step 1 on github), a consumer must be able to tell when it’s done or to pass control to the next consumer down the line. We create a ConsumerState with Done and Continue to represent these two possible outcomes:
  sealed trait ConsumerState[I, T]

  case class Done[I, T](value: T) extends ConsumerState[I, T]

  case class Continue[I, T](next: Consumer[I, T]) extends ConsumerState[I, T]

In our initial case each consumer looks at one line. We start with a very accepting consumer that only strips the line prefix and is done:
  val readHeader, readBody, readTrailer = new Consumer[String, String] {
    def consume(input: String): ConsumerState[String, String] = Done(input.substring(2))
  }

For this initial step, all the readers have the same behavior.

Now how do we link them together? Again flatMap and map come to the rescue. In this case we have to handle the different consumer states.
    // Our consume function
    def consume(input: I): ConsumerState[I, T]

    def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
          case Done(value) => Continue(f(value))
          case Continue(nextCook) => Continue(nextCook flatMap f)
        }
      }
    }

    def map[S](f: T => S): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
          case Done(value) => Done(f(value))
          case Continue(nextCook) => Continue(nextCook map f)
        }
      }
    }

Notice that flatMap goes from a Done to a Continue state. So once one consumer is done, flatMap hands control over to the next consumer. All while aligning the output types, as in the reader monad case.
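To make the hand-off concrete, here is a small self-contained sketch using the Step 1 definitions. It builds the same reader twice, once with a for-comprehension and once with the flatMap and map calls that Scala rewrites such a comprehension into. The names DesugarSketch, stripPrefix, sugared and desugared are only illustrative.

```scala
object DesugarSketch {
  sealed trait ConsumerState[I, T]
  case class Done[I, T](value: T) extends ConsumerState[I, T]
  case class Continue[I, T](next: Consumer[I, T]) extends ConsumerState[I, T]

  trait Consumer[I, T] { self =>
    def consume(input: I): ConsumerState[I, T]

    def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
      def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
        case Done(value) => Continue(f(value))
        case Continue(next) => Continue(next.flatMap(f))
      }
    }

    def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
      def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
        case Done(value) => Done(f(value))
        case Continue(next) => Continue(next.map(f))
      }
    }

    def consumeAll(inputs: List[I]): Option[T] = self.consume(inputs.head) match {
      case Done(value) => Some(value)
      case Continue(next) => next.consumeAll(inputs.tail)
    }
  }

  // The very accepting Step 1 reader: drop the two-character prefix and finish.
  val stripPrefix: Consumer[String, String] = new Consumer[String, String] {
    def consume(input: String): ConsumerState[String, String] = Done(input.substring(2))
  }

  val sugared: Consumer[String, (String, String, String)] = for {
    head <- stripPrefix
    body <- stripPrefix
    tail <- stripPrefix
  } yield (head, body, tail)

  // What the for-comprehension above is rewritten into.
  val desugared: Consumer[String, (String, String, String)] =
    stripPrefix.flatMap { head =>
      stripPrefix.flatMap { body =>
        stripPrefix.map { tail => (head, body, tail) }
      }
    }

  val msg = List("# Header", ". Body", "! Trailer")
  val viaFor = sugared.consumeAll(msg)       // Some((Header,Body,Trailer))
  val viaCalls = desugared.consumeAll(msg)   // Some((Header,Body,Trailer))
}
```

Each flatMap turns the Done of one reader into a Continue holding the rest of the chain, and only the final map produces a Done for the whole message.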

If we feed the first line of the input to one of our consumers, we get a Done. However, if we feed the first line to the composed consumer we get a Continue:
scala> readHeader.consume(msg(0))
ConsumerState[String,String]: Done(Header)
scala> readMessage.consume(msg(0))
ConsumerState[String,(String, String, String)]: Continue(iteratee.IterateeStep1$Consumer$$anon$4@1ac0bc3a)

So we now need something that loops through the input feeding each line to the next consumer. We do this with the following function:
    def consumeAll(inputs: List[I]): Option[T] = {
      self.consume(inputs.head) match {
        case Done(value) => Some(value)
        case Continue(next) => next.consumeAll(inputs.tail)
      }
    }

And using this function we can read the entire message with a single command, and get the weaving of all the consumers:
scala> readMessage.consumeAll(msg)
Option[(String, String, String)]: Some((Header,Body,Trailer))

But our consumer is really not picky, and will accept anything as an input:
scala> readMessage.consumeAll(msg.reverse)
Option[(String, String, String)]: Some((Trailer,Body,Header))

Step 2: Handling errors

This is clearly not what we want. So we move to the next step (see step 2 on github). We add the ability to handle errors. We do this by adding an Error state to our consumer state. We use a Throwable so that the output message can describe the line on which the error occurred.
  sealed trait ConsumerState[I, T]

  case class Done[I, T](value: T) extends ConsumerState[I, T]

  case class Continue[I, T](next: Consumer[I, T]) extends ConsumerState[I, T]

  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]


This new state is simply passed on in the map and flatMap functions.

    def flatMap[S](f: T => Consumer[I, S])(implicit manifest: Manifest[S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
          case Error(e) => Error(e)
          case Done(value) => Continue(f(value))
          case Continue(nextCook) => Continue(nextCook flatMap f)
        }
      }
    }

    def map[S](f: T => S)(implicit manifest: Manifest[S]): Consumer[I, S] = {
      new Consumer[I, S] {
        def consume(input: I): ConsumerState[I, S] = self.consume(input) match {
          case Done(value) => Done(f(value))
          case Error(e) => Error(e)
          case Continue(nextCook) => Continue(nextCook map f)
        }
      }
    }

And our consumer can now validate our input as shown below (see step 2 on github for all the readers):
  val readHeader = new Consumer[String, String] {
    def consume(input: String): ConsumerState[String, String] =
      if (input.startsWith("# ")) Done(input.substring(2))
      else Error(new Exception("Not header line"))
  }

The consumeAll has to be adjusted to handle the possible error outcome. In this case we use Scala’s Either. Left being the error state, and Right being a good computation. 
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      self.consume(inputs.head) match {
        case Done(value) => Right(value)
        case Error(error) => Left(error)
        case Continue(next) => next.consumeAll(inputs.tail)
      }
    }

Our message reader now requires the correct content. So feeding a good input produces a Right answer:
scala> readHeader.consume(msg(0))
iteratee.IterateeStep2.ConsumerState[String,String]: Done(Header)
//Read full message
scala> readMessage.consumeAll(msg)
Either[Throwable,(String, String, String)]: Right((Header,Body,Trailer))

While feeding an incorrect line produces an Error carrying the error message (which consumeAll turns into a Left):
scala> readHeader.consume(msg(1))
iteratee.IterateeStep2.ConsumerState[String,String]: Error(java.lang.Exception: Not header line)

This is pretty cool already.

Step 3: Repeating parts

The next step (see step 3 on github) is to handle multiple body lines. For example the message:
# Header
. Body 1
. Body 2
. Body 3
! Trailer

To read this we need to change our body consumer to handle and accumulate multiple lines. We must accumulate the results as we read them and produce the accumulated data as output. This is one way to do it:
  val readBody = recursiveBody(Nil)

  def recursiveBody(accumulated: List[String]): Consumer[String, List[String]] = new Consumer[String, List[String]] {
    def consume(input: String): ConsumerState[String, List[String]] =
      if (input.startsWith(". ")) Continue(recursiveBody(input.substring(2) :: accumulated))
      else Done(accumulated)
  }

When we run this with consumeAll from above, we get a NoSuchElementException. This is caused by running off the end of the input list. When recursiveBody sees a line that is not a body line, it returns Done but has still consumed that line, so our iteratee loses the trailer line. The next call to consumeAll then gets an empty list, and inputs.head throws.

The first thing to do is make consumeAll handle an empty list. This version will produce an error on an empty list:
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      if (inputs.isEmpty) {
        Left(new RuntimeException("Premature end of stream"))
      } else {
        self.consume(inputs.head) match {
          case Done(value) => Right(value)
          case Error(error) => Left(error)
          case Continue(next) => next.consumeAll(inputs.tail)
        }
      }
    }

However we still get a premature end of stream, since we haven't really fixed the issue of consuming the input. The only way around this is having two trailer lines:
// New consumeAll empty list handling
scala> readMessage.consumeAll(message)
Either[Throwable,(String, List[String], String)]: Left(java.lang.RuntimeException: Premature end of stream)
// Workaround adding two trailers
scala> readMessage.consumeAll(message ::: List("! Trailer 2"))
Either[Throwable,(String, List[String], String)]: Right((Header,List(Body 3, Body 2, Body 1),Trailer 2))

Step 4: Fixing repeat discarding unused input

This is clearly not what we want. To fix this, the consumer must be able to indicate it did not consume the input (see step 4 on github). This can be done by allowing Done and Continue to return the unconsumed input. We also introduce an Input trait to wrap the data being passed to the Iteratee. The Empty case indicates that there is no input, or that the input was consumed by the consumer. The Chunk case indicates that there is a piece of data to handle, or one that was not consumed. We have to adjust the Done and Continue states to return an Input. This could be done with Option, but we will need to expand this soon.
  sealed trait ConsumerState[I, T]

  case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]

  case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]

  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]
// New Input types
  sealed trait Input[+I]

  case class Chunk[I](input: I) extends Input[I]

  case object Empty extends Input[Nothing]

We have to change consumeAll since it now has to handle unconsumed input. This is achieved this way:
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      if (inputs.isEmpty) {
        Left(new RuntimeException("Premature end of stream"))
      } else {
        self.consume(Chunk(inputs.head)) match {
          case Done(value, _) => Right(value)
          case Error(error) => Left(error)
          case Continue(next, Empty) => next.consumeAll(inputs.tail)
          case Continue(next, Chunk(c)) => next.consumeAll(c :: inputs.tail)
        }
      }
    }

If consumeAll gets a Continue with an Empty remainder, it knows the consumer has used its input. Otherwise it passes the returned input to the next consumer down the line. This choice means the consumer can actually alter the input received by the next consumer. This is acceptable here, but we could be more strict.

Changes to map and flatMap are trivial (see step 4 on github). With these changes we can read inputs with no body and with several lines in the body.
scala> readMessage.consumeAll(msg)
Either[Throwable,(String, List[String], String)]: Right((Header,List(Body 1, Body 2, Body 3),Trailer))
// Reading a message with no trailer still fails
scala> readMessage.consumeAll(msg.init)
Either[Throwable,(String, List[String], String)]: Left(java.lang.RuntimeException: Premature end of stream)
// Look no body
scala> readMessage.consumeAll(List("# a head", "! a tail"))
Either[Throwable,(String, List[String], String)]: Right((a head,List(),a tail))
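The updated body reader is not shown in this post, so the following is only a sketch of how recursiveBody might look with the Step 4 types; the actual code on github may differ. The key change is that a non-body line is handed back as the remainder instead of being swallowed. The list is also reversed on completion, an assumption made to match the ordered output above.

```scala
object Step4BodySketch {
  sealed trait Input[+I]
  case class Chunk[I](input: I) extends Input[I]
  case object Empty extends Input[Nothing]

  sealed trait ConsumerState[I, T]
  case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
  case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
  case class Error[I, T](error: Throwable) extends ConsumerState[I, T]

  // map, flatMap and consumeAll are omitted; the reader is exercised directly below.
  trait Consumer[I, T] {
    def consume(input: Input[I]): ConsumerState[I, T]
  }

  def recursiveBody(accumulated: List[String]): Consumer[String, List[String]] =
    new Consumer[String, List[String]] {
      def consume(input: Input[String]): ConsumerState[String, List[String]] = input match {
        // A body line: remember it and report that the input was used up.
        case Chunk(c) if c.startsWith(". ") =>
          Continue(recursiveBody(c.substring(2) :: accumulated), Empty)
        // Anything else ends the body: return it untouched for the next reader.
        case Chunk(_) => Done(accumulated.reverse, input)
        // Nothing to read yet: stay where we are.
        case Empty => Continue(this, Empty)
      }
    }

  val afterBody = recursiveBody(Nil).consume(Chunk(". Body 1"))
  val afterTrailer = afterBody match {
    case Continue(next, _) => next.consume(Chunk("! Trailer"))
    case other => other
  }
  // afterTrailer is Done(List(Body 1),Chunk(! Trailer)): the trailer line survives.
}
```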

The consumers have to handle the Empty case, which normally means continuing with the same consumer. This is important because you may see Empty in some cases. This allows us to stall the iteratee by feeding it Empty. I have seen posts suggesting that this feature has practical uses when data is not always available to feed to the iteratee. Here is an example of stalling the iteratee:
    val afterEmpty = readHeader.consume(Empty)
    afterEmpty match {
      case Continue(next, _) =>
        println("After Empty: " + next.consume(Chunk("# The Head")))
      case _ =>
        println("After Empty unexpected: " + afterEmpty)
    }


Step 5: Handling optional parts

One last step for our basic Iteratee is the ability to handle a premature end of input better (see step 5 on github). If our message does not have all the lines required, our current implementation will return:
Left(java.lang.RuntimeException: Premature end of stream)

One reason to do this is that parts of the message may be optional, and not having them is acceptable. But to be sure these optional parts are not there, we need to inform the iteratee that the input stream is finished. To do this we change our Input trait to include an EOF input value:
  sealed trait Input[+I]

  case class Chunk[I](input: I) extends Input[I]

  case object Empty extends Input[Nothing]

  case object EOF extends Input[Nothing]

We have to change our consumeAll to propagate EOF. The implementation here will push an EOF if the list is empty, and if a Continue returns EOF as a remainder, we will truncate the input, allowing a consumer to finish the stream. This change also cleans up the horrible if statement:
    def consumeAll(inputs: List[I]): Either[Throwable, T] = {
      self.consume(if (inputs.isEmpty) EOF else Chunk(inputs.head)) match {
        case Done(value, _) => Right(value)
        case Error(error) => Left(error)
        case Continue(next, EOF) => next.consumeAll(Nil)
        case Continue(next, Empty) => next.consumeAll(inputs.tail)
        case Continue(next, Chunk(c)) => next.consumeAll(c :: inputs.tail)
      }
    }

With this change we can provide an incomplete message and get a meaningful message about missing trailer:
scala> readMessage.consumeAll(msg.init)
Either[Throwable,(String, List[String], String)]: Left(java.lang.Exception: Trailer expected)

The new EOF signal also allows us to handle an optional trailer in the message. To make the trailer optional we change the trailer reader to this:
  val readOptionalTrailer = new Consumer[String, Option[String]] {
    def consume(input: Input[String]): ConsumerState[String, Option[String]] =
      input match {
        case Chunk(c) if c.startsWith("! ") => Done(Some(c.substring(2)), Empty)
        case Chunk(c) => Error(new Exception("Not trailer line"))
        case Empty => Continue(this, Empty)
        case EOF => Done(None, EOF)
      }
  }

What this consumer does is return an Option: if we get an EOF our trailer is None, otherwise we get its value. With this we can handle messages with many body lines and an optional trailer, all under the same easy-to-read for-comprehension.
  val readMessageWithOptional = for {
    head <- readHeader
    body <- readBody
    tail <- readOptionalTrailer
  } yield (head, body, tail)

Here are some example outputs:
// No trailer
scala> readMessageWithOptional.consumeAll(msg.init)
Either[Throwable,(String, List[String], Option[String])]: Right((Header,List(Body 1, Body 2, Body 3),None))
// Just a header
scala> readMessageWithOptional.consumeAll(msg.take(1))
Either[Throwable,(String, List[String], Option[String])]: Right((Header,List(),None))
// But we have to have a message :)
scala> readMessageWithOptional.consumeAll(Nil)
Either[Throwable,(String, List[String], Option[String])]: Left(java.lang.Exception: Expected header)

We now have a basic Iteratee implementation. There are alternative ways to implement this, but this implementation does capture the basic behavior. It’s amazing how clearly the for-comprehension states what the iteratee does. Consumers are a bit large, but they have a single responsibility, handling one part of the message.

In the next post I’ll explore how to improve these consumers, making them more specific and combining them with simple functions. 

Wednesday, June 5, 2013

Cooking with For Comprehension

I have been repeatedly trying to understand Iteratees. There are some excellent examples in Scala (Josh Suereth and Rúnar). However, every time I tried to follow them, Monads started showing up, and soon I would be neck deep in Scalaz. Monad is a pattern I’m still struggling with. Reading Learn You a Haskell for Great Good helped a lot, but I still don’t feel comfortable around monads.

However, this was not enough to dissuade me from getting Iteratees. I turned to Haskell to see if I’d get some insight. Reading some articles on iteratees in Haskell, I stumbled upon this post by Magnus, and the last line of the blog was great: “That’s pretty much it. Not that much to it.” So this made me wonder if I was complicating things too much.

All iteratee examples I saw used for comprehension (or the Haskell equivalent). Maybe my problem was not in the monads. I decided to take baby steps and see if I could understand how for comprehension works.

So I started with the following problem, let’s say I have a Cook defined as follows:
trait Cook[T] {
  def cookFor(guestCount: Int): T
}

This cook will get a number of guests and produce something to feed them.

Now I want to get several cooks to work together and deliver a meal. Based on the examples I found in the post above, this should be something like:
val mealCook = for {
  appetizer <- appetizerCook
  mainCourse <- mainCourseCook
  dessert <- dessertCook
} yield (appetizer, mainCourse, dessert)

The mealCook should have the cookFor method on it and deliver whatever types the meal is composed of. But how exactly does this work? How will the compiler figure out the types? How will a single cookFor call weave all these cooks together? This was the crux of my lack of understanding. I had used these for-comprehensions before in Martin Odersky’s excellent “Functional Programming Principles in Scala” (available at Coursera). But I really didn’t understand how they worked.

In order to get the example above to compile we need to add a couple of methods to our Cook trait. From the compiler error and previous reading I knew it needed the map and flatMap methods implemented. And to make matters easier to investigate, I added type manifests and a self type:
trait Cook[T] {
  self =>
  def cookFor(guestCount: Int): T
  def flatMap[S](f: T => Cook[S])(implicit manifest: Manifest[S]): Cook[S] = ???
  def map[S](f: T => S)(implicit manifest: Manifest[S]): Cook[S] = ???
}

The self type is used to allow us to get the current cook when implementing the methods, as you’ll see below. For brevity I will omit the type manifests in most examples, but they did help when trying to look at types.

Now all we need are some cooks that do real work:
val appetizerCook = new Cook[List[Int]] {
  def cookFor(guestCount: Int): List[Int] = (1 to guestCount).map(_ + guestCount).toList
}

val dessertCook = new Cook[String] {
  def cookFor(guestCount: Int): String = "(> " * guestCount
}

val mainCourseCook = new Cook[List[String]] {
  def cookFor(guestCount: Int) = List("Meat", "Fish", "Chicken", "Shrimp", "Vegan").take(guestCount)
}

The code now compiles but will not execute because of the incomplete implementations of the map and flatMap methods. However even at this stage we can look at the type of mealCook, which is:
val mealCook: Cook[(List[Int], List[String], String)] = ...

Notice that each cook produces its own type, and these types are captured in the composed cook. This however does not shed light on how the cooks work together. And looking at the type signatures for both map and flatMap I could not explain how the compiler derived the types or how a for-comprehension will get them to work together.

So let’s proceed with our investigation. The next obvious step is to implement the missing methods. This should shed some light on how the combined cook works.

Starting with map, defined as follows:
def map[S](f: T => S): Cook[S]

The type signature requires us to return a Cook[S], but we don’t know anything about S. The only thing we have is a function from T to S. We could do something like:
  def map[S](f: T => S): Cook[S] = {
    new Cook[S] {
      def cookFor(guestCount: Int): S = f(???)
    }
  }

But f needs a value of T, and we don’t have any at hand. However we can get one by calling cookFor of the current cook (represented by self):
  def map[S](f: T => S): Cook[S] = {
    new Cook[S] {
      def cookFor(guestCount: Int): S = f(self.cookFor(guestCount))
    }
  }
We now have something that, given a guest count, creates a T and then uses f to transform it into an S. But what is the type of S? If we print out the manifest we see it is the type of the value yielded by the for-comprehension. In this version (source here) we output a lot of information as we execute. For example, when flatMap and map are applied we see:
Appetizer: flatMapping to scala.Tuple3[scala.collection.immutable.List[Int], scala.collection.immutable.List[java.lang.String], java.lang.String]
MainCourse: flatMapping to scala.Tuple3[scala.collection.immutable.List[Int], scala.collection.immutable.List[java.lang.String], java.lang.String]
Dessert: mapping to scala.Tuple3[scala.collection.immutable.List[Int], scala.collection.immutable.List[java.lang.String], java.lang.String]

Notice both map and flatMap have the same type manifest for S. My take on this is that f is injecting our value of T into the result of mealCook.    
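
To see map on its own, outside a for-comprehension, here is a minimal sketch. It assumes the Cook trait with only the map implementation above (no manifests); plateCounter and MapExample are hypothetical names used just for this illustration.

```scala
// Cook with the map implementation from above (manifests omitted).
trait Cook[T] { self =>
  def cookFor(guestCount: Int): T

  // Build a new cook that first asks the current cook (self) for a T
  // and then transforms it into an S using f.
  def map[S](f: T => S): Cook[S] = new Cook[S] {
    def cookFor(guestCount: Int): S = f(self.cookFor(guestCount))
  }
}

object MapExample {
  val appetizerCook: Cook[List[Int]] = new Cook[List[Int]] {
    def cookFor(guestCount: Int): List[Int] =
      (1 to guestCount).map(_ + guestCount).toList
  }

  // Hypothetical derived cook: it delivers the number of appetizers
  // instead of the appetizers themselves. Here S is Int.
  val plateCounter: Cook[Int] = appetizerCook.map(_.size)
}
```

Nothing is cooked when map is called. The appetizer cook only runs once cookFor is invoked on plateCounter, and f is applied to whatever it produced.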

Now let’s take a crack at flatMap, defined thusly:
  def flatMap[S](f: T => Cook[S]): Cook[S]

Again, from its signature we have no clue what S is. And in this case we don’t have a function that returns an S; instead it returns a Cook of S.

Once again we need to start by returning some Cook. Since no information on S is available we need to call f like we did for map. But this returns a Cook and not an S; in order to get an S we need to call cookFor on whatever f returned.
  def flatMap[S](f: T => Cook[S]): Cook[S] = {
    new Cook[S] {
      def cookFor(guestCount: Int): S = f(self.cookFor(guestCount)).cookFor(guestCount)
    }
  }
So we see how flatMap links the two cooks, and the invocation of cookFor on both. And again, if we print out the manifest of S we get the type yielded by the for-comprehension. So flatMap gets the food cooked by the current cook and passes it to f, which places it in a cook that returns the type of the comprehension. When we invoke cookFor on the Cook returned by f we get our value of type S.
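
With both methods in place, it may help to write out by hand what the compiler does with the for-comprehension: every generator except the last becomes a flatMap call, and the last one becomes a map whose function builds the yielded value. The sketch below assumes the Cook trait and the three cooks from this post (manifests omitted); Desugared and desugaredMealCook are names invented for the comparison.

```scala
trait Cook[T] { self =>
  def cookFor(guestCount: Int): T
  def flatMap[S](f: T => Cook[S]): Cook[S] = new Cook[S] {
    def cookFor(guestCount: Int): S = f(self.cookFor(guestCount)).cookFor(guestCount)
  }
  def map[S](f: T => S): Cook[S] = new Cook[S] {
    def cookFor(guestCount: Int): S = f(self.cookFor(guestCount))
  }
}

object Desugared {
  val appetizerCook: Cook[List[Int]] = new Cook[List[Int]] {
    def cookFor(guestCount: Int): List[Int] = (1 to guestCount).map(_ + guestCount).toList
  }
  val mainCourseCook: Cook[List[String]] = new Cook[List[String]] {
    def cookFor(guestCount: Int): List[String] =
      List("Meat", "Fish", "Chicken", "Shrimp", "Vegan").take(guestCount)
  }
  val dessertCook: Cook[String] = new Cook[String] {
    def cookFor(guestCount: Int): String = "(> " * guestCount
  }

  // The for-comprehension as written in the post.
  val mealCook = for {
    appetizer <- appetizerCook
    mainCourse <- mainCourseCook
    dessert <- dessertCook
  } yield (appetizer, mainCourse, dessert)

  // The same thing written by hand: nested flatMap calls, ending in a map.
  val desugaredMealCook: Cook[(List[Int], List[String], String)] =
    appetizerCook.flatMap { appetizer =>
      mainCourseCook.flatMap { mainCourse =>
        dessertCook.map { dessert =>
          (appetizer, mainCourse, dessert)
        }
      }
    }
}
```

Both cooks should deliver the same meal for the same guest count. The nesting also explains why the innermost function can see appetizer and mainCourse: they are simply parameters of the enclosing functions.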

But how does the compiler extract the types of each cook? This must be linked to the map and flatMap functions. To prove this we can make a simple change like this (full example can be found here):
  def flatMap[S](f: Int => Cook[S]): Cook[S] = ???
  def map[S](f: Float => S): Cook[S] = ???

Our mealCook type changes to Tuple3[Int, Int, Float]. What this shows is that the input argument type of the function passed to flatMap and map defines the type of each generator (which is what each <- in the for-comprehension is called). In our original cooking example this argument is always the T of each cook. The compiler knows that each field in the output tuple will have the same type as the T of the cook used as its generator.
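
The experiment can be reproduced with a small sketch. This is not the linked source: to make it runnable, the ??? bodies are replaced by placeholder implementations that ignore the wrapped cook and feed the guest count to f, which is an arbitrary choice made only for this illustration. TypeExperiment is a hypothetical name.

```scala
// A deliberately odd Cook: f no longer takes a T.
trait Cook[T] { self =>
  def cookFor(guestCount: Int): T

  // Placeholder body (assumption): pass the guest count to f as the Int.
  def flatMap[S](f: Int => Cook[S]): Cook[S] = new Cook[S] {
    def cookFor(guestCount: Int): S = f(guestCount).cookFor(guestCount)
  }

  // Placeholder body (assumption): pass the guest count to f as a Float.
  def map[S](f: Float => S): Cook[S] = new Cook[S] {
    def cookFor(guestCount: Int): S = f(guestCount.toFloat)
  }
}

object TypeExperiment {
  val appetizerCook: Cook[List[Int]] = new Cook[List[Int]] {
    def cookFor(guestCount: Int): List[Int] = (1 to guestCount).map(_ + guestCount).toList
  }
  val mainCourseCook: Cook[List[String]] = new Cook[List[String]] {
    def cookFor(guestCount: Int): List[String] =
      List("Meat", "Fish", "Chicken", "Shrimp", "Vegan").take(guestCount)
  }
  val dessertCook: Cook[String] = new Cook[String] {
    def cookFor(guestCount: Int): String = "(> " * guestCount
  }

  // The explicit type annotation only compiles because the generators are
  // typed by f's input (Int, Int, Float), not by what each cook produces.
  val mealCook: Cook[(Int, Int, Float)] = for {
    appetizer <- appetizerCook
    mainCourse <- mainCourseCook
    dessert <- dessertCook
  } yield (appetizer, mainCourse, dessert)
}
```

The first two generators go through flatMap and so become Int, while the last one goes through map and becomes Float, regardless of the T of the cooks on the right-hand side of each <-.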

We now know that the input parameter type of f, in both map and flatMap, is the type of the generator, and that f’s output type is the type of the for-comprehension. The implementation of these two methods is what defines how the Cooks are coordinated to work together creating a meal.

I also did a version that outputs messages while executing the code (source here). This version allows us to see how the execution flows through each cook. The main function is the following:
  def main(args: Array[String]): Unit = {
    println("---- Each cook ----")
    println(appetizerCook.cookFor(4))
    println(dessertCook.cookFor(4))
    println(mainCourseCook.cookFor(4))
    println("---- Will start cooking ----")
    println(mealCook.cookFor(4))
  }

During execution each cook will output its name and what it's doing. The output that it produces is as follows:
Appetizer: flatMapping to scala.Tuple3[scala.collection.immutable.List[Int], scala.collection.immutable.List[java.lang.String], java.lang.String]
---- Each cook ----
Appetizer: execute cook
List(5, 6, 7, 8)
Dessert: execute cook
(> (> (> (> 
MainCourse: execute cook
List(Meat, Fish, Chicken, Shrimp)
---- Will start cooking ----
Appetizer: cookFor in flatMap
Appetizer: execute cook
MainCourse: flatMapping to scala.Tuple3[scala.collection.immutable.List[Int], scala.collection.immutable.List[java.lang.String], java.lang.String]
MainCourse: cookFor in flatMap
MainCourse: execute cook
Dessert: mapping to scala.Tuple3[scala.collection.immutable.List[Int], scala.collection.immutable.List[java.lang.String], java.lang.String]
Dessert: cookFor in map
Dessert: execute cook
(List(5, 6, 7, 8),List(Meat, Fish, Chicken, Shrimp),(> (> (> (> )

The first line is actually printed during initialization, before the main function runs, because the first flatMap is applied when mealCook is defined. Then we see the output of each cook. When we ask for the mealCook's food we see the chaining of all the flatMaps and a map to produce the final output. Each cook is called once, and all of them map to the same S output type.
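
For readers without the linked source at hand, here is a rough sketch of how such a tracing version could look. It is an assumption, not the original code: it omits the manifests, collects messages in a buffer instead of printing them, and the names TracedKitchen, TracedCook, cook and log are made up for this example.

```scala
import scala.collection.mutable.ListBuffer

object TracedKitchen {
  // Hypothetical message sink; it must be defined before any cook is combined.
  val log: ListBuffer[String] = ListBuffer.empty[String]

  trait TracedCook[T] { self =>
    def name: String
    def cookFor(guestCount: Int): T

    def flatMap[S](f: T => TracedCook[S]): TracedCook[S] = {
      log += s"${self.name}: flatMapping" // runs when the cooks are combined
      new TracedCook[S] {
        val name: String = self.name
        def cookFor(guestCount: Int): S = {
          log += s"${self.name}: cookFor in flatMap"
          f(self.cookFor(guestCount)).cookFor(guestCount)
        }
      }
    }

    def map[S](f: T => S): TracedCook[S] = {
      log += s"${self.name}: mapping"
      new TracedCook[S] {
        val name: String = self.name
        def cookFor(guestCount: Int): S = {
          log += s"${self.name}: cookFor in map"
          f(self.cookFor(guestCount))
        }
      }
    }
  }

  // Helper for leaf cooks that do the real work and report when they run.
  def cook[T](cookName: String)(recipe: Int => T): TracedCook[T] = new TracedCook[T] {
    val name: String = cookName
    def cookFor(guestCount: Int): T = {
      log += s"$name: execute cook"
      recipe(guestCount)
    }
  }

  val appetizerCook = cook("Appetizer")(n => (1 to n).map(_ + n).toList)
  val mainCourseCook = cook("MainCourse")(n => List("Meat", "Fish", "Chicken", "Shrimp", "Vegan").take(n))
  val dessertCook = cook("Dessert")(n => "(> " * n)

  // Only the outermost flatMap is applied here; the inner ones run during cookFor.
  val mealCook = for {
    appetizer <- appetizerCook
    mainCourse <- mainCourseCook
    dessert <- dessertCook
  } yield (appetizer, mainCourse, dessert)
}
```

In this sketch only the Appetizer flatMap is logged when mealCook is defined. The MainCourse flatMap and the Dessert map are logged later, in the middle of cooking, because they sit inside the functions passed to the outer flatMap calls.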

Some interesting things I learned in this experiment:
  1. Because we have no information on what S is, there are few options on what we can do in the map and flatMap functions. We must use the supplied function just to get the types to align.
  2. The first generator in the for-comprehension will dictate the type of the next generators. Each generator may have its own type parameters, but they all will be of the same class. This is the type constructor we see in some of the Monad examples.
  3. When we look at the individual Cook implementations we see they are oblivious to all the logic and control to connect the cooks. We only focus on what they cook, and leave the connection to the for-comprehension.
This last point was something of a revelation to me. This is one of the reasons functional programming is so interesting. I’m telling what each cook does and how I’d like them connected, but not how to execute the connected cooks. Of course the designer of the Cook trait has to know what happens, but not someone extending a Cook. This is even more impressive when we get to things like Iteratees, which I hope to cover in another post.

Monday, May 13, 2013

Programmer Rehab


For quite some time I have been thinking of writing a blog. I decided to name this blog Programmer Rehab for a couple of reasons. This rehab is more in the sense of physical rehabilitation, which focuses on recovering motor capabilities after some sort of ailment (see Physical Rehabilitation). But there is also some of the Winehouse rehab in this. We collect bad habits and limitations that are worth getting rid of in order to become more productive members of our society.

I have been programming in one form or another since I was eleven. My gateway drug was a Commodore VIC-20. I started programming Basic on that thing. Then I moved to an Apple II, which I eventually upgraded to an Apple IIe (you could do that). Once I got to college I was firmly in the realm of Intel-based PCs. Most of my programming skill I acquired at university. But this was before Agile and XP. And like all university projects, I never maintained my code. Open source was still something that the uber-geeks were doing.

After I left university I started my career as a DBA doing some programming, then moved to systems management software. During this period one of my distinguishing features was that I also wrote programs. Mostly scripts, a couple of ETLs, and some monitoring libraries in Java. All this was closed source. But when I think back this was my first production software. Many of these were in production for quite some time.

In 2005 I wanted to get back to studying. I started a Master's in Computer Science. Again I went into the customary programming style of university: get it done, barely working, and move on. However, while working on my final project, a friend commented on Extreme Programming. The only part of XP I managed to understand and apply was test-driven development, that is, only writing code once I have a test, and fixing bugs by first getting a failing test. This was my first serious glimpse of what the outcome of my rehabilitation would bring. Like someone learning to walk, I had very limited capabilities; only the core of my final project was done in a test-driven way. However, this was a liberating experience, and allowed me to get my work done and two articles published.

For as long as I have been coding, even before university, I have been trying to get software to help me run my Role Playing Games. As a referee or master there are a lot of moving pieces to track. I had attempted to create a referee assistant dozens of times, and normally failed in the initial planning. In 2008, I spiked a very simple combat assistant in Lisp. This initial work was promising, but I could not see how to get a nice UI for the thing. Another friend mentioned Scala, and I did a headlong plunge into the wonderful world of Scala, TDD and Agile methodologies. I succeeded in creating the software I wanted (project home is here), mainly because I did not over-engineer it. I focused on the minimal features and grew from there. This was my Agile epiphany.

Ever since this experience I have become an avid programmer (even though it’s not my primary job). I have been collecting a bunch of experiences from the Web, Conferences and hard work. Hopefully I’ll manage to share some of this journey here. So let’s start: “I’m Thomas and I’m a programmer”.