Examining Hackage: pipes
It’s been a while since I did one of these “read a package and write about it” posts. Part of this is that it turns out that most software is awful and writing about code I read just makes me grumpy. However I found something nice to write about! In this post I’d like to close a somewhat embarrassing gap in my knowledge: we’re going to walk through streaming library.
I know that both lists and lazy-IO are kind of.. let’s say fragile but have neglected learning one of these fancy libraries that aim to solve those problems. Today we’ll be looking at one of those libraries, pipes!
Pipes provides one core type Proxy
and a few operations on it, like await
and yield
. We can pair together a pipeline of operations which can send data to their neighbors and request more data from them as they need them. With these coroutine like structures we can nicely implement efficient, streaming computations.
Getting The Code
As always this starts by getting our hands on the code with the
~ $ cabal get pipes
~ $ cd pipes-4.1.5/
Now from here we can query all the available files to see what we’re up against
~/pipes-4.1.5 $ wc -l **/*.hs | sort -nr
4796 total
1513 src/Pipes/Tutorial.hs
854 src/Pipes/Core.hs
836 src/Pipes/Prelude.hs
517 src/Pipes.hs
380 src/Pipes/Lift.hs
272 tests/Main.hs
269 src/Pipes/Internal.hs
85 benchmarks/PreludeBench.hs
68 benchmarks/LiftBench.hs
2 Setup.hs
So the first thing I notice is that there’s this great honking module called Pipes.Tutorial
which houses a brief introduction to the pipes package. I skimmed this before starting but it doesn’t really seem to explain the implementation details.. If you don’t really know what pipes is, read this tutorial now. After doing so you have exactly my knowledge of pipes!
The next interesting module here is Pipes.Internal
. I’ve found that .Internal
modules seem to house the fundamental bits of the package so we’ll start there.
Pipes.Internal
This module starts with an emphatic warning
{-| This is an internal module, meaning that it is unsafe to
import unless you understand the risks. -}
So this seems like a perfect place to start without really understanding this library :D It exports a few different functions and one type:
I recognize one of those types: Proxy
as the central type behind the whole pipes concept, it is the type of component in the pipe line. Let’s look at how it’s actually implemented
data Proxy a' a b' b m r
= Request a' (a -> Proxy a' a b' b m r )
| Respond b (b' -> Proxy a' a b' b m r )
| M (m (Proxy a' a b' b m r))
| Pure r
So two of those constructors, M
and Pure
, look pretty vanilla. The first one let’s us lift an action in the underlying monad m
, into Proxy
. It’s a little bit weird instead of having M (m r)
we instead have M (m (Proxy ...))
however this doesn’t seem like a big deal because we have Pure
to promote an r
to a Proxy .... r
. So we can lift some m r
to Proxy a' a b' b m r
with M . fmap Pure
. It’s still not clear to me why this is a benefit though.
The first two constructors are really cool though, Request
and Respond
. The first thing that pops into my head is that this looks like a sort of free-monad pattern. Look how we’ve got
- The piece of data a user should input to an action (and
Request
andRespond
are definitely actions) - This continuation for a second argument which takes a term of the type returned by the action to another piece of pipe
This would make a lot of sense, free monad transformers nicely give rise to coroutines which are very much in line with pipes. Because of this free monad like shape, I expect that the monad instance will be like free monads and behave “like substitution”. We should chase down the leaves of this Proxy
(including under lambdas) and replace each Pure r
with f r
for >>=
and Pure (f a)
for fmap
.
To check if we’re right, we go down one line
instance Monad m => Functor (Proxy a' a b' b m) where
fmap f p0 = go p0 where
go p = case p of
Request a' fa -> Request a' (\a -> go (fa a ))
Respond b fb' -> Respond b (\b' -> go (fb' b'))
M m -> M (m >>= \p' -> return (go p'))
Pure r -> Pure (f r)
This looks like what I had in mind, we run down p
and in the first 3 branches we recurse. I’ll admit it looks a little intimidating but after staring at it for a bit I realized that the first 3 lines are all just variations on fmap go
! Indeed, we can rewrite this to
go p = case p of
Request a' fa -> Request a' (fmap go fa)
Respond b fb' -> Respond b (fmap go fb')
M m -> M (fmap go m)
Pure r -> Pure (f r)
This makes the idea a bit clearer in my mind. Let’s look at the applicative instance next!
instance Monad m => Applicative (Proxy a' a b' b m) where
pure = Pure
pf <*> px = go pf where
go p = case p of
Request a' fa -> Request a' (\a -> go (fa a ))
Respond b fb' -> Respond b (\b' -> go (fb' b'))
M m -> M (m >>= \p' -> return (go p'))
Pure f -> fmap f px
(*>) = (>>)
First note that pure = Pure
which isn’t a stunner just from the naming. In <*>
we have the same sort of pattern as in fmap
. We race down the “function” side of <*>
and whenever we reach a Pure
we have a function from a -> b
, with that function we call fmap
on the structure on the “argument” side. So we’re kind of gluing that px
onto that pf
by changing each Pure f
to fmap f px
.
Finally we have the monad instance. Of course the return
implementation is the same as for pure
but (>>=) = _bind
so the implementation of _bind
has been chucked out of the instance itself. It turns out there’s a good reason for that: _bind
has a bunch of rewrite rules attached to it.
_bind
:: Monad m
=> Proxy a' a b' b m r
-> (r -> Proxy a' a b' b m r')
-> Proxy a' a b' b m r'
p0 `_bind` f = go p0 where
go p = case p of
Request a' fa -> Request a' (\a -> go (fa a ))
Respond b fb' -> Respond b (\b' -> go (fb' b'))
M m -> M (m >>= \p' -> return (go p'))
Pure r -> f r
Now excitingly the implementation of bind
is almost exactly what we had before! Now instead of Pure f -> fmap f px
it’s Pure r -> f r
so we have something more like substitution than gluing.
Now that Proxy
is a monad, we can make it a monad transformer!
So we need to take an m a
and return Proxy a' a b' b m a
, we want to use M :: m (Proxy a' a b' b m a)
but we have an m a
, by fmap
ing Pure
we’re good to go.
From here on out it’s just a series of not so exciting MTL instances so we’ll skip those.. There’s a couple interesting things left though! Before we get to them recall the monad transformer laws
return = lift . return
m >>= f = lift m >>= (lift . f)
In other words, lift
should “commute” with the two operations of the monad type class. This isn’t actually true by default with Proxy
, for example
return a = Pure a
lift (return a) = M (fmap Pure (return a)) = M (return (Pure a))
To solve this we have observe
. This function is supposed to normalize a Proxy
so that these laws hold.
observe :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' b m r
observe p0 = M (go p0) where
go p = case p of
Request a' fa -> return (Request a' (\a -> observe (fa a )))
Respond b fb' -> return (Respond b (\b' -> observe (fb' b')))
M m' -> m' >>= go
Pure r -> return (Pure r)
Note that go
takes a Proxy a' a b' b m r
and returns m (Proxy a' a b' b m r)
. By doing this, we can route stick everything in m
with return
except for M m'
which we just unwrap and keep going. This means return (Pure a) = go (Pure a)
which is what is required for the monad transformer laws to hold.
Finally, the last thing in this file is X
which is used to represent the type for communication that cannot happen. So if we have a pipe at the beginning of the pipeline, it shouldn’t be able to ask for input from another pipe.
And there are no non-bottom expressions which occupy this type so we’re good. Now that we’ve seen the internal implementation of most of Proxy
we can go look at the infrastructure pipes provides around this. Again going by the names, now that we’ve covered the internals it makes sense to move onto Pipes.Core
.
Pipes.Core
Pipes.Core
seems much closer to the actual user interface of the library, we can see that it exports a bunch of familiar sounding names:
module Pipes.Core (
Proxy
, runEffect
, respond
, (/>/)
, (//>)
, request
, (\>\)
, (>\\)
, push
, (>~>)
, (>>~)
, pull
, (>+>)
, (+>>)
, reflect
, X
, Effect
, Producer
, Pipe
, Consumer
, Client
, Server
, Effect'
, Producer'
, Consumer'
, Client'
, Server'
, (\<\)
, (/</)
, (<~<)
, (~<<)
, (<+<)
, (<\\)
, (//<)
, (<<+)
, closed
) where
Now a few of those we’ve seen before, namely Proxy
, X
, and closed
. Notice that Proxy
is exported abstractly here so we can’t write code which violates the monad transformer laws using this module.
The first new function is called runEffect
, but it has the type
Which sounds great! I however have no clue what an effect is so let’s dig around the type exports first. There are a few type synonyms here
type Effect = Proxy X () () X
type Producer b = Proxy X () () b
type Pipe a b = Proxy () a () b
type Consumer a = Proxy () a () X
type Client a' a = Proxy a' a () X
type Server b' b = Proxy X () b' b
type Effect' m r = forall x' x y' y . Proxy x' x y' y m r
type Producer' b m r = forall x' x . Proxy x' x () b m r
type Consumer' a m r = forall y' y . Proxy () a y' y m r
type Server' b' b m r = forall x' x . Proxy x' x b' b m r
type Client' a' a m r = forall y' y . Proxy a' a y' y m r
Even though this looks like a lot, about half of these are actually duplicates which just use -XRankNTypes
instead of explicitly using X
. An Effect
as seen above is Proxy X () () X
.. I had to double check this but proxy takes 6 type arguments, here they are in order
a'
is the type of things that we can send up aRequest
a
is the type of things which a request will returnb'
is what we may be sent to respond tob
is what we may respond withm
is the underlying monad we may use for effectsr
is the return value
So an Effect
can only request things if it can produce an X
, and it will get back a ()
from its requests, and it can only respond with an X
and will get back a ()
after responding. Since we can never produce an X
an Effect
can never request or respond.
Similarly, a Producer
can respond
to things with b
s, but it will only ever get back a ()
after a response and it can never request
something. A Consumer
is the dual, never responding but can request
, it can only hand the code responding a ()
though.
Also in there are Client
s and Server
s which seem to be like a Consumer
and a Producer
but that can actually send meaningful messages with a request
and receive something interesting with a respond
instead of just ()
.
Okay, with these type synonyms in mind let’s go look at some code! Since an Effect
can’t request or respond, it’s really equivalent to just some monadic action.
runEffect :: Monad m => Effect m r -> m r
runEffect = go
where
go p = case p of
Request v _ -> closed v
Respond v _ -> closed v
M m -> m >>= go
Pure r -> return r
This let’s us write runEffect
which just uses the absurdity of producing a v :: X
in order to turn a Proxy
into an m
.
runEffect
is also the first function we’ve seen to actually escape the Proxy
monad as well! It let’s us convert a self-contained pipeline into just an effect which should mean it comes up basically everywhere, just like runStateT
.
Since the Proxy
monad is abstract, we need some functions to actually be able to request things. Thus we have respond
This is actually pretty trivial, we have a constructor after all whose job it is to Respond
to things so we just use that with the a
we have as a response. Since we have no interesting continuation yet, but we need something of type a' -> Proxy x' x a' a m a'
we just use Pure
. This should be very familiar to users of free monads (remember that Pure
= return
)!
Next is something interesting, we’ve seen a lot of ways of manipulating a pipe, but never actually a way of combining two pipes so that they interact, our next function does that.
(/>/)
:: Monad m
=> (a -> Proxy x' x b' b m a')
-> (b -> Proxy x' x c' c m b')
-> (a -> Proxy x' x c' c m a')
(fa />/ fb) a = fa a //> fb
Here we have two arguments, both functions to pipelines and we return a pipeline as output. Notice here that the first Proxy
is something which is going to respond
with things of type b
and expect something of type b'
in return and our second function is going to map b
s to a Proxy
which returns a b'
. This means we can replace each Respond
in the first with a call to the second function and pipe the output into our continuation for that Respond
. Indeed this matches up with the return type so I anticipate that it what shall happen. However, this function shells out to another right below it so we’ll have to look at it to confirm.
(//>)
:: Monad m
=> Proxy x' x b' b m a'
-> (b -> Proxy x' x c' c m b')
-> Proxy x' x c' c m a'
p0 //> fb = go p0
where
go p = case p of
Request x' fx -> Request x' (\x -> go (fx x))
Respond b fb' -> fb b >>= \b' -> go (fb' b')
M m -> M (m >>= \p' -> return (go p'))
Pure a -> Pure a
The interesting line here is Respond b fb' -> ...
which does exactly what I thought it ought to (I feel clever). In that line we run the function we have in the second argument with the data the first argument was Respond
ing with. We sort of “intercept” a message intended for downstream and just handle it right there. Since we do this for all things Respond
ing with b
s we now only respond with c
s hence the change in type. It doesn’t effect the upstream type, but we can now take something producing values and transform them to instead run some other computation (perhaps producing something else).
In a limited case we can do something like
intercept :: Monad m
=> (b -> c)
-> Proxy a' a b' b m r
-> Proxy a' a b' c m r
intercept f p = p //> respond . f
Cool! Now up next seems to be the dual of what we’ve just looked at.
This is just what we had with respond
but using Request
instead. Similarly we ahve a counterpart for />/
. It again shells out to a similar, pointful, function >\\
(\>\)
:: Monad m
=> (b' -> Proxy a' a y' y m b)
-> (c' -> Proxy b' b y' y m c)
-> (c' -> Proxy a' a y' y m c)
(fb' \>\ fc') c' = fb' >\\ fc' c'
(>\\)
:: Monad m
=> (b' -> Proxy a' a y' y m b)
-> Proxy b' b y' y m c
-> Proxy a' a y' y m c
fb' >\\ p0 = go p0
where
go p = case p of
Request b' fb -> fb' b' >>= \b -> go (fb b)
Respond x fx' -> Respond x (\x' -> go (fx' x'))
M m -> M (m >>= \p' -> return (go p'))
Pure a -> Pure a
I’d expect that this function does sort of what the other did before. It’ll take Request
s and “answer” them inline by replacing it with a call to the other function. In fact, when you think about what the hell is the difference between a request and a response? They’re completely symmetric! They both transmit information sending one type in one direction and one type in another. So we should have exactly the same code that just happens to use Request
instead of Respond
. which is indeed what we have.
The only real difference here is in the argument order which hints at the fact that we’re going to break symmetry sooner or later, it just hasn’t happened yet.
Next up is
push
takes a seed a
and chucks it down the pipeline. Once it gets a response, it throws it up the pipeline with Request
and when it gets a response (something of type a
) it starts the whole process over again. Now the process starts by sending values down, there’s no reason why we can’t do the reverse and start by asking for a value
which conveniently is right near by. Now push
and pull
each give rise to a form of composition which takes two Proxy
s and glues them together. The first is
(>~>)
:: Monad m
=> (_a -> Proxy a' a b' b m r)
-> ( b -> Proxy b' b c' c m r)
-> (_a -> Proxy a' a c' c m r)
This takes two Proxy
s which can communicate with each other and gives back a Proxy
which has internalized this dialogue. This shells out to the pointful version, >>~
(>>~)
:: Monad m
=> Proxy a' a b' b m r
-> (b -> Proxy b' b c' c m r)
-> Proxy a' a c' c m r
p >>~ fb = case p of
Request a' fa -> Request a' (\a -> fa a >>~ fb)
Respond b fb' -> fb' +>> fb b
M m -> M (m >>= \p' -> return (p' >>~ fb))
Pure r -> Pure r
For this code we walk down the tree and recurse in all cases except where we have a Response
. This should send some information to that function we got as an argument and then use that response to continue, so we want some way of taking a Proxy b' b c' c m r
and a b' -> Proxy a' a b' b m r
and giving back a Proxy a' a c' c m r
. This looks like the exact dual to >>~
and indeed is the equivalent in the pull
version.
(+>>)
:: Monad m
=> (b' -> Proxy a' a b' b m r)
-> Proxy b' b c' c m r
-> Proxy a' a c' c m r
fb' +>> p = case p of
Request b' fb -> fb' b' >>~ fb
Respond c fc' -> Respond c (\c' -> fb' +>> fc' c')
M m -> M (m >>= \p' -> return (fb' +>> p'))
Pure r -> Pure r
This does the exact opposite of >>~
. It walks around recursing until we get to a Request
, this should transfer control up to that function b' -> Proxy ...
and it does by calling >>~
. So these two operators >>+
and >>~
work together to join up to Proxy
functions by using one to answer the other’s Request
and Respond
s. The symmetry breaking here is who should we inspect “first” so to speak. If we start with the upstream one than the second one is only run when a value is push
-ed down to it and if we start with the former we only run the upstream version when we pull
something from it. Nifty.
One thing to note, what happens when one of these Proxy
s give up and return
? This potential situation is reflected in the fact that both of these Proxy
s must return an r
. Therefore, whenever one of these returns and we’re currently running it (the upstream for >>~
, downstream for >>+
) we can just return the value and be done with the whole thing. In this sense composing a Proxy
has this short circuiting property, at any point in the pipeline you can just give up and return
something!
Remember before how I was ranting about how Request
and Respond
were really the same damn thing, it turns out I’m not the only one who thought that
reflect :: Monad m => Proxy a' a b' b m r -> Proxy b b' a a' m r
reflect = go
where
go p = case p of
Request a' fa -> Respond a' (\a -> go (fa a ))
Respond b fb' -> Request b (\b' -> go (fb' b'))
M m -> M (m >>= \p' -> return (go p'))
Pure r -> Pure r
Looking at the type here is really telling, all we do to switch the upstream and downstream ends is swap the constructors Request
and Respond
! That actually wraps up the core of pipes, the rest is just a bunch of synonyms with the arguments flipped!
Now that we’ve finished up Pipes.Core
it’s not clear where to go so I decided to go look at the top level Pipes
module since between the .Internal
and .Core
modules we should have covered a lot of it. It turns out the top level only imports those two modules so we can now go through that!
Pipes
Really the top level package Pipes
just re exports some stuff and defines some thin layers of the rest
module Pipes (
Proxy
, X
, Effect
, Effect'
, runEffect
, Producer
, Producer'
, yield
, for
, (~>)
, (<~)
, Consumer
, Consumer'
, await
, (>~)
, (~<)
, Pipe
, cat
, (>->)
, (<-<)
, ListT(..)
, runListT
, Enumerable(..)
, next
, each
, every
, discard
, module Control.Monad
, module Control.Monad.IO.Class
, module Control.Monad.Trans.Class
, module Control.Monad.Morph
, module Data.Foldable
)
Now what haven’t we seen, the first thing is this yield
construct which turns out to be a snazzier name for respond
with a nicer type.
Similarly, for
is just a synonym (//>)
(first joiner we went through) and ~>
is the point free version. On the other end we have stuff overlaying request
and friends but they’re not quite symmetric
await :: Monad m => Consumer' a m a
await = request ()
(>~)
:: Monad m
=> Proxy a' a y' y m b
-> Proxy () b y' y m c
-> Proxy a' a y' y m c
p1 >~ p2 = (\() -> p1) >\\ p2
So we need to cope with the fact request
can actually transfer interesting data down as well as up, in the basic case though we just assume that we’re dealing with ()
s. Also note that >~
is biased to the downstream Proxy
, it starts by running it and whenever we actually request something (by sending up a ()
) we run p1
. This function lets us compose Proxy
s, not functions to Proxy
s so that’s one nice effect.
Finally, we see our first example of a pipe
cat
works by requesting something upstream immediately and passing it downstream. Nothing interesting except that it combines great with other Proxy
s. Say for example we have a random number generator, we can easily create a Proxy
producing random numbers with
we use >~
to replace each request
in cat
with a call to getRandomNumber
which will be immediately pushed downstream. Similarly, we can use cat
to push everything into some computation. If we want to debug a pipe by just printing everything we could say
So cat
is a nice way of lifting something to work across Proxy
s of values if nothing else.
Next is the common case of composing to Proxy
s,
(>->)
:: Monad m
=> Proxy a' a () b m r
-> Proxy () b c' c m r
-> Proxy a' a c' c m r
p1 >-> p2 = (\() -> p1) +>> p2
>->
makes it easy to join up to Proxy
s that don’t send any interesting data “up” with requests. >->
starts by running p2
using +>>
and whenever p2
requests something it goes and runs p1
for a while. This function lets us connect a Pipe
to Pipe
or Producer
to Consumer
for example.
Finally, we wrap up this module with the definition of ListT
inside it. Using Producer
we can define a nonbroken version of ListT
newtype ListT m a = Select { enumerate :: Producer a m () }
instance (Monad m) => Functor (ListT m) where
fmap f p = Select (for (enumerate p) (\a -> yield (f a)))
instance (Monad m) => Applicative (ListT m) where
pure a = Select (yield a)
mf <*> mx = Select (
for (enumerate mf) (\f ->
for (enumerate mx) (\x ->
yield (f x) ) ) )
instance (Monad m) => Monad (ListT m) where
return a = Select (yield a)
m >>= f = Select (for (enumerate m) (\a -> enumerate (f a)))
fail _ = mzero
What’s kinda nifty here is we just use a Producer
returning a ()
to represent our list. Here we can use for
to access every yield x
which corresponds to our “list” having an entry x
! From there this is really just the standard set of instances for a list! In particular >>=
is mapConcat
for producers. That about wraps up this module and I’ll end the blog post with it.
Wrap Up
I didn’t actually go through all of pipes
here, just the “core operations” which everything else is built on top of. In particular, I urge you to go read how Pipes.Prelude
is implemented. Just like implementing the Haskell prelude is a good exercise the same is true of pipes.
It turned out that pipes
isn’t all that awful on the inside, it’s a library built around a specific free-monad like structure which a couple different methods of joining two computations together. In particular there were a few different notions of composition which really define pipes
- Sequential composition: running one machine and then another is given with
>>=
- Take one computation and replace all its
Respond
s with another function using//>
orfor
in non-infix speak - Take one computation and replace all its
Request
s with another function using>\\
- Take two computations and compose them together so a request from one is satisfied by running the other until a respond using
>>+
and>>~
Hope you learned as much as I did, cheers.
comments powered by Disqus