Monday, June 10, 2019

Build an Iteratee, a step at a time: The cleanup

A bit like Sheldon Cooper, I need to knock three times, or in this case write the third part of the Building an Iteratee step by step series. You can see the previous parts here and here. It’s been a very long time in the making. In this last part we look at how to clean up the code using combinators.

When we last looked, we had built an Iteratee that could read a relatively complex message, but many of the building blocks were very specific: they intermingled reading data with general concepts like optional or repeated elements. We now explore how to build the readers from reusable code.

Step 6: Removing duplication with Combinators

The first improvement is to read lines with a simple, reusable reader. Since the elements to be read differ only by their prefix, we can write a function that builds a reader from a prefix. To make error reporting clear, each reader also gets a name.

  def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
    def consume(input: Input[String]): ConsumerState[String, String] =
      input match {
        // Matching line: strip the prefix and return the rest as the value
        case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
        case Chunk(_) => Error(new Exception(s"Not $name line"))
        // No data yet: wait for more input
        case Empty => Continue(this, Empty)
        case EOF => Error(new Exception(s"$name expected"))
      }
  }


To create a reader for a given line you now use code like the following:

  val readHeader = matchLine("Header", "# ")
  val readAlternateHeader = matchLine("Header 2", "## ")
  val readSingleBody = matchLine("Body", ". ")
  val readSingleBodyHeader = matchLine("Body Header", "- ")
  val readTrailer = matchLine("Trailer", "! ")
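The readers above depend on the Input, ConsumerState and Consumer types from the earlier parts, which are not repeated in this post. The sketch below assumes minimal stand-ins for them, including a hypothetical consumeAll driver that feeds one line per Chunk followed by EOF, so that matchLine and the five readers can be tried on their own. It illustrates the idea under those assumptions rather than reproducing the original code.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

val readHeader = matchLine("Header", "# ")
val readAlternateHeader = matchLine("Header 2", "## ")
val readSingleBody = matchLine("Body", ". ")
val readSingleBodyHeader = matchLine("Body Header", "- ")
val readTrailer = matchLine("Trailer", "! ")
```

In this sketch, readHeader rejects "## Sub" because the character after the first # is not a space, which is why the two header styles need separate readers.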


This greatly simplifies the reader code and captures the logic of each reader succinctly. However, we can no longer read optional or repeated elements.

Handling Optional Elements


To recover the ability to read optional elements, we need a combinator. The idea is to express the optionality of an element by wrapping a Consumer in the optional combinator.

def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] = new Consumer[I, Option[T]] {
  def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
    case Done(value, remainder) => Done(Some(value), remainder)
    case Error(e) => Done(None, input)
    case Continue(next, remainder) => Continue(optional(next), remainder)
  }
}

This code is really simple. It tries to consume the input; if it gets an Error, the optional element is not there, so it returns Done(None, input). This signals both the absence of the element and the fact that the step did not consume the input. Otherwise it wraps the value read in Some, or, if the wrapped consumer needs more input, continues with an optional version of the next step.
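A small, self-contained sketch makes the outcomes of optional concrete. The framework types and the consumeAll driver at the top are assumed minimal stand-ins, not the original definitions. A matching line should give Some with the input consumed, while a non-matching line or EOF should give None with the original input handed back as the remainder.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] = new Consumer[I, Option[T]] {
  def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
    case Done(value, rem) => Done(Some(value), rem)
    case Error(_) => Done(None, input) // absent: hand back the untouched input
    case Continue(next, rem) => Continue(optional(next), rem)
  }
}

val readTrailer = matchLine("Trailer", "! ")
val optionalTrailer = optional(readTrailer)
```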

Handling Repeated Elements

Again, to handle repeated elements we need a new combinator. This one is a bit more complex, since it has to accumulate partial results:

  
def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] = repeatRecursive(consumer, Nil)

def repeatRecursive[I, T](consumer: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] =
      consumer.consume(input) match {
        // The reader failed: the group has ended, return the values in input order
        case Error(e) => Done(accumulated.reverse, input)
        // Note: the next step handed back in Continue is discarded here (revisited in Step 7)
        case Continue(step, remainder) => Continue(repeatRecursive(consumer, accumulated), remainder)
        // One element read: prepend it and try the reader again
        case Done(value, remainder) => Continue(repeatRecursive(consumer, value :: accumulated), remainder)
      }
  }

The repeat combinator wraps our reader in a consumer that accumulates partial results. At each step it attempts to consume the input with the wrapped reader. If a value is read (meaning it gets a Done result), it prepends the value to the accumulator and applies itself again. If consuming the input produces an error, we assume that the repeat group has ended and return the accumulated values, reversed back into input order.
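To watch the Step 6 repeat accumulate values, the sketch below runs it over flat input. The framework types and the consumeAll driver at the top are assumed minimal stand-ins for the code from the earlier parts. A non-matching line or EOF ends the group, and the reversed accumulator comes back in input order.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

// Step 6 repeat, as in the post: prepend each value, reverse when the group ends.
def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] = repeatRecursive(consumer, Nil)
def repeatRecursive[I, T](consumer: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] = consumer.consume(input) match {
      case Error(_) => Done(accumulated.reverse, input)
      case Continue(_, rem) => Continue(repeatRecursive(consumer, accumulated), rem)
      case Done(value, rem) => Continue(repeatRecursive(consumer, value :: accumulated), rem)
    }
  }

val readSingleBody = matchLine("Body", ". ")
val bodies = repeat(readSingleBody)
```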

Alternative Headers


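The last combinator in this step lets us try two different Consumers at a given stage. Maybe two types of header can appear, and we want to read either one of them. The alternate combinator takes two consumers and tries the input on the first one; if that fails, it passes the same input to the second. Note that there is no backtracking: if the first consumer fails after it has already consumed some input, the second one only sees the input from that point on.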

def alternate[I, T](consumer1: Consumer[I, T], consumer2: Consumer[I, T]): Consumer[I, T] = new Consumer[I, T] {
  def consume(input: Input[I]): ConsumerState[I, T] =
    consumer1.consume(input) match {
      case done@Done(value, remainder) => done
      case Error(_) => Continue(consumer2, input)
      case Continue(next, remainder) => Continue(alternate(next, consumer2), remainder)
    }
 }

This makes alternate paths really straightforward, as shown below:

 val readMessageWithOptionalAlternateHeader = for {
   head <- alternate(readHeader, readAlternateHeader)
   body <- repeat(readSingleBody)
   tail <- optional(readTrailer)
 } yield (head, body, tail)
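The sketch below tries alternate on its own. Its framework types and consumeAll driver are assumed minimal stand-ins, not the original code. Either header style is accepted; when neither matches, the error reported is the second consumer's, because alternate simply hands the same input over.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

def alternate[I, T](consumer1: Consumer[I, T], consumer2: Consumer[I, T]): Consumer[I, T] = new Consumer[I, T] {
  def consume(input: Input[I]): ConsumerState[I, T] = consumer1.consume(input) match {
    case done @ Done(_, _) => done
    case Error(_) => Continue(consumer2, input) // retry the same input on the second consumer
    case Continue(next, rem) => Continue(alternate(next, consumer2), rem)
  }
}

val readHeader = matchLine("Header", "# ")
val readAlternateHeader = matchLine("Header 2", "## ")
val anyHeader = alternate(readHeader, readAlternateHeader)
```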

Putting it together 

The interesting part of this idea is that we can now use the combinators to create new behavior out of simple readers. Changing the behavior becomes easy, and each part of the code focuses on a single concern.

The reader for our message with an optional trailer and several body parts can be written as follows:

 val readMessageWithOptionalTrailer = for {
    head <- readHeader
    body <- repeat(readSingleBody)
    tail <- optional(readTrailer)
 } yield (head, body, tail)

And it works as expected:

scala> readMessageWithOptionalTrailer.consumeAll(msg)
res3: Either[...] = Right((Header,List(Body 1, Body 2, Body 3),Some(Trailer)))
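The same result can be reproduced with a self-contained sketch. The contents of msg are an assumption inferred from the REPL output above (the original value is not shown), and the framework types and consumeAll driver are minimal stand-ins. The for comprehension desugars into flatMap and map calls, which is why the stand-in Consumer defines both.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] = new Consumer[I, Option[T]] {
  def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
    case Done(value, rem) => Done(Some(value), rem)
    case Error(_) => Done(None, input)
    case Continue(next, rem) => Continue(optional(next), rem)
  }
}

// Step 6 repeat, as in the post.
def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] = repeatRecursive(consumer, Nil)
def repeatRecursive[I, T](consumer: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] = consumer.consume(input) match {
      case Error(_) => Done(accumulated.reverse, input)
      case Continue(_, rem) => Continue(repeatRecursive(consumer, accumulated), rem)
      case Done(value, rem) => Continue(repeatRecursive(consumer, value :: accumulated), rem)
    }
  }

val readHeader = matchLine("Header", "# ")
val readSingleBody = matchLine("Body", ". ")
val readTrailer = matchLine("Trailer", "! ")
val readMessageWithOptionalTrailer = for {
  head <- readHeader
  body <- repeat(readSingleBody)
  tail <- optional(readTrailer)
} yield (head, body, tail)
// Assumed contents of msg (the original value is not shown in the post).
val msg = List("# Header", ". Body 1", ". Body 2", ". Body 3", "! Trailer")
```

Without the trailer line (msg.init), the same reader should report None for the trailer, which is optional doing its job.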

So let’s go crazy and attempt to read a more complex message with several body blocks, each containing a header and some lines. Here is an example of such a message:
 
# Header
- Body 1
. B1.c1
. B1.c2
- Body 2
. B2.c1
! Trailer

The lines that start with a dash separate the body blocks; each one is the header of a block. So we now have a repeat group of blocks, each containing its own repeat group.

Our reader should be the following:

val readNestedBody = for {
  bodyHeader <- readSingleBodyHeader
  content <- repeat(readSingleBody)
} yield (bodyHeader, content)

val readNestedMessage = for {
  head <- readHeader
  body <- repeat(readNestedBody)
  tail <- optional(readTrailer)
} yield (head, body, tail)

This is straightforward and easy to understand. The first iteratee, readNestedBody, reads a body block: a line starting with a dash followed by several lines starting with periods. The second iteratee, readNestedMessage, repeats the first one to read the body of the message.

So let’s see how it handles our nested message:

scala> readNestedMessage.consumeAll(nestedMessage)
res1: Either[...] = Right((Header,List(),None)) 

That's not right! You can see that the message was accepted (at least partially), but none of the body was captured. Why did this happen?

Step 7: Handling nested iteratees

The error we saw above happens because our repeat combinator assumes a single-step iteratee: when we feed readNestedBody to it, it does not accumulate the values. In this part we will fix this problem so that we can handle nested iteratees (code available here).

When one step ends and another begins, it is flatMap that passes control to the next step. Let’s recall flatMap:

def flatMap[S](f: T => Consumer[I, S])(implicit manifest: Manifest[S]): Consumer[I, S] = {
   new Consumer[I, S] {
     def consume(input: Input[I]): ConsumerState[I, S] =
       self.consume(input) match {
         case Error(e) => Error(e)
         case Done(value, remainder) => Continue(f(value), remainder)
         case Continue(nextCook, remainder) => Continue(nextCook flatMap f, remainder)
       }
     }
   }


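Notice that when flatMap gets a Done, it passes the result to the next step (f) inside a Continue. But our repeat combinator assumes there is only one step. So when the nested iteratee finishes its first step, repeat is handed a Continue that wraps the next step, with the partial result captured inside it. Repeat ignores that step and restarts the reader from scratch, so the body line that follows is rejected as a body header and the group ends empty. The remaining lines are left over, and since consumeAll ignores leftover input, the message is still (partially) accepted.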
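The hand-off can be observed directly. The sketch below uses assumed minimal stand-ins for the framework types and the consumeAll driver, together with the Step 6 repeat shown earlier. Feeding the body header to readNestedBody returns a Continue whose next step is a different Consumer; that step accepts the following body line, whereas restarting from readNestedBody rejects it, so the Step 6 repeat ends with an empty list.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

// Step 6 repeat, as in the post: the next step inside Continue is dropped.
def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] = repeatRecursive(consumer, Nil)
def repeatRecursive[I, T](consumer: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] = consumer.consume(input) match {
      case Error(_) => Done(accumulated.reverse, input)
      case Continue(_, rem) => Continue(repeatRecursive(consumer, accumulated), rem)
      case Done(value, rem) => Continue(repeatRecursive(consumer, value :: accumulated), rem)
    }
  }

val readSingleBody = matchLine("Body", ". ")
val readSingleBodyHeader = matchLine("Body Header", "- ")
val readNestedBody = for {
  bodyHeader <- readSingleBodyHeader
  content <- repeat(readSingleBody)
} yield (bodyHeader, content)

// After the body header, flatMap hands back the next step inside a Continue.
val nextStep = readNestedBody.consume(Chunk("- Body 1")) match {
  case Continue(next, _) => next
  case other => sys.error(s"unexpected state: $other")
}
// Right(Nil): the Step 6 repeat restarts readNestedBody, so the block is lost.
val blocks = repeat(readNestedBody).consumeAll(List("- Body 1", ". B1.c1", ". B1.c2"))
```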

To fix this issue we have to use the step provided in the Continue case and weave it into our logic, as follows:

def repeat[I, T](consumer: Consumer[I, T]) = 
   repeatRecursive(consumer, consumer, Nil)

def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] =
  new Consumer[I, List[T]] {
    def consume(input: Input[I]): ConsumerState[I, List[T]] =
      step.consume(input) match {
        case Error(e) if groupStart == step && accumulated.nonEmpty => 
            Done (accumulated.reverse, input)
        case Error(e) => Error(e)
        case Continue(nextStep, remainder) => Continue(repeatRecursive(groupStart, nextStep, accumulated), remainder)
        case Done(value, remainder) => Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), remainder)
      }
  }

In this version, repeatRecursive has two Consumer parameters. The first, groupStart, is the iteratee that forms the repeat group. The second, step, is the next step to run, passed along by flatMap inside a Continue. When a new step arrives, we create a new repeatRecursive that captures both the initial iteratee and the next step in the process.

Also notice that we have two cases for Error. The first applies when the error happens while the current step is still groupStart and we have accumulated some values. This means the input that was not consumed sits at the beginning of a group, after the accumulator has successfully read at least one group, so we simply return the accumulated values. The second case covers everything else: an error part-way through a group, or an error at the start when nothing has been accumulated yet. In both situations the nested Consumer failed to read, so the error is a real problem and is passed on.

If consuming the input results in a Continue, it progresses to the next step as a new repeatRecursive. If it gets a Done, the nested Consumer has successfully read its data, so we accumulate the value and start a new repeatRecursive from the beginning of the original Consumer.
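Putting the Step 7 repeat together with the readers from this post gives the self-contained sketch below. The framework types and the consumeAll driver are assumed minimal stand-ins, and nestedMessage is assumed to hold the example message above, one line per element. Its checks are meant to mirror the REPL results that follow: the full message is read, and a header-only message fails.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

def optional[I, T](consumer: Consumer[I, T]): Consumer[I, Option[T]] = new Consumer[I, Option[T]] {
  def consume(input: Input[I]): ConsumerState[I, Option[T]] = consumer.consume(input) match {
    case Done(value, rem) => Done(Some(value), rem)
    case Error(_) => Done(None, input)
    case Continue(next, rem) => Continue(optional(next), rem)
  }
}

// Step 7 repeat: keep the next step handed back by flatMap instead of restarting.
def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] = repeatRecursive(consumer, consumer, Nil)
def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T],
                          accumulated: List[T]): Consumer[I, List[T]] = new Consumer[I, List[T]] {
  def consume(input: Input[I]): ConsumerState[I, List[T]] = step.consume(input) match {
    case Error(_) if groupStart == step && accumulated.nonEmpty => Done(accumulated.reverse, input)
    case Error(e) => Error(e)
    case Continue(next, rem) => Continue(repeatRecursive(groupStart, next, accumulated), rem)
    case Done(value, rem) => Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), rem)
  }
}

val readHeader = matchLine("Header", "# ")
val readSingleBody = matchLine("Body", ". ")
val readSingleBodyHeader = matchLine("Body Header", "- ")
val readTrailer = matchLine("Trailer", "! ")
val readNestedBody = for {
  bodyHeader <- readSingleBodyHeader
  content <- repeat(readSingleBody)
} yield (bodyHeader, content)
val readNestedMessage = for {
  head <- readHeader
  body <- repeat(readNestedBody)
  tail <- optional(readTrailer)
} yield (head, body, tail)
// The example message above, one line per element.
val nestedMessage =
  List("# Header", "- Body 1", ". B1.c1", ". B1.c2", "- Body 2", ". B2.c1", "! Trailer")
```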

With this code we can now handle nested repeat groups. This approach has one shortcoming: if the message is just the header, it fails to read the input, as shown below:

scala> readNestedMessage.consumeAll(nestedMessage)
res1: Either[...] = Right((Header,List((Body 1,List(B1.c1, B1.c2)), (Body 2,List(B2.c1))),Some(Trailer)))
// Fails to read: the message is just the header
scala> readNestedMessage.consumeAll(nestedMessage.take(1))
res2: Either[...] = Left(java.lang.Exception: Body Header expected)

A message with just the header failed to be processed, even though it should be allowed. Next we explore how to fix this.

Step 8: Allowing zero sized repeat groups


In the last example, the Iteratee failed while reading a message composed only of the primary header. This should be allowed, since a repeat group with zero entries is valid. It happened because the previous step explicitly guarded against an empty accumulator. We will fix this in this step (code here).

To fix this, we mark the end of a repeat group with a special Error value, an Error carrying null. This is done using the alternate combinator and a custom Consumer, shown below:

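def wrappedRepeatGroup[I, T](consumer: Consumer[I, T]): Consumer[I, T] = alternate(consumer, new Consumer[I, T] {
    // Sentinel: always fails with Error(null), marking the end of the repeat group
    def consume(input: Input[I]): ConsumerState[I, T] = Error(null)
  })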

def repeat[I, T](consumer: Consumer[I, T]) = {
  val wrapped = wrappedRepeatGroup(consumer)
  repeatRecursive(wrapped, wrapped, Nil)
}

Now repeatRecursive has to be adjusted to recognize the special Error marker, as shown here:

def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T], accumulated: List[T]): Consumer[I, List[T]] = new Consumer[I, List[T]] {
  def consume(input: Input[I]): ConsumerState[I, List[T]] =
    step.consume(input) match {
      // The sentinel fired: the group has ended, possibly with zero entries
      case Error(null) => Done(accumulated.reverse, input)
      case Error(e) => Error(e)
      case Continue(nextStep, remainder) => Continue(repeatRecursive(groupStart, nextStep, accumulated), remainder)
      case Done(value, remainder) => Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), remainder)
    }
}

Now the code allows zero or more entries in a repeat group:

scala> readNestedMessage.consumeAll(nestedMessage.take(1))
res1: Either[...] = Right((Header,List(),None))
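The Step 8 behavior can be checked with a self-contained sketch that applies repeat to readNestedBody directly. The framework types and the consumeAll driver are assumed minimal stand-ins, and readPair is a hypothetical group added only for illustration. Empty input now gives an empty list, and a block with no body lines gives an empty inner list.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Sequencing: when this step is Done, continue with the step built by f.
  def flatMap[S](f: T => Consumer[I, S]): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Continue(f(value), rem)
      case Continue(next, rem) => Continue(next.flatMap(f), rem)
    }
  }
  def map[S](f: T => S): Consumer[I, S] = new Consumer[I, S] {
    def consume(input: Input[I]): ConsumerState[I, S] = self.consume(input) match {
      case Error(e) => Error(e)
      case Done(value, rem) => Done(f(value), rem)
      case Continue(next, rem) => Continue(next.map(f), rem)
    }
  }
  // Hypothetical driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

def alternate[I, T](consumer1: Consumer[I, T], consumer2: Consumer[I, T]): Consumer[I, T] = new Consumer[I, T] {
  def consume(input: Input[I]): ConsumerState[I, T] = consumer1.consume(input) match {
    case done @ Done(_, _) => done
    case Error(_) => Continue(consumer2, input)
    case Continue(next, rem) => Continue(alternate(next, consumer2), rem)
  }
}

// Step 8: the sentinel always answers Error(null), marking the end of a repeat group.
def wrappedRepeatGroup[I, T](consumer: Consumer[I, T]): Consumer[I, T] = alternate(consumer, new Consumer[I, T] {
  def consume(input: Input[I]): ConsumerState[I, T] = Error(null)
})
def repeat[I, T](consumer: Consumer[I, T]): Consumer[I, List[T]] = {
  val wrapped = wrappedRepeatGroup(consumer)
  repeatRecursive(wrapped, wrapped, Nil)
}
def repeatRecursive[I, T](groupStart: Consumer[I, T], step: Consumer[I, T],
                          accumulated: List[T]): Consumer[I, List[T]] = new Consumer[I, List[T]] {
  def consume(input: Input[I]): ConsumerState[I, List[T]] = step.consume(input) match {
    case Error(null) => Done(accumulated.reverse, input) // sentinel reached: group over
    case Error(e) => Error(e)
    case Continue(next, rem) => Continue(repeatRecursive(groupStart, next, accumulated), rem)
    case Done(value, rem) => Continue(repeatRecursive(groupStart, groupStart, value :: accumulated), rem)
  }
}

val readSingleBody = matchLine("Body", ". ")
val readSingleBodyHeader = matchLine("Body Header", "- ")
val readNestedBody = for {
  bodyHeader <- readSingleBodyHeader
  content <- repeat(readSingleBody)
} yield (bodyHeader, content)
// Hypothetical group whose second line is mandatory.
val readPair = for { h <- readSingleBodyHeader; b <- readSingleBody } yield (h, b)
// Right(List(("A", "a"))): the half-read "- B" group is dropped, not reported.
val pairs = repeat(readPair).consumeAll(List("- A", ". a", "- B", "! X"))
```

The pairs value shows a trade-off of the sentinel approach in this sketch: alternate keeps wrapping every later step of a group, so a group that fails part-way (a body header with no body line after it) is also routed to the sentinel. The half-read group is dropped rather than reported, whereas the Step 7 repeat would have returned the error.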


It’s important to note that consumeAll ignores leftover input, so processing may succeed with a lot of input left unread if you don’t check for it. For the purposes of this blog I did not bother checking; I leave checking for leftover input as an exercise for the reader.
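For the exercise, one possible approach is sketched below: a hypothetical consumeAllStrict that only succeeds when the consumer finishes with no input left over. The framework types at the top are assumed minimal stand-ins, not the original code, and the error messages are made up for the example.

```scala
// Minimal stand-ins for the earlier parts' types (assumed shapes, not the originals).
sealed trait Input[+I]
case class Chunk[+I](c: I) extends Input[I]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
sealed trait ConsumerState[I, T]
case class Done[I, T](value: T, remainder: Input[I]) extends ConsumerState[I, T]
case class Continue[I, T](next: Consumer[I, T], remainder: Input[I]) extends ConsumerState[I, T]
case class Error[I, T](e: Throwable) extends ConsumerState[I, T]

trait Consumer[I, T] { self =>
  def consume(input: Input[I]): ConsumerState[I, T]
  // Hypothetical lenient driver: one Chunk per line, then EOF. Leftover input is ignored.
  def consumeAll(lines: List[I]): Either[Throwable, T] = {
    @annotation.tailrec
    def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
      c.consume(in) match {
        case Done(value, _) => Right(value)
        case Error(e) => Left(e)
        case Continue(next, Empty) => rest match {
          case h :: t => loop(next, Chunk(h), t)
          case Nil if in != EOF => loop(next, EOF, Nil)
          case Nil => Left(new Exception("consumer did not finish at EOF"))
        }
        case Continue(next, rem) => loop(next, rem, rest) // re-feed unconsumed input
      }
    loop(this, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
  }
}

def matchLine(name: String, prefix: String): Consumer[String, String] = new Consumer[String, String] {
  def consume(input: Input[String]): ConsumerState[String, String] = input match {
    case Chunk(c) if c.startsWith(prefix) => Done(c.substring(prefix.length), Empty)
    case Chunk(_) => Error(new Exception(s"Not $name line"))
    case Empty => Continue(this, Empty)
    case EOF => Error(new Exception(s"$name expected"))
  }
}

// Hypothetical strict driver: Done only counts if no chunk and no line is left over.
def consumeAllStrict[I, T](consumer: Consumer[I, T], lines: List[I]): Either[Throwable, T] = {
  @annotation.tailrec
  def loop(c: Consumer[I, T], in: Input[I], rest: List[I]): Either[Throwable, T] =
    c.consume(in) match {
      case Done(value, Empty | EOF) if rest.isEmpty => Right(value)
      case Done(_, _) => Left(new Exception("leftover input after the message"))
      case Error(e) => Left(e)
      case Continue(next, Empty) => rest match {
        case h :: t => loop(next, Chunk(h), t)
        case Nil if in != EOF => loop(next, EOF, Nil)
        case Nil => Left(new Exception("consumer did not finish at EOF"))
      }
      case Continue(next, rem) => loop(next, rem, rest)
    }
  loop(consumer, lines.headOption.fold[Input[I]](EOF)(Chunk(_)), lines.drop(1))
}

val readHeader = matchLine("Header", "# ")
```

With this sketch, readHeader.consumeAll accepts a header followed by an extra line, while consumeAllStrict rejects the same input.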

Final Considerations


This is just a brief introduction to Iteratees and the use of combinators. I hope it sheds some light on why these functional approaches can provide clarity. The sample code here can be extended; I have a more complete and fully tested implementation in another project (code and tests).

It is worthwhile to take some of these functional concepts and tear them apart to see what makes them tick.