1
votes

Data.Vector.Stream provides an nice Stream implementation that is very efficient thanks to the focus on fusability (see this paper for more). Since vector-0.1 this Stream implementation changed slightly by moving the Step type into a monad. (Now, the implementation is located in Data.Vector.Fusion.Stream.Monadic.)

In a nutshell, here's the definition of Stream:

data Step s a where
    Yield :: a -> s -> Step s a
    Skip  :: s -> Step s a
    Done  :: Step s a

data Stream a = forall s. Stream (s -> Step s a) s

Step s a encapsulates the three possible results from one iteration of state s with update function s -> Step s a. The stream is either Done, or skips output, or yields output. (The definition above uses GADTs, but that's not relevant, here.)

Simple applications of this Stream are:

empty :: Stream a
empty = Stream (const Done) ()

singleton :: a -> Stream a
singleton x = Stream step True where
    step True  = Yield x False
    step False = Done

fromList :: [a] -> Stream a
fromList zs = Stream step zs
where
    step (x:xs) = Yield x xs
    step []     = Done

Strict left fold is done like this:

foldl'S :: (a -> b -> a) -> a -> Stream b -> a
foldl'S f a0 (Stream step s) = go a0 s where
    go a s = a `seq`
                case step s of
                    Yield b s' -> go (f a b) s'
                    Skip    s' -> go a       s'
                    Done       -> a

and that gives the usual list-like functions lengthS = foldl'S (\n _ -> n+1) 0, etc. This certainly isn't as elegant as Conduit or Pipes, but it's simple and fast. So far so good.

Now, let's try to aggregate 'low-level' streams to more high-level ones. For example, if you have a bitstream Stream Bool you may want to decode the bits to yield a Stream Int by using some clever codec. Of course it's always possible to build up a new step function s -> Step s b from the step function extracted of a given Stream step s. Repeated applications of the step :: s->Step s a function result in awkward case (step s) of ... cascades that handle the three possibilities Done, Skip, Yield, over and over again. Ideally, the aggregate should like this:

aggregate :: Stream a -> M?? -> Stream b
newStream = aggregate oldStream $ do
    a1 <- get    -- a1 :: a
    if a1 == True then doSomething
    else do
        a2 <- get
        -- etc.

The M?? is some monad, to be defined. Let's try a type Appl s a:

newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)

It's called Appl because it has the signature of a function application. The monad instance is quite straightforward:

instance Monad (Appl s) where
    return a = Appl (\_ s -> Yield a s)
    (Appl ap) >>= f = Appl (\step s ->
        case (ap step s) of
            Done -> Done
            Skip s' -> untilNotSkip step s'
            Yield a' s' -> ap' step s' where Appl ap' = f a'

where untilNotSkip :: (s->Step s a) -> s -> Step s a is just repeated (nested) application of a step :: (s->Step s a) until a Done or a Yield is returned.

The get function is just normal function application

get :: Appl s a
get = Appl(\step s -> step s)

To tie things up, Functor and Applicative need to be done, and here comes the problem: Appl s can't be made a functor. The signature is

fmap :: (a->b) -> Appl s a -> Appl s b

and that just doesn't work because in order to make a function (s->Step s b) -> s -> Step s b) from a function (s->Step s a) -> s -> Step s a) I'd need a b->a. I can pull back an Appl s b over an a->b, but I can't push forward an Appl s a - i.e. I can have a contravariant functor but not a functor. That's weird. Streams are quite naturally comonads, but I don't see the connection. the purpose of Appl is to turn a step function s->Step s a into another one s->Step s b.

Something is very wrong here, Appl isn't the right "M??". Can anyone help out?

Update

As pointed out by Li-yao Xia the type should be something like

data Walk a b = forall s. Walk ((s->Step s a) -> s -> Step s b)

And the Functor, Applicative and Monad instances would be

instance Functor (Step s) where
    fmap f Done        = Done
    fmap f (Skip s)    = Skip s
    fmap f (Yield a s) = Yield (f a) s

instance Functor (Walk a) where
    fmap f (Walk t) = Walk ( \step s -> fmap f (t step s) )

-- default Applicative given a monad instance
ap :: (Monad m) => m (a -> b) -> m a -> m b
ap mf m = do
    f <- mf
    x <- m
    return (f x)

untilNotSkip :: (s->Step s a) -> s -> Step s a
untilNotSkip step s = case step s of
    Done        -> Done
    Skip s'     -> untilNotSkip step s'
    Yield a' s' -> Yield a' s'

instance Monad (Walk a) where
    return a = Walk (\_ s -> Yield a s)
    Walk t >>= f =
        Walk (\step s -> case t (untilNotSkip step) s of
            Done        -> Done
            Skip _      -> error "Internal error."
            Yield b' s' -> case f b' of Walk t' -> t' step s'   -- bad
    )

instance Applicative (Walk a) where
    pure = return
    (<*>) = ap

The type checker won't allow this monad instance, however. In the definition of >>= the s in Walk (\step s -> ... is different from the s' in Yield b' s' -> ..., but it must be the same. The fundamental problem here is that (>>=) :: Walk a b -> (b->Walk a c) -> Walk a c has two independently all-quantified states s, one in the first argument, and another one that is returned by b->Walk a c. Effectively this is (with abuse of notation) (forall s. Walk s a b) -> (forall s'. b->Walk s' a' c) -> (forall s''. Walk s'' a c), which doesn't make sense, neither conceptually nor for the type checker. All three s, s', s'' must be the same type.

A variation where Walk is not all-quantified over s:

data Walk s a b = Walk ((s->Step s a) -> s -> Step s b)

allows correct definition of bind, but then aggregate won't work:

-- does not compile
aggregate :: Stream a -> Walk s a b -> Stream b
aggregate (Stream step s) (M t) = Stream (t step) s

Again, the stream state s must always be the same. One way out of this would be to introduce a data PreStream s a = PreStream (s -> Step s a) s, but that doesn't allow an aggregate :: Stream a -> ?? -> Stream b either.

The source code is on github.

1

1 Answers

2
votes

Let's look at Appl again, because it seems almost right.

newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)

The idea is to define a stream transducer by converting a "low-level" step function to a "high-level" one. With that view, those two step functions shouldn't have the same output. For example, if we're transducing bits to bytes, we want (s -> Step s Bit) -> s -> Step s Byte.

Thus, a better type would be

newtype Walk s b a = Walk ((s -> Step s b) -> s -> Step s a)
-- A walk is many steps.

Furthermore, since Stream quantifies existentially over s, you'll need some universal quantification over s at some point to use Walk, so you might as well put it in the type definition.

newtype Walk b a = Walk (forall s. (s -> Step s b) -> s -> Step s a)