Servant is an awesome set of Haskell packages for Web development. Under the hood, the Servant server uses WAI and Warp. Fortunately, the package wai-conduit provides support for creating HTTP responses from Conduit sources, which also allows for streaming responses from within a Servant server.
Edsko de Vries has described the basic mechanism for doing this in a GitHub comment.
This post describes this in more detail, giving a complete example (which can be cloned from its repository).
Let us start! The server API type for a trivial non-streaming example looks like this:
type ServiceAPI = "one" :> Get '[JSON] Int
A handler implementing this endpoint is expected to return a value of type Int, and Servant takes care of serializing this value before sending it to the client. An implementation for this API can look like this:
serveAPI :: Server ServiceAPI
serveAPI = serveOne

serveOne :: Handler Int
serveOne = return 1
Now, imagine we want to implement an endpoint /ones, which streams infinitely many ones. If we would like to use Conduit, a reasonable handler type for the endpoint /ones would look like this:
serveOnes :: Handler (Source (ResourceT IO) (Flush ByteString))
Note: See the Conduit documentation for information on the Flush type.
Given the above type, an implementation for serveOnes could look as follows:
serveOnes = return go
  where
    go = do
      yield (Chunk "1")
      yield Flush
      go
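To get a feel for what such a source produces, one can run a finite prefix of it outside of Servant. The following is a minimal sketch, assuming the conduit package and its Conduit module; the name ones is hypothetical, and the type is written with ConduitT () rather than Source, since newer conduit versions deprecate the Source synonym.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Conduit (ConduitT, Flush (..), runConduit, sinkList, takeC, yield, (.|))
import Data.ByteString (ByteString)

-- Hypothetical stand-alone version of the infinite source used by serveOnes.
-- It alternates between a one-byte chunk and a flush marker, forever.
ones :: Monad m => ConduitT () (Flush ByteString) m ()
ones = do
  yield (Chunk "1")
  yield Flush
  ones

main :: IO ()
main = do
  -- Conduits are pull-driven, so taking four elements terminates
  -- even though the source itself never ends.
  prefix <- runConduit (ones .| takeC 4 .| sinkList)
  print prefix
```

Because downstream demand drives the source, the infinite recursion in ones is harmless here: the pipeline stops as soon as takeC has seen four values.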
Now, in order to be able to use this handler for implementing a Servant API, we need to extend Servant’s Web API DSL. Say we would like to extend the above API with a streaming endpoint as follows:
type ServiceAPI = "one" :> Get '[JSON] Int
             :<|> "ones" :> GetStream ByteString
We can begin by defining a new type:
data GetStream a
In order to be able to use GetStream for defining Servant endpoints, we need to implement a HasServer instance for it. Edsko has demonstrated a basic instance in his comment on GitHub. I would like to make this a bit more configurable by defining a new type class Streamable:
class Streamable a where
  streamableToBuilder :: a -> Builder

  streamableCT :: Proxy a -> Maybe MediaType
  streamableCT _ = Nothing

  streamableDelimiter :: Proxy a -> Maybe Builder
  streamableDelimiter _ = Nothing
The method streamableToBuilder is obviously the most important method here as it allows us to convert a value of the type implementing this class into a ByteString Builder. The method streamableCT can be used for defining the response content type while streamableDelimiter can be used for defining a Builder delimiter which will be inserted in the response stream whenever the handler produces a Flush value. While we are at it, let us implement the following helper function:
toBuilderDelimited :: forall a. Streamable a => Flush a -> [Flush Builder]
This function will take care of inserting delimiters on Flush values. Implementing it is straightforward:
toBuilderDelimited (Chunk a) = [Chunk (streamableToBuilder a)]
toBuilderDelimited Flush =
  case streamableDelimiter (Proxy :: Proxy a) of
    Just delim -> [Chunk delim, Flush]
    Nothing    -> [Flush]
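The effect of the delimiter logic can be illustrated without Servant or Conduit. The following is a simplified sketch that depends only on base and bytestring: it defines a local stand-in for Conduit's Flush type, and the hypothetical helpers delimit and render take the delimiter and the conversion function as plain arguments instead of going through the Streamable class.

```haskell
module Main where

import Data.ByteString.Builder (Builder, char7, string7, toLazyByteString)
import qualified Data.ByteString.Lazy as BL

-- Local stand-in for conduit's Flush type (assumption: same two constructors).
data Flush a = Chunk a | Flush

-- Same idea as toBuilderDelimited, but with the delimiter and the
-- conversion function passed explicitly rather than via a type class.
delimit :: Maybe Builder -> (a -> Builder) -> Flush a -> [Flush Builder]
delimit _        toB (Chunk a) = [Chunk (toB a)]
delimit (Just d) _   Flush     = [Chunk d, Flush]
delimit Nothing  _   Flush     = [Flush]

-- Concatenate all chunk payloads, ignoring flush markers, to see
-- the bytes that would end up in the response body.
render :: [Flush Builder] -> BL.ByteString
render xs = toLazyByteString (mconcat [b | Chunk b <- xs])

main :: IO ()
main = do
  let input = [Chunk "a", Flush, Chunk "b", Flush] :: [Flush String]
  -- With a newline delimiter, every flush is preceded by "\n".
  print (render (concatMap (delimit (Just (char7 '\n')) string7) input))
  -- Without a delimiter, the payloads are simply concatenated.
  print (render (concatMap (delimit Nothing string7) input))
```

With the newline delimiter the rendered body is "a\nb\n"; without one it is "ab". In both cases the Flush markers themselves stay in the stream, so the response is still flushed at the same points.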
Having this in place, we can now implement HasServer for GetStream a:
instance Streamable a => HasServer (GetStream a) ctxt where
  type ServerT (GetStream a) m = m (Source (ResourceT IO) (Flush a))

  route Proxy _ctxt sub = leafRouter $ \env req k ->
      bracket createInternalState
              closeInternalState
              (runAction sub env req k . mkResponse)
    where
      mkResponse :: InternalState -> Source (ResourceT IO) (Flush a) -> RouteResult Response
      mkResponse st =
          Route
        . responseSource ok200 headers
        . (.| CL.map toBuilderDelimited .| C.concat)
        . transPipe (`runInternalState` st)

      headers :: ResponseHeaders
      headers =
        let maybeMediaTypeBS = Media.renderHeader <$> streamableCT (Proxy :: Proxy a)
        in maybeToList $ ("Content-Type",) <$> maybeMediaTypeBS
The function mkResponse is responsible for converting a Conduit source into an HTTP Response. It uses the previously defined helper function toBuilderDelimited and responseSource from the wai-conduit package. The value headers takes care of producing the desired Content-Type header, if the Streamable instance defines one.
So far we don’t have any instances for Streamable. A naive implementation for ByteString might look like this:
instance Streamable ByteString where
  streamableToBuilder = lazyByteString . ByteString.Lazy.fromStrict
Here we convert a (strict) ByteString to a ByteString Builder without setting a content type or a delimiter.
For a different use case one might want to stream newline-delimited JSON values using a content type of application/x-json-stream. The Streamable instance implementing this would be
instance Streamable Value where
  streamableToBuilder = lazyByteString . encodingToLazyByteString . toEncoding
  streamableCT _ = Just ("application" Media.// "x-json-stream")
  streamableDelimiter _ = Just (lazyByteString "\n")
Given this instance, we can extend our service API by writing
type ServiceAPI = "one" :> Get '[JSON] Int
             :<|> "ones" :> GetStream ByteString
             :<|> "hello" :> GetStream Value
and implement a handler for the /hello endpoint as follows:
serveHello :: Handler (Source (ResourceT IO) (Flush Value))
serveHello = return go
  where
    go = do
      yield (Chunk (Array (Vector.replicate 10 (String "hello"))))
      yield Flush
      go
This produces a stream of newline-delimited JSON arrays each containing 10 strings. That’s it for now, happy streaming!