Examining Hackage: pipes

Posted on June 1, 2015
Tags: haskell

It’s been a while since I did one of these “read a package and write about it” posts. Part of this is that it turns out that most software is awful and writing about code I read just makes me grumpy. However I found something nice to write about! In this post I’d like to close a somewhat embarrassing gap in my knowledge: we’re going to walk through streaming library.

I know that both lists and lazy-IO are kind of.. let’s say fragile but have neglected learning one of these fancy libraries that aim to solve those problems. Today we’ll be looking at one of those libraries, pipes!

Pipes provides one core type Proxy and a few operations on it, like await and yield. We can pair together a pipeline of operations which can send data to their neighbors and request more data from them as they need them. With these coroutine like structures we can nicely implement efficient, streaming computations.

Getting The Code

As always this starts by getting our hands on the code with the

~ $ cabal get pipes
~ $ cd pipes-4.1.5/

Now from here we can query all the available files to see what we’re up against

~/pipes-4.1.5 $ wc -l **/*.hs | sort -nr
  4796 total
  1513 src/Pipes/Tutorial.hs
   854 src/Pipes/Core.hs
   836 src/Pipes/Prelude.hs
   517 src/Pipes.hs
   380 src/Pipes/Lift.hs
   272 tests/Main.hs
   269 src/Pipes/Internal.hs
    85 benchmarks/PreludeBench.hs
    68 benchmarks/LiftBench.hs
     2 Setup.hs

So the first thing I notice is that there’s this great honking module called Pipes.Tutorial which houses a brief introduction to the pipes package. I skimmed this before starting but it doesn’t really seem to explain the implementation details.. If you don’t really know what pipes is, read this tutorial now. After doing so you have exactly my knowledge of pipes!

The next interesting module here is Pipes.Internal. I’ve found that .Internal modules seem to house the fundamental bits of the package so we’ll start there.

Pipes.Internal

This module starts with an emphatic warning

    {-| This is an internal module, meaning that it is unsafe to
        import unless you understand the risks. -}

So this seems like a perfect place to start without really understanding this library :D It exports a few different functions and one type:

    module Pipes.Internal (
        -- * Internal
          Proxy(..)
        , unsafeHoist
        , observe
        , X
        , closed
        ) where

I recognize one of those types: Proxy as the central type behind the whole pipes concept, it is the type of component in the pipe line. Let’s look at how it’s actually implemented

    data Proxy a' a b' b m r
        = Request a' (a  -> Proxy a' a b' b m r )
        | Respond b  (b' -> Proxy a' a b' b m r )
        | M          (m    (Proxy a' a b' b m r))
        | Pure    r

So two of those constructors, M and Pure, look pretty vanilla. The first one let’s us lift an action in the underlying monad m, into Proxy. It’s a little bit weird instead of having M (m r) we instead have M (m (Proxy ...)) however this doesn’t seem like a big deal because we have Pure to promote an r to a Proxy .... r. So we can lift some m r to Proxy a' a b' b m r with M . fmap Pure. It’s still not clear to me why this is a benefit though.

The first two constructors are really cool though, Request and Respond. The first thing that pops into my head is that this looks like a sort of free-monad pattern. Look how we’ve got

  1. The piece of data a user should input to an action (and Request and Respond are definitely actions)
  2. This continuation for a second argument which takes a term of the type returned by the action to another piece of pipe

This would make a lot of sense, free monad transformers nicely give rise to coroutines which are very much in line with pipes. Because of this free monad like shape, I expect that the monad instance will be like free monads and behave “like substitution”. We should chase down the leaves of this Proxy (including under lambdas) and replace each Pure r with f r for >>= and Pure (f a) for fmap.

To check if we’re right, we go down one line

    instance Monad m => Functor (Proxy a' a b' b m) where
        fmap f p0 = go p0 where
            go p = case p of
                Request a' fa  -> Request a' (\a  -> go (fa  a ))
                Respond b  fb' -> Respond b  (\b' -> go (fb' b'))
                M          m   -> M (m >>= \p' -> return (go p'))
                Pure    r      -> Pure (f r)

This looks like what I had in mind, we run down p and in the first 3 branches we recurse. I’ll admit it looks a little intimidating but after staring at it for a bit I realized that the first 3 lines are all just variations on fmap go! Indeed, we can rewrite this to

            go p = case p of
                Request a' fa  -> Request a' (fmap go fa)
                Respond b  fb' -> Respond b  (fmap go fb')
                M          m   -> M (fmap go m)
                Pure    r      -> Pure (f r)

This makes the idea a bit clearer in my mind. Let’s look at the applicative instance next!

    instance Monad m => Applicative (Proxy a' a b' b m) where
        pure      = Pure
        pf <*> px = go pf where
            go p = case p of
                Request a' fa  -> Request a' (\a  -> go (fa  a ))
                Respond b  fb' -> Respond b  (\b' -> go (fb' b'))
                M          m   -> M (m >>= \p' -> return (go p'))
                Pure    f      -> fmap f px
        (*>) = (>>)

First note that pure = Pure which isn’t a stunner just from the naming. In <*> we have the same sort of pattern as in fmap. We race down the “function” side of <*> and whenever we reach a Pure we have a function from a -> b, with that function we call fmap on the structure on the “argument” side. So we’re kind of gluing that px onto that pf by changing each Pure f to fmap f px.

Finally we have the monad instance. Of course the return implementation is the same as for pure but (>>=) = _bind so the implementation of _bind has been chucked out of the instance itself. It turns out there’s a good reason for that: _bind has a bunch of rewrite rules attached to it.

    _bind
        :: Monad m
        => Proxy a' a b' b m r
        -> (r -> Proxy a' a b' b m r')
        -> Proxy a' a b' b m r'
    p0 `_bind` f = go p0 where
        go p = case p of
            Request a' fa  -> Request a' (\a  -> go (fa  a ))
            Respond b  fb' -> Respond b  (\b' -> go (fb' b'))
            M          m   -> M (m >>= \p' -> return (go p'))
            Pure    r      -> f r

Now excitingly the implementation of bind is almost exactly what we had before! Now instead of Pure f -> fmap f px it’s Pure r -> f r so we have something more like substitution than gluing.

Now that Proxy is a monad, we can make it a monad transformer!

    instance MonadTrans (Proxy a' a b' b) where
        lift m = M (m >>= \r -> return (Pure r))

So we need to take an m a and return Proxy a' a b' b m a, we want to use M :: m (Proxy a' a b' b m a) but we have an m a, by fmaping Pure we’re good to go.

From here on out it’s just a series of not so exciting MTL instances so we’ll skip those.. There’s a couple interesting things left though! Before we get to them recall the monad transformer laws

  1. return = lift . return
  2. m >>= f = lift m >>= (lift . f)

In other words, lift should “commute” with the two operations of the monad type class. This isn’t actually true by default with Proxy, for example

return a = Pure a
lift (return a) = M (fmap Pure (return a)) = M (return (Pure a))

To solve this we have observe. This function is supposed to normalize a Proxy so that these laws hold.

    observe :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' b m r
    observe p0 = M (go p0) where
        go p = case p of
            Request a' fa  -> return (Request a' (\a  -> observe (fa  a )))
            Respond b  fb' -> return (Respond b  (\b' -> observe (fb' b')))
            M          m'  -> m' >>= go
            Pure    r      -> return (Pure r)

Note that go takes a Proxy a' a b' b m r and returns m (Proxy a' a b' b m r). By doing this, we can route stick everything in m with return except for M m' which we just unwrap and keep going. This means return (Pure a) = go (Pure a) which is what is required for the monad transformer laws to hold.

Finally, the last thing in this file is X which is used to represent the type for communication that cannot happen. So if we have a pipe at the beginning of the pipeline, it shouldn’t be able to ask for input from another pipe.

    newtype X = X X

    closed :: X -> a
    closed (X x) = closed x

And there are no non-bottom expressions which occupy this type so we’re good. Now that we’ve seen the internal implementation of most of Proxy we can go look at the infrastructure pipes provides around this. Again going by the names, now that we’ve covered the internals it makes sense to move onto Pipes.Core.

Pipes.Core

Pipes.Core seems much closer to the actual user interface of the library, we can see that it exports a bunch of familiar sounding names:

    module Pipes.Core (
          Proxy
        , runEffect
        , respond
        , (/>/)
        , (//>)
        , request
        , (\>\)
        , (>\\)
        , push
        , (>~>)
        , (>>~)
        , pull
        , (>+>)
        , (+>>)
        , reflect
        , X
        , Effect
        , Producer
        , Pipe
        , Consumer
        , Client
        , Server
        , Effect'
        , Producer'
        , Consumer'
        , Client'
        , Server'
        , (\<\)
        , (/</)
        , (<~<)
        , (~<<)
        , (<+<)
        , (<\\)
        , (//<)
        , (<<+)
        , closed
        ) where

Now a few of those we’ve seen before, namely Proxy, X, and closed. Notice that Proxy is exported abstractly here so we can’t write code which violates the monad transformer laws using this module.

The first new function is called runEffect, but it has the type

    Monad m => Effect m r -> m r

Which sounds great! I however have no clue what an effect is so let’s dig around the type exports first. There are a few type synonyms here

    type Effect = Proxy X () () X
    type Producer b = Proxy X () () b
    type Pipe a b = Proxy () a () b
    type Consumer a = Proxy () a () X
    type Client a' a = Proxy a' a () X
    type Server b' b = Proxy X () b' b
    type Effect' m r = forall x' x y' y . Proxy x' x y' y m r
    type Producer' b m r = forall x' x . Proxy x' x () b m r
    type Consumer' a m r = forall y' y . Proxy () a y' y m r
    type Server' b' b m r = forall x' x . Proxy x' x b' b m r
    type Client' a' a m r = forall y' y . Proxy a' a y' y m r

Even though this looks like a lot, about half of these are actually duplicates which just use -XRankNTypes instead of explicitly using X. An Effect as seen above is Proxy X () () X.. I had to double check this but proxy takes 6 type arguments, here they are in order

  1. a' is the type of things that we can send up a Request
  2. a is the type of things which a request will return
  3. b' is what we may be sent to respond to
  4. b is what we may respond with
  5. m is the underlying monad we may use for effects
  6. r is the return value

So an Effect can only request things if it can produce an X, and it will get back a () from its requests, and it can only respond with an X and will get back a () after responding. Since we can never produce an X an Effect can never request or respond.

Similarly, a Producer can respond to things with bs, but it will only ever get back a () after a response and it can never request something. A Consumer is the dual, never responding but can request, it can only hand the code responding a () though.

Also in there are Clients and Servers which seem to be like a Consumer and a Producer but that can actually send meaningful messages with a request and receive something interesting with a respond instead of just ().

Okay, with these type synonyms in mind let’s go look at some code! Since an Effect can’t request or respond, it’s really equivalent to just some monadic action.

    runEffect :: Monad m => Effect m r -> m r
    runEffect = go
      where
        go p = case p of
            Request v _ -> closed v
            Respond v _ -> closed v
            M       m   -> m >>= go
            Pure    r   -> return r

This let’s us write runEffect which just uses the absurdity of producing a v :: X in order to turn a Proxy into an m.

runEffect is also the first function we’ve seen to actually escape the Proxy monad as well! It let’s us convert a self-contained pipeline into just an effect which should mean it comes up basically everywhere, just like runStateT.

Since the Proxy monad is abstract, we need some functions to actually be able to request things. Thus we have respond

    respond :: Monad m => a -> Proxy x' x a' a m a'
    respond a = Respond a Pure

This is actually pretty trivial, we have a constructor after all whose job it is to Respond to things so we just use that with the a we have as a response. Since we have no interesting continuation yet, but we need something of type a' -> Proxy x' x a' a m a' we just use Pure. This should be very familiar to users of free monads (remember that Pure = return)!

Next is something interesting, we’ve seen a lot of ways of manipulating a pipe, but never actually a way of combining two pipes so that they interact, our next function does that.

    (/>/)
        :: Monad m
        => (a -> Proxy x' x b' b m a')
        -> (b -> Proxy x' x c' c m b')
        -> (a -> Proxy x' x c' c m a')
    (fa />/ fb) a = fa a //> fb

Here we have two arguments, both functions to pipelines and we return a pipeline as output. Notice here that the first Proxy is something which is going to respond with things of type b and expect something of type b' in return and our second function is going to map bs to a Proxy which returns a b'. This means we can replace each Respond in the first with a call to the second function and pipe the output into our continuation for that Respond. Indeed this matches up with the return type so I anticipate that it what shall happen. However, this function shells out to another right below it so we’ll have to look at it to confirm.

    (//>)
        :: Monad m
        =>       Proxy x' x b' b m a'
        -> (b -> Proxy x' x c' c m b')
        ->       Proxy x' x c' c m a'
    p0 //> fb = go p0
      where
        go p = case p of
            Request x' fx  -> Request x' (\x -> go (fx x))
            Respond b  fb' -> fb b >>= \b' -> go (fb' b')
            M          m   -> M (m >>= \p' -> return (go p'))
            Pure       a   -> Pure a

The interesting line here is Respond b fb' -> ... which does exactly what I thought it ought to (I feel clever). In that line we run the function we have in the second argument with the data the first argument was Responding with. We sort of “intercept” a message intended for downstream and just handle it right there. Since we do this for all things Responding with bs we now only respond with cs hence the change in type. It doesn’t effect the upstream type, but we can now take something producing values and transform them to instead run some other computation (perhaps producing something else).

In a limited case we can do something like

    intercept :: Monad m
              => (b -> c)
              -> Proxy a' a b' b m r
              -> Proxy a' a b' c m r
    intercept f p = p //> respond . f

Cool! Now up next seems to be the dual of what we’ve just looked at.

    request :: Monad m => a' -> Proxy a' a y' y m a
    request a' = Request a' Pure

This is just what we had with respond but using Request instead. Similarly we ahve a counterpart for />/. It again shells out to a similar, pointful, function >\\

    (\>\)
        :: Monad m
        => (b' -> Proxy a' a y' y m b)
        -> (c' -> Proxy b' b y' y m c)
        -> (c' -> Proxy a' a y' y m c)
    (fb' \>\ fc') c' = fb' >\\ fc' c'

    (>\\)
        :: Monad m
        => (b' -> Proxy a' a y' y m b)
        ->        Proxy b' b y' y m c
        ->        Proxy a' a y' y m c
    fb' >\\ p0 = go p0
      where
        go p = case p of
            Request b' fb  -> fb' b' >>= \b -> go (fb b)
            Respond x  fx' -> Respond x (\x' -> go (fx' x'))
            M          m   -> M (m >>= \p' -> return (go p'))
            Pure       a   -> Pure a

I’d expect that this function does sort of what the other did before. It’ll take Requests and “answer” them inline by replacing it with a call to the other function. In fact, when you think about what the hell is the difference between a request and a response? They’re completely symmetric! They both transmit information sending one type in one direction and one type in another. So we should have exactly the same code that just happens to use Request instead of Respond. which is indeed what we have.

The only real difference here is in the argument order which hints at the fact that we’re going to break symmetry sooner or later, it just hasn’t happened yet.

Next up is

    push :: Monad m => a -> Proxy a' a a' a m r
    push = go
      where
        go a = Respond a (\a' -> Request a' go)

push takes a seed a and chucks it down the pipeline. Once it gets a response, it throws it up the pipeline with Request and when it gets a response (something of type a) it starts the whole process over again. Now the process starts by sending values down, there’s no reason why we can’t do the reverse and start by asking for a value

    pull :: Monad m => Proxy a' a a' a m r
    pull = go
      where
        go a' = Request a' (\a -> Respond a go)

which conveniently is right near by. Now push and pull each give rise to a form of composition which takes two Proxys and glues them together. The first is

    (>~>)
        :: Monad m
        => (_a -> Proxy a' a b' b m r)
        -> ( b -> Proxy b' b c' c m r)
        -> (_a -> Proxy a' a c' c m r)

This takes two Proxys which can communicate with each other and gives back a Proxy which has internalized this dialogue. This shells out to the pointful version, >>~

    (>>~)
        :: Monad m
        =>       Proxy a' a b' b m r
        -> (b -> Proxy b' b c' c m r)
        ->       Proxy a' a c' c m r
    p >>~ fb = case p of
        Request a' fa  -> Request a' (\a -> fa a >>~ fb)
        Respond b  fb' -> fb' +>> fb b
        M          m   -> M (m >>= \p' -> return (p' >>~ fb))
        Pure       r   -> Pure r

For this code we walk down the tree and recurse in all cases except where we have a Response. This should send some information to that function we got as an argument and then use that response to continue, so we want some way of taking a Proxy b' b c' c m r and a b' -> Proxy a' a b' b m r and giving back a Proxy a' a c' c m r. This looks like the exact dual to >>~ and indeed is the equivalent in the pull version.

    (+>>)
        :: Monad m
        => (b' -> Proxy a' a b' b m r)
        ->        Proxy b' b c' c m r
        ->        Proxy a' a c' c m r
    fb' +>> p = case p of
        Request b' fb  -> fb' b' >>~ fb
        Respond c  fc' -> Respond c (\c' -> fb' +>> fc' c')
        M          m   -> M (m >>= \p' -> return (fb' +>> p'))
        Pure       r   -> Pure r

This does the exact opposite of >>~. It walks around recursing until we get to a Request, this should transfer control up to that function b' -> Proxy ... and it does by calling >>~. So these two operators >>+ and >>~ work together to join up to Proxy functions by using one to answer the other’s Request and Responds. The symmetry breaking here is who should we inspect “first” so to speak. If we start with the upstream one than the second one is only run when a value is push-ed down to it and if we start with the former we only run the upstream version when we pull something from it. Nifty.

One thing to note, what happens when one of these Proxys give up and return? This potential situation is reflected in the fact that both of these Proxys must return an r. Therefore, whenever one of these returns and we’re currently running it (the upstream for >>~, downstream for >>+) we can just return the value and be done with the whole thing. In this sense composing a Proxy has this short circuiting property, at any point in the pipeline you can just give up and return something!

Remember before how I was ranting about how Request and Respond were really the same damn thing, it turns out I’m not the only one who thought that

    reflect :: Monad m => Proxy a' a b' b m r -> Proxy b b' a a' m r
    reflect = go
      where
        go p = case p of
            Request a' fa  -> Respond a' (\a  -> go (fa  a ))
            Respond b  fb' -> Request b  (\b' -> go (fb' b'))
            M          m   -> M (m >>= \p' -> return (go p'))
            Pure    r      -> Pure r

Looking at the type here is really telling, all we do to switch the upstream and downstream ends is swap the constructors Request and Respond! That actually wraps up the core of pipes, the rest is just a bunch of synonyms with the arguments flipped!

Now that we’ve finished up Pipes.Core it’s not clear where to go so I decided to go look at the top level Pipes module since between the .Internal and .Core modules we should have covered a lot of it. It turns out the top level only imports those two modules so we can now go through that!

Pipes

Really the top level package Pipes just re exports some stuff and defines some thin layers of the rest

    module Pipes (
          Proxy
        , X
        , Effect
        , Effect'
        , runEffect
        , Producer
        , Producer'
        , yield
        , for
        , (~>)
        , (<~)
        , Consumer
        , Consumer'
        , await
        , (>~)
        , (~<)
        , Pipe
        , cat
        , (>->)
        , (<-<)
        , ListT(..)
        , runListT
        , Enumerable(..)
        , next
        , each
        , every
        , discard
        , module Control.Monad
        , module Control.Monad.IO.Class
        , module Control.Monad.Trans.Class
        , module Control.Monad.Morph
        , module Data.Foldable
        )

Now what haven’t we seen, the first thing is this yield construct which turns out to be a snazzier name for respond with a nicer type.

    yield :: Monad m => a -> Producer' a m ()
    yield = respond

Similarly, for is just a synonym (//>) (first joiner we went through) and ~> is the point free version. On the other end we have stuff overlaying request and friends but they’re not quite symmetric

    await :: Monad m => Consumer' a m a
    await = request ()

    (>~)
        :: Monad m
        => Proxy a' a y' y m b
        -> Proxy () b y' y m c
        -> Proxy a' a y' y m c
    p1 >~ p2 = (\() -> p1) >\\ p2

So we need to cope with the fact request can actually transfer interesting data down as well as up, in the basic case though we just assume that we’re dealing with ()s. Also note that >~ is biased to the downstream Proxy, it starts by running it and whenever we actually request something (by sending up a ()) we run p1. This function lets us compose Proxys, not functions to Proxys so that’s one nice effect.

Finally, we see our first example of a pipe

    cat :: Monad m => Pipe a a m r
    cat = pull ()

cat works by requesting something upstream immediately and passing it downstream. Nothing interesting except that it combines great with other Proxys. Say for example we have a random number generator, we can easily create a Proxy producing random numbers with

    randoms = lift getRandomNumber >~ cat

we use >~ to replace each request in cat with a call to getRandomNumber which will be immediately pushed downstream. Similarly, we can use cat to push everything into some computation. If we want to debug a pipe by just printing everything we could say

    printAll = for cat (lift . print)

So cat is a nice way of lifting something to work across Proxys of values if nothing else.

Next is the common case of composing to Proxys,

    (>->)
        :: Monad m
        => Proxy a' a () b m r
        -> Proxy () b c' c m r
        -> Proxy a' a c' c m r
    p1 >-> p2 = (\() -> p1) +>> p2

>-> makes it easy to join up to Proxys that don’t send any interesting data “up” with requests. >-> starts by running p2 using +>> and whenever p2 requests something it goes and runs p1 for a while. This function lets us connect a Pipe to Pipe or Producer to Consumer for example.

Finally, we wrap up this module with the definition of ListT inside it. Using Producer we can define a nonbroken version of ListT

    newtype ListT m a = Select { enumerate :: Producer a m () }

    instance (Monad m) => Functor (ListT m) where
        fmap f p = Select (for (enumerate p) (\a -> yield (f a)))

    instance (Monad m) => Applicative (ListT m) where
        pure a = Select (yield a)
        mf <*> mx = Select (
            for (enumerate mf) (\f ->
            for (enumerate mx) (\x ->
            yield (f x) ) ) )

    instance (Monad m) => Monad (ListT m) where
        return a = Select (yield a)
        m >>= f  = Select (for (enumerate m) (\a -> enumerate (f a)))
        fail _   = mzero

What’s kinda nifty here is we just use a Producer returning a () to represent our list. Here we can use for to access every yield x which corresponds to our “list” having an entry x! From there this is really just the standard set of instances for a list! In particular >>= is mapConcat for producers. That about wraps up this module and I’ll end the blog post with it.

Wrap Up

I didn’t actually go through all of pipes here, just the “core operations” which everything else is built on top of. In particular, I urge you to go read how Pipes.Prelude is implemented. Just like implementing the Haskell prelude is a good exercise the same is true of pipes.

It turned out that pipes isn’t all that awful on the inside, it’s a library built around a specific free-monad like structure which a couple different methods of joining two computations together. In particular there were a few different notions of composition which really define pipes

  1. Sequential composition: running one machine and then another is given with >>=
  2. Take one computation and replace all its Responds with another function using //> or for in non-infix speak
  3. Take one computation and replace all its Requests with another function using >\\
  4. Take two computations and compose them together so a request from one is satisfied by running the other until a respond using >>+ and >>~

Hope you learned as much as I did, cheers.

comments powered by Disqus