Skip to content

Commit

Permalink
cohttp-eio: Allow streaming responses from handler
Browse files Browse the repository at this point in the history
This changes the response type to `writer -> unit`. This allows
handlers to write the response inside the function, rather than
returning a request to cohttp to write it later. That's useful because
it allows e.g. streaming from an open file and then closing it
afterwards.

Partial application means that code using `respond_string` etc will
continue to work as before.

This also exposes a more polymorphic version of the `respond` function
that accepts sub-types of `Flow.source`, so that callers don't need to
cast the body.
  • Loading branch information
talex5 committed May 20, 2024
1 parent 0853c71 commit ba3250b
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Unreleased

- cohttp-eio: Make server response type abstract and allow streaming in cohttp-eio (talex5 #1024)
- cohttp-{lwt,eio}: server: add connection header to response if not present (ushitora-anqou #1025)
- cohttp-curl: Curl no longer prepends the first HTTP request header to the output. (jonahbeckford #1030, #987)
- cohttp-eio: client: use permissive argument type for make_generic
Expand Down
2 changes: 1 addition & 1 deletion cohttp-eio/examples/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(executables
(names server1 client1 docker_client client_timeout client_tls)
(names server1 server2 client1 docker_client client_timeout client_tls)
(libraries
cohttp-eio
eio_main
Expand Down
37 changes: 37 additions & 0 deletions cohttp-eio/examples/server2.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
let () = Logs.set_reporter (Logs_fmt.reporter ())
and () = Logs.Src.set_level Cohttp_eio.src (Some Debug)

let ( / ) = Eio.Path.( / )

(* To stream a file, we take the extra [writer] argument explicitly.
This means that we stream the response while the function is still
running and the file is still open. *)
let handler dir _socket request _body writer =
let path =
Http.Request.resource request
|> String.split_on_char '/'
|> List.filter (( <> ) "")
|> String.concat "/"
in
let path = if path = "" then "index.html" else path in
Eio.Path.with_open_in (dir / path) @@ fun flow ->
Cohttp_eio.Server.respond () ~status:`OK
~headers:(Http.Header.of_list [ ("content-type", "text/html") ])
~body:flow writer

let log_warning ex = Logs.warn (fun f -> f "%a" Eio.Exn.pp ex)

let () =
let port = ref 8080 in
Arg.parse
[ ("-p", Arg.Set_int port, " Listening port number(8080 by default)") ]
ignore "An HTTP/1.1 server";
Eio_main.run @@ fun env ->
Eio.Switch.run @@ fun sw ->
(* Restrict to current directory: *)
let htdocs = Eio.Stdenv.cwd env in
let socket =
Eio.Net.listen env#net ~sw ~backlog:128 ~reuse_addr:true
(`Tcp (Eio.Net.Ipaddr.V4.loopback, !port))
and server = Cohttp_eio.Server.make ~callback:(handler htdocs) () in
Cohttp_eio.Server.run socket server ~on_error:log_warning
71 changes: 31 additions & 40 deletions cohttp-eio/src/server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,29 @@ module IO = Io.IO

type body = Body.t
type conn = IO.conn * Cohttp.Connection.t [@@warning "-3"]
type response = Http.Response.t * Body.t
type writer = Http.Request.t * IO.oc
type response = writer -> unit

type response_action =
[ `Expert of Http.Response.t * (IO.ic -> IO.oc -> unit)
| `Response of Http.Response.t * body ]

(* type handler =
* sw:Eio.Switch.t ->
* Eio.Net.Sockaddr.stream ->
* Http.Request.t ->
* Eio.Flow.source ->
* Http.Response.t * Eio.Flow.source *)
| `Response of response ]

type t = {
conn_closed : conn -> unit;
handler : conn -> Http.Request.t -> body -> response_action;
handler : conn -> Http.Request.t -> body -> IO.ic -> IO.oc -> unit;
}

let make_response_action ?(conn_closed = fun _ -> ()) ~callback () =
{ conn_closed; handler = callback }
{
conn_closed;
handler =
(fun conn request body ic oc ->
match callback conn request body with
| `Expert (response, handler) ->
Io.Response.write_header response oc;
handler ic oc
| `Response fn -> fn (request, oc));
}
let make_expert ?conn_closed ~callback () =
make_response_action ?conn_closed
Expand All @@ -31,12 +34,11 @@ let make_expert ?conn_closed ~callback () =
`Expert expert)
()
let make ?conn_closed ~callback () =
make_response_action ?conn_closed
~callback:(fun conn request body ->
let response = callback conn request body in
`Response response)
()
let make ?(conn_closed = fun _ -> ()) ~callback () =
{
conn_closed;
handler = (fun conn request body _ic oc -> callback conn request body (request, oc));
}
let read input =
match Io.Request.read input with
Expand Down Expand Up @@ -88,9 +90,17 @@ let write output (response : Cohttp.Response.t) body =
in
Eio.Buf_write.flush output
let respond ?headers ?flush ~status ~body () =
let response = Cohttp.Response.make ?headers ?flush ~status () in
(response, body)
let respond ?(headers = Cohttp.Header.init ()) ?flush ~status ~body () (request, oc) =
let keep_alive = Http.Request.is_keep_alive request in
let headers =
match Cohttp.Header.connection headers with
| None ->
Http.Header.add headers "connection"
(if keep_alive then "keep-alive" else "close")
| Some _ -> headers
in
let response = Cohttp.Response.make ~headers ?flush ~status () in
write oc response body
let respond_string ?headers ?flush ~status ~body () =
respond ?headers ?flush ~status ~body:(Body.of_string body) ()
Expand All @@ -117,26 +127,7 @@ let callback { conn_closed; handler } ((_, peer_address) as conn) input output =
(Body.of_string e)
| `Ok (request, body) ->
let () =
try
match handler (conn, id) request body with
| `Response (response, body) ->
let keep_alive =
Http.Request.is_keep_alive request
&& Http.Response.is_keep_alive response
in
let response =
let headers =
Http.Header.add_unless_exists
(Http.Response.headers response)
"connection"
(if keep_alive then "keep-alive" else "close")
in
{ response with Http.Response.headers }
in
write output response body
| `Expert (response, handler) ->
let () = Io.Response.write_header response output in
handler input output
try handler (conn, id) request body input output
with Eio.Io (Eio.Net.E (Connection_reset _), _) ->
Logs.info (fun m ->
m "%a: connection reset" Eio.Net.Sockaddr.pp peer_address)
Expand Down
12 changes: 11 additions & 1 deletion cohttp-eio/src/server.mli
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
type writer

include
Cohttp.Generic.Server.S
with module IO = Io.IO
and type body = Body.t
and type response = Http.Response.t * Body.t
and type response = writer -> unit

val respond :
?headers:Http.Header.t ->
?flush:bool ->
status:Http.Status.t ->
body:_ Eio.Flow.source ->
unit ->
response IO.t

val run :
?max_connections:int ->
Expand Down
4 changes: 1 addition & 3 deletions cohttp-eio/tests/test.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
open Eio.Std

let () =
Logs.set_level ~all:true @@ Some Logs.Debug;
Logs.set_reporter (Logs_fmt.reporter ())
Expand All @@ -13,7 +11,7 @@ let handler _conn request body =
Eio_mock.Flow.on_read body
[ `Return "Hello"; `Yield_then (`Return "World") ]
in
Cohttp_eio.Server.respond ~status:`OK ~body:(body :> Eio.Flow.source_ty r) ()
Cohttp_eio.Server.respond ~status:`OK ~body ()
| "/post" -> Cohttp_eio.Server.respond ~status:`OK ~body ()
| _ -> Cohttp_eio.Server.respond_string ~status:`Not_found ~body:"" ()

Expand Down

0 comments on commit ba3250b

Please sign in to comment.