Example: Conduit Streaming with Servant
Servant is an awesome set of Haskell packages for Web development. Under the hood the Servant server uses WAI and Warp. Forunately, the package wai-conduit provides support for creating HTTP responses from Conduit sources, which also allow for streaming responses from within a Servant server.
Edsko de Vries has described the basic mechanism for doing this in a Github comment.
This post describes this in more detail, giving a complete example (which can be cloned from its repository).
Let us start! The server API type for a trivial non-streaming example looks like this:
type ServiceAPI = "one" :> Get '[JSON] Int
A handler implementing this endpoint is expected to return a value of type Int and Servant takes care of serializing this value before sending it to the client. An implementation for this API can look like this:
serveAPI :: Server ServiceAPI
serveAPI = serveOne
serveOne :: Handler Int
serveOne = return 1
Now, imagine we want to implement an endpoint /ones
, which streams infinitely many ones. Say we would like to use Conduit, then a reasonable handler type for the endpoint /ones
would look like this:
serveOnes :: Handler (Source (ResourceT IO) (Flush ByteString))
Note: See the Conduit documentation for information on the Flush type.
Given the above type, an implementation for serveOnes could look as follows:
serveOnes = return go
where go = do
yield (Chunk "1")
yield Flush
go
Now, in order to be able to use this handler for implementing a Servant API, we need to extend Servant’s Web API DSL. Say we would like to extend the above API with a streaming endpoint as follows:
type ServiceAPI =
"one" :> Get '[JSON] Int
:<|> "ones" :> GetStream ByteString
We can begin by definin a new type
data GetStream a
In order to be able to use GetStream for defining Servant endpoints, we need to implement a HasServer instance for it. Edsko has demonstrated a basic class implementation in his comment on Github. I would like to make this a bit more configurable by defining a new type class Streamable:
class Streamable a where
streamableToBuilder :: a -> Builder
streamableCT :: Proxy a -> Maybe MediaType
streamableCT _ = Nothing
streamableDelimiter :: Proxy a -> Maybe Builder
streamableDelimiter _ = Nothing
The method streamableToBuilder is obviously the most important method here as it allows us to convert a value of the type implementing this class into a ByteString Builder. The method streamableCT can be used for defining the response content type while streamableDelimiter can be used for defining a Builder delimiter which will be inserted in the response stream whenever the handler produces a Flush value. While we are at it, let us implement the following helper function:
toBuilderDelimited :: forall a. Streamable a => Flush a -> [Flush Builder]
This function will take care of inserting delimiters on Flush values. Implementing is is straight-forward:
toBuilderDelimited (Chunk a) = [Chunk (streamableToBuilder a)]
toBuilderDelimited Flush = case streamableDelimiter (Proxy :: Proxy a) of
Just delim -> [Chunk delim, Flush]
Nothing -> [Flush]
Having this in place, we can now implement HasServer for GetStream a:
instance Streamable a => HasServer (GetStream a) ctxt where
type ServerT (GetStream a) m = m (Source (ResourceT IO) (Flush a))
route Proxy _ctxt sub = leafRouter $
\env req k ->
bracket createInternalState
closeInternalState
(runAction sub env req k . mkResponse)
where
mkResponse :: InternalState
-> Source (ResourceT IO) (Flush a)
-> RouteResult Response
mkResponse st =
Route
. responseSource ok200 headers
. (.| CL.map toBuilderDelimited .| C.concat)
. transPipe (`runInternalState` st)
headers :: ResponseHeaders
headers =
let maybeMediaTypeBS = Media.renderHeader <$> streamableCT (Proxy :: Proxy a)
in maybeToList $ ("Content-Type",) <$> maybeMediaTypeBS
The function mkResponse is responsible for converting a Conduit source into a HTTP Response. It uses the previously defined helper function toBuilderDelimited and responseSource from the wai-conduit package. The function headers takes care of producing the desired Content-Type header.
So far we don’t have any instances for Streamable. A naive implementation for ByteString might look like this:
instance Streamable ByteString where
streamableToBuilder = lazyByteString . ByteString.Lazy.fromStrict
Here we convert a (strict) ByteString to a ByteString Builder without setting any content or delimiter.
For a different use case one might want to stream newline-delimited JSON values using a content type of application/x-json-stream. The Streamable instance implementing this would be
instance Streamable Value where
streamableToBuilder = lazyByteString . encodingToLazyByteString . toEncoding
streamableCT _ = Just ("application" Media.// "x-json-stream")
streamableDelimiter _ = Just (lazyByteString "\n")
Given this instance, we can extend our service API by writing
type ServiceAPI = "one" :> Get '[JSON] Int
:<|> "ones" :> GetStream ByteString
:<|> "hello" :> GetStream Value
and implement a handler for the /hello
endpoint as follows:
serveHello :: Handler (Source (ResourceT IO) (Flush Value))
serveHello = return go
where go = do
yield (Chunk (Array (Vector.replicate 10 (String "hello"))))
yield Flush
go
This produces a stream of newline-delimited JSON arrays each containing 10 strings. That’s it for now, happy streaming!