From 2298f9c7afc573b4915d8ed87adb9225a1fb4edb Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 14 Oct 2019 19:53:09 -0400 Subject: [PATCH] upgrade: add support to async runtime --- async/httpaf_async.ml | 27 ++++++++++++++++++++++++++- async/httpaf_async.mli | 10 ++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 4db6796a..b6492b84 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -90,8 +90,30 @@ let read fd buffer = open Httpaf module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + module Upgrade = struct + type 'a t = + | Ignore + | Raise + | Handle of (([`Active], 'a) Socket.t -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + + let to_handler = function + | Ignore -> (fun socket _request _response -> + don't_wait_for (Fd.close (Socket.fd socket))) + | Raise -> + (fun socket _request _response -> + don't_wait_for (Fd.close (Socket.fd socket)); + failwith "Upgrades not supported by server") + | Handle handler -> handler + end + + let create_connection_handler + ?(config=Config.default) + ~upgrade_handler + ~request_handler + ~error_handler + = fun client_addr socket -> + let upgrade_handler = Upgrade.to_handler upgrade_handler in let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in let request_handler = request_handler client_addr in @@ -101,6 +123,7 @@ module Server = struct let buffer = Buffer.create config.read_buffer_size in let rec reader_thread () = match Server_connection.next_read_operation conn with + | `Upgrade -> () | `Read -> (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *) read fd buffer @@ -136,6 +159,8 @@ module Server = struct | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; + | `Upgrade(request, response) -> + upgrade_handler socket request response | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624d..7d65219b 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -1,11 +1,17 @@ -open! Core open Async - open Httpaf module Server : sig + module Upgrade : sig + type 'a t = + | Ignore + | Raise + | Handle of (([`Active], 'a) Socket.t -> Request.t -> Response.t -> unit) + end + val create_connection_handler : ?config : Config.t + -> upgrade_handler : 'a Upgrade.t -> request_handler : ('a -> Server_connection.request_handler) -> error_handler : ('a -> Server_connection.error_handler) -> ([< Socket.Address.t] as 'a)