# Examining Hackage: pipes

It’s been a while since I did one of these “read a package and write about it” posts. Part of the reason is that most software turns out to be awful, and writing about code I read just makes me grumpy. However, I found something nice to write about! In this post I’d like to close a somewhat embarrassing gap in my knowledge: we’re going to walk through a streaming library.

I know that both lists and lazy I/O are kind of... let’s say fragile, but I have neglected learning one of the fancy libraries that aim to solve those problems. Today we’ll be looking at one of those libraries: pipes!

Pipes provides one core type, `Proxy`, and a few operations on it, like `await` and `yield`. We can chain together a pipeline of components, each of which can send data to its neighbors and request more data from them as needed. With these coroutine-like structures we can nicely implement efficient, streaming computations.
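
To make that concrete, here is a minimal sketch of such a pipeline written against the public API. It assumes the `pipes` package is installed; `countTo` and `doubler` are names made up for this illustration, not part of the library.

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical producer: yields the numbers 1..n downstream, then stops.
countTo :: Monad m => Int -> Producer Int m ()
countTo n = go 1
  where
    go i
        | i > n     = return ()
        | otherwise = yield i >> go (i + 1)

-- Hypothetical pipe: awaits a value from upstream and yields its double.
doubler :: Monad m => Pipe Int Int m r
doubler = forever $ do
    x <- await
    yield (x * 2)

main :: IO ()
main = runEffect (countTo 3 >-> doubler >-> P.print)
-- prints 2, 4 and 6 on separate lines
```

Each stage only runs when its neighbor asks for or hands over a value, which is the coroutine-like behavior described above.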

## Getting The Code

As always, this starts by getting our hands on the code:

```
~ $ cabal get pipes
~ $ cd pipes-4.1.5/
```

Now from here we can query all the available files to see what we’re up against

```
~/pipes-4.1.5 $ wc -l **/*.hs | sort -nr
4796 total
1513 src/Pipes/Tutorial.hs
854 src/Pipes/Core.hs
836 src/Pipes/Prelude.hs
517 src/Pipes.hs
380 src/Pipes/Lift.hs
272 tests/Main.hs
269 src/Pipes/Internal.hs
85 benchmarks/PreludeBench.hs
68 benchmarks/LiftBench.hs
2 Setup.hs
```

The first thing I notice is that there’s this great honking module called `Pipes.Tutorial`, which houses a brief introduction to the pipes package. I skimmed it before starting, but it doesn’t really explain the implementation details. If you don’t know what pipes is, read that tutorial now. After doing so you’ll have exactly my knowledge of pipes!

The next interesting module here is `Pipes.Internal`. I’ve found that `.Internal` modules tend to house the fundamental bits of a package, so we’ll start there.

## Pipes.Internal

This module starts with an emphatic warning

```
{-| This is an internal module, meaning that it is unsafe to
import unless you understand the risks. -}
```

So this seems like a perfect place to start without really understanding this library :D It exports a few different functions and one type:

```
module Pipes.Internal (
-- * Internal
Proxy(..)
, unsafeHoist
, observe
, X
, closed
) where
```

I recognize one of those types: `Proxy` is the central type behind the whole pipes concept. It is the type of a component in the pipeline. Let’s look at how it’s actually implemented:

```
data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r )
    | Respond b  (b' -> Proxy a' a b' b m r )
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r
```

Two of those constructors, `M` and `Pure`, look pretty vanilla. The first one lets us lift an action in the underlying monad `m` into `Proxy`. It’s a little bit weird that instead of having `M (m r)` we have `M (m (Proxy ...))`, but this doesn’t seem like a big deal because we have `Pure` to promote an `r` to a `Proxy ... r`. So we can lift some `m r` to `Proxy a' a b' b m r` with `M . fmap Pure`. It’s still not clear to me why this is a benefit though.
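
One way to read the `M (m (Proxy ...))` shape: it is the usual free-monad layer with `m` as the functor, so the result of an effect gets to decide what the rest of the `Proxy` looks like. Here is a rough sketch using a local copy of the type and only `base`. `liftAction`, `countdown` and `responses` are hypothetical helpers written for this post, not part of pipes.

```haskell
import Data.Functor.Identity (Identity (..))

-- A local copy of the type, so this sketch does not need the pipes package.
data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r)
    | Respond b  (b' -> Proxy a' a b' b m r)
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r

-- Lift a plain action: run it, then wrap its result as a finished Proxy.
liftAction :: Functor m => m r -> Proxy a' a b' b m r
liftAction = M . fmap Pure

-- An effect whose result chooses the continuation: keep responding while
-- the number obtained from the action is positive, otherwise stop.
countdown :: Monad m => m Int -> Proxy a' a () Int m ()
countdown getN = M (fmap step getN)
  where
    step n
        | n <= 0    = Pure ()
        | otherwise = Respond n (\() -> countdown (return (n - 1)))

-- Collect every response when the base monad is Identity.
responses :: Proxy a' a () b Identity r -> [b]
responses p = case p of
    Request _ _ -> []                      -- nobody upstream in this sketch
    Respond b k -> b : responses (k ())
    M m         -> responses (runIdentity m)
    Pure _      -> []

main :: IO ()
main = print (responses (countdown (Identity 3)))  -- [3,2,1]
```

With a hypothetical `M (m r)` constructor the effect could only ever be the last step; wrapping a whole `Proxy` lets an effect sit anywhere in the middle of a computation.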

The first two constructors, `Request` and `Respond`, are really cool though. The first thing that pops into my head is that this looks like a sort of free-monad pattern. Look how we’ve got

- The piece of data a user should input to an action (and `Request` and `Respond` are definitely actions)
- A continuation as the second argument, which takes a term of the type returned by the action to another piece of pipe

This would make a lot of sense: free monad transformers nicely give rise to coroutines, which are very much in line with pipes. Because of this free-monad-like shape, I expect that the monad instance will be like that of free monads and behave “like substitution”. We should chase down the leaves of this `Proxy` (including under lambdas) and replace each `Pure r` with `f r` for `>>=` and with `Pure (f r)` for `fmap`.

To check if we’re right, we go down one line

```
instance Monad m => Functor (Proxy a' a b' b m) where
    fmap f p0 = go p0 where
        go p = case p of
            Request a' fa  -> Request a' (\a  -> go (fa  a ))
            Respond b  fb' -> Respond b  (\b' -> go (fb' b'))
            M          m   -> M (m >>= \p' -> return (go p'))
            Pure    r      -> Pure (f r)
```

This looks like what I had in mind: we run down `p`, and in the first 3 branches we recurse. I’ll admit it looks a little intimidating, but after staring at it for a bit I realized that the first 3 branches are all just variations on `fmap go`! Indeed, we can rewrite this to

```
go p = case p of
    Request a' fa  -> Request a' (fmap go fa)
    Respond b  fb' -> Respond b  (fmap go fb')
    M          m   -> M (fmap go m)
    Pure    r      -> Pure (f r)
```
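
As a sanity check on that rewrite, here is a small sketch comparing it with the instance GHC derives via `DeriveFunctor`. Note that `fmap go m` needs `Functor m`, while the instance in the package only assumes `Monad m`, which is presumably why the original spells it out with `>>=` and `return`. The type is a local copy, and `fmapProxy`, `summary` and `example` are names invented for this check.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Data.Functor.Identity (Identity (..))

-- A local copy of the type, this time with a derived Functor instance.
data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r)
    | Respond b  (b' -> Proxy a' a b' b m r)
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r
    deriving Functor

-- The hand-written version, using fmap in the three recursive branches.
fmapProxy :: Functor m => (r -> s) -> Proxy a' a b' b m r -> Proxy a' a b' b m s
fmapProxy f = go
  where
    go p = case p of
        Request a' fa  -> Request a' (fmap go fa)
        Respond b  fb' -> Respond b  (fmap go fb')
        M          m   -> M (fmap go m)
        Pure    r      -> Pure (f r)

-- Walk a Proxy over Identity: answer every respond with (), stop at the
-- first request, and report the responses plus the final result (if any).
summary :: Proxy a' a () b Identity r -> ([b], Maybe r)
summary p = case p of
    Request _ _ -> ([], Nothing)
    Respond b k -> let (bs, r) = summary (k ()) in (b : bs, r)
    M m         -> summary (runIdentity m)
    Pure r      -> ([], Just r)

example :: Proxy () () () Char Identity Int
example = Respond 'x' (\() -> M (Identity (Pure 1)))

main :: IO ()
main = do
    print (summary (fmap (+ 1) example))       -- ("x",Just 2)
    print (summary (fmapProxy (+ 1) example))  -- ("x",Just 2)
```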

This makes the idea a bit clearer in my mind. Let’s look at the applicative instance next!

```
instance Monad m => Applicative (Proxy a' a b' b m) where
    pure      = Pure
    pf <*> px = go pf where
        go p = case p of
            Request a' fa  -> Request a' (\a  -> go (fa  a ))
            Respond b  fb' -> Respond b  (\b' -> go (fb' b'))
            M          m   -> M (m >>= \p' -> return (go p'))
            Pure    f      -> fmap f px
    (*>) = (>>)
```

First note that `pure = Pure`, which isn’t a stunner given the naming. In `<*>` we have the same sort of pattern as in `fmap`. We race down the “function” side of `<*>`, and whenever we reach a `Pure` we have a function of type `a -> b`. With that function we call `fmap` on the structure on the “argument” side. So we’re kind of gluing `px` onto `pf` by changing each `Pure f` to `fmap f px`.

Finally we have the monad instance. Of course the `return` implementation is the same as for `pure`, but `(>>=) = _bind`, so the implementation of `_bind` has been chucked out of the instance itself. It turns out there’s a good reason for that: `_bind` has a bunch of rewrite rules attached to it.

```
_bind
    :: Monad m
    => Proxy a' a b' b m r
    -> (r -> Proxy a' a b' b m r')
    -> Proxy a' a b' b m r'
p0 `_bind` f = go p0 where
    go p = case p of
        Request a' fa  -> Request a' (\a  -> go (fa  a ))
        Respond b  fb' -> Respond b  (\b' -> go (fb' b'))
        M          m   -> M (m >>= \p' -> return (go p'))
        Pure    r      -> f r
```

Excitingly, the implementation of `_bind` is almost exactly what we had before! Instead of `Pure f -> fmap f px` it’s `Pure r -> f r`, so we have something more like substitution than gluing.

Now that `Proxy` is a monad, we can make it a monad transformer!

```
instance MonadTrans (Proxy a' a b' b) where
    lift m = M (m >>= \r -> return (Pure r))
```

So we need to take an `m a` and return a `Proxy a' a b' b m a`. We want to use `M`, which expects an `m (Proxy a' a b' b m a)`, but we only have an `m a`. By `fmap`ing `Pure` over it we’re good to go.

From here on out it’s just a series of not-so-exciting MTL instances, so we’ll skip those. There are a couple of interesting things left though! Before we get to them, recall the monad transformer laws:

`lift . return = return`

`lift (m >>= f) = lift m >>= (lift . f)`

In other words, `lift` should “commute” with the two operations of the monad type class. This isn’t actually true by default with `Proxy`, for example

```
return a = Pure a
lift (return a) = M (fmap Pure (return a)) = M (return (Pure a))
```

To solve this we have `observe`. This function is supposed to normalize a `Proxy` so that these laws hold.

```
observe :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' b m r
observe p0 = M (go p0) where
    go p = case p of
        Request a' fa  -> return (Request a' (\a  -> observe (fa  a )))
        Respond b  fb' -> return (Respond b  (\b' -> observe (fb' b')))
        M          m'  -> m' >>= go
        Pure    r      -> return (Pure r)
```

Note that `go` takes a `Proxy a' a b' b m r` and returns an `m (Proxy a' a b' b m r)`. By doing this, we can stick everything in `m` with `return`, except for `M m'`, which we just unwrap and keep going. This means `go (Pure a) = return (Pure a)`, so `observe (Pure a)` and `observe (M (return (Pure a)))` both come out as `M (return (Pure a))`, which is what is required for the monad transformer laws to hold.
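
Here is a small sketch of that normalization at work, using a local copy of the type with `Identity` as the base monad. `shape`, `viaReturn` and `viaLift` are hypothetical names; `shape` just prints the spine of constructors so the two sides of the law can be compared.

```haskell
import Data.Functor.Identity (Identity (..))

-- A local copy of the type, so this sketch only needs base.
data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r)
    | Respond b  (b' -> Proxy a' a b' b m r)
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r

-- observe, as quoted above.
observe :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' b m r
observe p0 = M (go p0)
  where
    go p = case p of
        Request a' fa  -> return (Request a' (\a  -> observe (fa  a )))
        Respond b  fb' -> return (Respond b  (\b' -> observe (fb' b')))
        M          m'  -> m' >>= go
        Pure    r      -> return (Pure r)

-- Render the outermost constructors of a Proxy over Identity.
shape :: Proxy a' a b' b Identity r -> String
shape p = case p of
    Request _ _ -> "Request"
    Respond _ _ -> "Respond"
    M m         -> "M (" ++ shape (runIdentity m) ++ ")"
    Pure _      -> "Pure"

-- The two sides of the first law, written out by hand.
viaReturn, viaLift :: Proxy () () () () Identity Char
viaReturn = Pure 'x'               -- return 'x'
viaLift   = M (return (Pure 'x'))  -- lift (return 'x')

main :: IO ()
main = do
    putStrLn (shape viaReturn)            -- Pure
    putStrLn (shape viaLift)              -- M (Pure)
    putStrLn (shape (observe viaReturn))  -- M (Pure)
    putStrLn (shape (observe viaLift))    -- M (Pure)
```

The raw values differ, but after `observe` they have the same shape, so code that only sees observed proxies cannot tell them apart.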

Finally, the last thing in this file is `X`, which is used to represent the type of communication that cannot happen. For example, a pipe at the beginning of the pipeline shouldn’t be able to ask for input from another pipe.

```
newtype X = X X
closed :: X -> a
closed (X x) = closed x
```
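
As a rough illustration of how `closed` gets used, here is a tiny standalone sketch. It plays the same role that `Void` and `absurd` from `Data.Void` play in `base`. `onlyRight` is a made-up helper for this example.

```haskell
-- Local copies of the definitions quoted above.
newtype X = X X

closed :: X -> a
closed (X x) = closed x

-- A value that claims it might hold an X can be collapsed safely:
-- the Left branch can never be reached by a total value.
onlyRight :: Either X a -> a
onlyRight (Left x)  = closed x
onlyRight (Right a) = a

main :: IO ()
main = print (onlyRight (Right 42 :: Either X Int))  -- 42
```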

There are no non-bottom expressions which occupy this type, so we’re good. Now that we’ve seen the internal implementation of most of `Proxy`, we can go look at the infrastructure pipes provides around it. Going by the names again, it makes sense to move on to `Pipes.Core`.

## Pipes.Core

`Pipes.Core` seems much closer to the actual user interface of the library. We can see that it exports a bunch of familiar-sounding names:

```
module Pipes.Core (
Proxy
, runEffect
, respond
, (/>/)
, (//>)
, request
, (\>\)
, (>\\)
, push
, (>~>)
, (>>~)
, pull
, (>+>)
, (+>>)
, reflect
, X
, Effect
, Producer
, Pipe
, Consumer
, Client
, Server
, Effect'
, Producer'
, Consumer'
, Client'
, Server'
, (\<\)
, (/</)
, (<~<)
, (~<<)
, (<+<)
, (<\\)
, (//<)
, (<<+)
, closed
) where
```

We’ve seen a few of those before, namely `Proxy`, `X`, and `closed`. Notice that `Proxy` is exported abstractly here, so we can’t write code which violates the monad transformer laws using this module.

The first new function is called `runEffect`, and it has the type

`Monad m => Effect m r -> m r`

which sounds great! I, however, have no clue what an effect is, so let’s dig around the type exports first. There are a few type synonyms here:

```
type Effect = Proxy X () () X
type Producer b = Proxy X () () b
type Pipe a b = Proxy () a () b
type Consumer a = Proxy () a () X
type Client a' a = Proxy a' a () X
type Server b' b = Proxy X () b' b
type Effect' m r = forall x' x y' y . Proxy x' x y' y m r
type Producer' b m r = forall x' x . Proxy x' x () b m r
type Consumer' a m r = forall y' y . Proxy () a y' y m r
type Server' b' b m r = forall x' x . Proxy x' x b' b m r
type Client' a' a m r = forall y' y . Proxy a' a y' y m r
```

Even though this looks like a lot, about half of these are duplicates which use `-XRankNTypes` instead of explicitly using `X`. An `Effect`, as seen above, is `Proxy X () () X`. I had to double check this, but `Proxy` takes 6 type arguments. Here they are in order:

- `a'` is the type of things that we can send up with a `Request`
- `a` is the type of things which a request will return
- `b'` is what we may be *sent* to respond to
- `b` is what we may respond with
- `m` is the underlying monad we may use for effects
- `r` is the return value

So an `Effect` can only request things if it can produce an `X`, and it will get back a `()` from its requests; it can only respond with an `X`, and it will get back a `()` after responding. Since we can never produce an `X`, an `Effect` can never request or respond.

Similarly, a `Producer` can `respond` to things with `b`s, but it will only ever get back a `()` after a response, and it can never `request` something. A `Consumer` is the dual: it never responds but it can `request`, although it can only hand the code answering the request a `()`.

Also in there are `Client`s and `Server`s, which seem to be like a `Consumer` and a `Producer` respectively, except that they can send meaningful messages with a `request` and receive something interesting after a `respond` instead of just `()`.

Okay, with these type synonyms in mind let’s go look at some code! Since an `Effect` can’t request or respond, it’s really equivalent to just some monadic action.

```
runEffect :: Monad m => Effect m r -> m r
runEffect = go
  where
    go p = case p of
        Request v _ -> closed v
        Respond v _ -> closed v
        M       m   -> m >>= go
        Pure    r   -> return r
```

This lets us write `runEffect`, which just uses the absurdity of producing a `v :: X` in order to turn a `Proxy` into an `m`.

`runEffect` is also the first function we’ve seen that actually escapes the `Proxy` monad! It lets us convert a self-contained pipeline into just an effect, which should mean it comes up basically everywhere, just like `runStateT`.

Since the `Proxy` type is abstract, we need some functions to actually be able to request and respond to things. First we have `respond`:

```
respond :: Monad m => a -> Proxy x' x a' a m a'
respond a = Respond a Pure
```

This is actually pretty trivial: we have a constructor whose job it is to `Respond` to things, so we just use that with the `a` we have as a response. Since we have no interesting continuation yet, but we need something of type `a' -> Proxy x' x a' a m a'`, we just use `Pure`. This should be very familiar to users of free monads (remember that `Pure` = `return`)!

Next is something interesting. We’ve seen a lot of ways of manipulating a pipe, but never actually a way of combining two pipes so that they interact. Our next function does that.

```
(/>/)
    :: Monad m
    => (a -> Proxy x' x b' b m a')
    -> (b -> Proxy x' x c' c m b')
    -> (a -> Proxy x' x c' c m a')
(fa />/ fb) a = fa a //> fb
```

Here we have two arguments, both functions to pipelines, and we return a function to a pipeline as output. Notice that the first `Proxy` is going to `respond` with things of type `b` and expect something of type `b'` in return, and our second function maps `b`s to a `Proxy` which returns a `b'`. This means we can replace each `Respond` in the first with a call to the second function and pipe the output into our continuation for that `Respond`. This matches up with the return type, so I anticipate that is what will happen. However, this function shells out to another right below it, so we’ll have to look at that to confirm.

```
(//>)
    :: Monad m
    => Proxy x' x b' b m a'
    -> (b -> Proxy x' x c' c m b')
    -> Proxy x' x c' c m a'
p0 //> fb = go p0
  where
    go p = case p of
        Request x' fx  -> Request x' (\x -> go (fx x))
        Respond b  fb' -> fb b >>= \b' -> go (fb' b')
        M          m   -> M (m >>= \p' -> return (go p'))
        Pure       a   -> Pure a
```

The interesting line here is `Respond b fb' -> ...`, which does exactly what I thought it ought to (I feel clever). In that line we run the function we have in the second argument with the data the first argument was `Respond`ing with. We sort of “intercept” a message intended for downstream and just handle it right there. Since we do this for everything `Respond`ing with `b`s, we now only respond with `c`s, hence the change in type. It doesn’t affect the upstream type, but we can now take something producing values and transform it to instead run some other computation (perhaps producing something else).

In a limited case we can do something like

```
intercept :: Monad m
          => (b -> c)
          -> Proxy a' a b' b m r
          -> Proxy a' a b' c m r
intercept f p = p //> respond . f
```
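
To watch this run, here is a small sketch against the real library (assuming `pipes` is installed). `intercept` is the function from above; `duplicate` is a hypothetical extra showing the general case, where each response is replaced by a whole computation rather than a single value.

```haskell
import Pipes
import Pipes.Core ((//>), respond)
import qualified Pipes.Prelude as P

-- The limited case: map a pure function over every response.
intercept :: Monad m => (b -> c) -> Proxy a' a b' b m r -> Proxy a' a b' c m r
intercept f p = p //> respond . f

-- The general case: every response becomes a computation that responds twice.
duplicate :: Monad m => Producer a m r -> Producer a m r
duplicate p = for p (\x -> yield x >> yield x)

main :: IO ()
main = do
    print (P.toList (intercept (* 10) (each [1, 2, 3 :: Int])))  -- [10,20,30]
    print (P.toList (duplicate (each "ab")))                      -- "aabb"
```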

Cool! Now up next seems to be the dual of what we’ve just looked at.

```
request :: Monad m => a' -> Proxy a' a y' y m a
request a' = Request a' Pure
```

This is just what we had with `respond` but using `Request` instead. Similarly, we have a counterpart for `/>/`. It again shells out to a similar, pointful function, `>\\`.

```
(\>\)
    :: Monad m
    => (b' -> Proxy a' a y' y m b)
    -> (c' -> Proxy b' b y' y m c)
    -> (c' -> Proxy a' a y' y m c)
(fb' \>\ fc') c' = fb' >\\ fc' c'

(>\\)
    :: Monad m
    => (b' -> Proxy a' a y' y m b)
    -> Proxy b' b y' y m c
    -> Proxy a' a y' y m c
fb' >\\ p0 = go p0
  where
    go p = case p of
        Request b' fb  -> fb' b' >>= \b -> go (fb b)
        Respond x  fx' -> Respond x (\x' -> go (fx' x'))
        M          m   -> M (m >>= \p' -> return (go p'))
        Pure       a   -> Pure a
```

I’d expect this function to do sort of what the other did before. It’ll take `Request`s and “answer” them inline by replacing each with a call to the other function. In fact, when you think about it, what the hell is the difference between a request and a response? They’re completely symmetric! They both transmit information, sending one type in one direction and one type in the other. So we should have exactly the same code that just happens to use `Request` instead of `Respond`, which is indeed what we have.

The only real difference here is in the argument order which hints at the fact that we’re going to break symmetry sooner or later, it just hasn’t happened yet.
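
A quick sketch of request substitution in action, again assuming `pipes` is installed. `sumTwo` and `answered` are hypothetical names: the first makes two requests, and the second answers each of them inline with a constant.

```haskell
import Pipes
import Pipes.Core ((>\\))
import qualified Pipes.Prelude as P

-- Makes two requests upstream and responds once with their sum.
sumTwo :: Monad m => Pipe Int Int m ()
sumTwo = do
    a <- await
    b <- await
    yield (a + b)

-- Replace every request in sumTwo with a computation returning 21.
-- With no requests left, the result is a plain Producer.
answered :: Monad m => Producer Int m ()
answered = (\() -> return 21) >\\ sumTwo

main :: IO ()
main = print (P.toList answered)  -- [42]
```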

Next up is

```
push :: Monad m => a -> Proxy a' a a' a m r
push = go
  where
    go a = Respond a (\a' -> Request a' go)
```

`push` takes a seed `a` and chucks it down the pipeline. Once it gets a reply from downstream (something of type `a'`), it throws that up the pipeline with `Request`, and when it gets an answer (something of type `a`) it starts the whole process over again. Here the process starts by sending values down, but there’s no reason why we can’t do the reverse and start by asking for a value

```
pull :: Monad m => a' -> Proxy a' a a' a m r
pull = go
  where
    go a' = Request a' (\a -> Respond a go)
```

which conveniently is right nearby. Now `push` and `pull` each give rise to a form of composition which takes two `Proxy`s and glues them together. The first is

```
(>~>)
    :: Monad m
    => (_a -> Proxy a' a b' b m r)
    -> ( b -> Proxy b' b c' c m r)
    -> (_a -> Proxy a' a c' c m r)
```

This takes two functions producing `Proxy`s which can communicate with each other and gives back a function producing a `Proxy` which has internalized this dialogue. It shells out to the pointful version, `>>~`.

```
(>>~)
    :: Monad m
    => Proxy a' a b' b m r
    -> (b -> Proxy b' b c' c m r)
    -> Proxy a' a c' c m r
p >>~ fb = case p of
    Request a' fa  -> Request a' (\a -> fa a >>~ fb)
    Respond b  fb' -> fb' +>> fb b
    M          m   -> M (m >>= \p' -> return (p' >>~ fb))
    Pure       r   -> Pure r
```

For this code we walk down the tree and recurse in all cases except where we have a `Respond`. This should send some information to the function we got as an argument and then use its reply to continue, so we want some way of taking a `b' -> Proxy a' a b' b m r` and a `Proxy b' b c' c m r` and giving back a `Proxy a' a c' c m r`. This looks like the exact dual to `>>~`, and indeed it is the equivalent in the `pull` version.

```
(+>>)
    :: Monad m
    => (b' -> Proxy a' a b' b m r)
    -> Proxy b' b c' c m r
    -> Proxy a' a c' c m r
fb' +>> p = case p of
    Request b' fb  -> fb' b' >>~ fb
    Respond c  fc' -> Respond c (\c' -> fb' +>> fc' c')
    M          m   -> M (m >>= \p' -> return (fb' +>> p'))
    Pure       r   -> Pure r
```

This does the exact opposite of `>>~`. It walks around recursing until we get to a `Request`, which should transfer control up to that function `b' -> Proxy ...`, and it does so by calling `>>~`. So these two operators, `+>>` and `>>~`, work together to join up two `Proxy`s by using one to answer the other’s `Request`s and `Respond`s. The symmetry breaking here is in who we inspect “first”, so to speak. If we start with the upstream one, then the downstream one is only run when a value is `push`-ed down to it, and if we start with the downstream one, we only run the upstream one when we `pull` something from it. Nifty.
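
Here is a small bidirectional sketch of that dialogue using `+>>`, assuming `pipes` is installed. `doubler` and `client` are hypothetical: the server answers every request with twice the number it was sent, and the client makes two requests and prints the answers.

```haskell
import Pipes (lift)
import Pipes.Core

-- Answer each incoming request n with n * 2, then wait for the next one.
doubler :: Monad m => Int -> Server Int Int m r
doubler n = respond (n * 2) >>= doubler

-- Send two meaningful requests upstream and print what comes back.
client :: Client Int Int IO ()
client = do
    a <- request 1
    lift (print a)
    b <- request 10
    lift (print b)

main :: IO ()
main = runEffect (doubler +>> client)
-- prints 2 and then 20
```

Because `+>>` starts with the downstream side, `doubler` only ever runs when `client` actually asks for something.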

One thing to note: what happens when one of these `Proxy`s gives up and `return`s? This potential situation is reflected in the fact that both of these `Proxy`s must return an `r`. Therefore, whenever one of them returns while we’re currently running it (the upstream for `>>~`, the downstream for `+>>`), we can just return the value and be done with the whole thing. In this sense, composing `Proxy`s has a short-circuiting property: at any point in the pipeline you can just give up and `return` something!
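
A tiny sketch of that short-circuiting with the public API (assuming `pipes` is installed): the source below is infinite, but the whole pipeline stops as soon as the middle stage returns.

```haskell
import Pipes
import qualified Pipes.Prelude as P

main :: IO ()
main = runEffect (each [1 :: Int ..] >-> P.take 3 >-> P.print)
-- prints 1, 2 and 3, then the whole pipeline returns
```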

Remember how I was ranting before about `Request` and `Respond` really being the same damn thing? It turns out I’m not the only one who thought that.

```
reflect :: Monad m => Proxy a' a b' b m r -> Proxy b b' a a' m r
reflect = go
  where
    go p = case p of
        Request a' fa  -> Respond a' (\a  -> go (fa  a ))
        Respond b  fb' -> Request b  (\b' -> go (fb' b'))
        M          m   -> M (m >>= \p' -> return (go p'))
        Pure    r      -> Pure r
```

Looking at the type here is really telling: all we do to switch the upstream and downstream ends is swap the constructors `Request` and `Respond`! That actually wraps up the core of pipes. The rest is just a bunch of synonyms with the arguments flipped!

Now that we’ve finished up `Pipes.Core` it’s not clear where to go next, so I decided to look at the top-level `Pipes` module, since between the `.Internal` and `.Core` modules we should have covered a lot of it. It turns out the top level *only* imports those two modules, so we can now go through that!

## Pipes

Really, the top-level `Pipes` module just re-exports some stuff and defines some thin layers over the rest:

```
module Pipes (
Proxy
, X
, Effect
, Effect'
, runEffect
, Producer
, Producer'
, yield
, for
, (~>)
, (<~)
, Consumer
, Consumer'
, await
, (>~)
, (~<)
, Pipe
, cat
, (>->)
, (<-<)
, ListT(..)
, runListT
, Enumerable(..)
, next
, each
, every
, discard
, module Control.Monad
, module Control.Monad.IO.Class
, module Control.Monad.Trans.Class
, module Control.Monad.Morph
, module Data.Foldable
)
```

Now, what haven’t we seen? The first thing is this `yield` construct, which turns out to be a snazzier name for `respond` with a nicer type.

```
yield :: Monad m => a -> Producer' a m ()
yield = respond
```

Similarly, `for` is just a synonym for `(//>)` (the first joiner we went through) and `~>` is the point-free version. On the other end we have stuff overlaying `request` and friends, but they’re not quite symmetric:

```
await :: Monad m => Consumer' a m a
await = request ()

(>~)
    :: Monad m
    => Proxy a' a y' y m b
    -> Proxy () b y' y m c
    -> Proxy a' a y' y m c
p1 >~ p2 = (\() -> p1) >\\ p2
```

So we need to cope with the fact that `request` can actually transfer interesting data up as well as down; in the basic case, though, we just assume that we’re sending `()`s. Also note that `>~` is biased towards the downstream `Proxy`: it starts by running it, and whenever it actually requests something (by sending up a `()`) we run `p1`. This function lets us compose `Proxy`s rather than functions to `Proxy`s, so that’s one nice effect.

Finally, we see our first example of a pipe

```
cat :: Monad m => Pipe a a m r
cat = pull ()
```

`cat` works by requesting something upstream immediately and passing it downstream. There’s nothing interesting here except that it combines nicely with other `Proxy`s. Say, for example, we have a random number generator; we can easily create a `Proxy` producing random numbers with

`randoms = lift getRandomNumber >~ cat`

We use `>~` to replace each `request` in `cat` with a call to `getRandomNumber`, whose result will be immediately pushed downstream. Similarly, we can use `cat` to push everything into some computation. If we want to debug a pipe by just printing everything we could say

`printAll = for cat (lift . print)`

So `cat` is a nice way of lifting something to work across `Proxy`s of values, if nothing else.
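
Both one-liners can be wired into something runnable. This is a sketch assuming `pipes` is installed; `getRandomNumber` is not a real library function, so it is stubbed here with a constant stand-in to keep the output predictable.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for a random number generator.
getRandomNumber :: Monad m => m Int
getRandomNumber = return 4

-- Every request inside cat is answered by running getRandomNumber.
randoms :: Monad m => Producer Int m r
randoms = lift getRandomNumber >~ cat

-- Every value cat would pass downstream is printed instead.
printAll :: Show a => Consumer a IO r
printAll = for cat (lift . print)

main :: IO ()
main = runEffect (randoms >-> P.take 3 >-> printAll)
-- prints 4 three times
```

Note how the types change: answering all of `cat`’s requests leaves a `Producer`, while handling all of its responses leaves a `Consumer`.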

Next is the common case of composing two `Proxy`s:

```
(>->)
    :: Monad m
    => Proxy a' a () b m r
    -> Proxy () b c' c m r
    -> Proxy a' a c' c m r
p1 >-> p2 = (\() -> p1) +>> p2
```

`>->` makes it easy to join up two `Proxy`s that don’t send any interesting data “up” with requests. `>->` starts by running `p2` using `+>>`, and whenever `p2` requests something it goes and runs `p1` for a while. This function lets us connect a `Pipe` to a `Pipe` or a `Producer` to a `Consumer`, for example.

Finally, we wrap up this module with the definition of `ListT`. Using `Producer` we can define a non-broken version of `ListT`:

```
newtype ListT m a = Select { enumerate :: Producer a m () }

instance (Monad m) => Functor (ListT m) where
    fmap f p = Select (for (enumerate p) (\a -> yield (f a)))

instance (Monad m) => Applicative (ListT m) where
    pure a = Select (yield a)
    mf <*> mx = Select (
        for (enumerate mf) (\f ->
        for (enumerate mx) (\x ->
        yield (f x) ) ) )

instance (Monad m) => Monad (ListT m) where
    return a = Select (yield a)
    m >>= f  = Select (for (enumerate m) (\a -> enumerate (f a)))
    fail _   = mzero
```

What’s kinda nifty here is that we just use a `Producer` returning a `()` to represent our list. We can use `for` to access every `yield x`, which corresponds to our “list” having an entry `x`! From there this is really just the standard set of instances for a list. In particular, `>>=` is `concatMap` for producers. That about wraps up this module, and I’ll end the blog post with it.
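
For a feel of how this behaves, here is a short sketch using the real `ListT` (assuming `pipes` is installed). `pairs` is a made-up example that enumerates every combination of two small “lists”, just like the ordinary list monad would.

```haskell
import Pipes

-- Every combination of a number and a letter, list-monad style.
pairs :: ListT IO (Int, Char)
pairs = do
    x <- Select (each [1, 2])
    y <- Select (each "ab")
    return (x, y)

main :: IO ()
main = runEffect (for (every pairs) (lift . print))
-- prints (1,'a'), (1,'b'), (2,'a') and (2,'b') on separate lines
```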

## Wrap Up

I didn’t actually go through all of `pipes` here, just the “core operations” which everything else is built on top of. In particular, I urge you to go read how `Pipes.Prelude` is implemented. Just as implementing the Haskell prelude is a good exercise, the same is true of the pipes one.

It turned out that `pipes` isn’t all that awful on the inside. It’s a library built around a specific free-monad-like structure with a couple of different methods of joining two computations together. In particular, there are a few different notions of composition which really define pipes:

- Sequential composition: running one machine and then another is given by `>>=`
- Take one computation and replace all its `Respond`s with another function using `//>`, or `for` in non-infix speak
- Take one computation and replace all its `Request`s with another function using `>\\`
- Take two computations and compose them together so that a request from one is satisfied by running the other until it responds, using `+>>` and `>>~`

Hope you learned as much as I did, cheers.
