From 10944482641331cf972084ea0d332e82f5fc4af3 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Fri, 29 Mar 2019 00:41:25 +0000 Subject: [PATCH 1/8] upgrade: support upgrades via an I/O operation Add `Reqd.respond_with_upgrade`, which will create response with a with status code 101, and the provided headers. On the next write operation, this response, along with the inspiring request will be provided to the runtime. It is up to the runtime to serialize the resposne and send it on the wire before handing off the socket to an upgrade handler. --- lib/httpaf.mli | 5 +- lib/reqd.ml | 90 ++++++++++++++++++++++------- lib/server_connection.ml | 122 +++++++++++++++++++++++++-------------- 3 files changed, 152 insertions(+), 65 deletions(-) diff --git a/lib/httpaf.mli b/lib/httpaf.mli index b2c341f2..69e86ec7 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -637,6 +637,8 @@ module Reqd : sig val respond_with_bigstring : t -> Response.t -> Bigstringaf.t -> unit val respond_with_streaming : ?flush_headers_immediately:bool -> t -> Response.t -> [`write] Body.t + val respond_with_upgrade : ?reason:string -> t -> Headers.t -> unit + (** {3 Exception Handling} *) val report_exn : t -> exn -> unit @@ -678,7 +680,7 @@ module Server_connection : sig (** [create ?config ?error_handler ~request_handler] creates a connection handler that will service individual requests with [request_handler]. *) - val next_read_operation : t -> [ `Read | `Yield | `Close ] + val next_read_operation : t -> [ `Read | `Yield | `Close | `Upgrade ] (** [next_read_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) @@ -705,6 +707,7 @@ module Server_connection : sig val next_write_operation : t -> [ | `Write of Bigstringaf.t IOVec.t list | `Yield + | `Upgrade of Request.t * Response.t | `Close of int ] (** [next_write_operation t] returns a value describing the next operation that the caller should conduct on behalf of the connection. *) diff --git a/lib/reqd.ml b/lib/reqd.ml index c74b4f95..f240fde4 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -34,10 +34,29 @@ type error = [ `Bad_request | `Bad_gateway | `Internal_server_error | `Exn of exn ] -type response_state = - | Waiting of Optional_thunk.t ref - | Complete of Response.t - | Streaming of Response.t * [`write] Body.t +module Response_state = struct + type t = + | Waiting of Optional_thunk.t ref + | Upgrade of Response.t + | Complete of Response.t + | Streaming of Response.t * [`write] Body.t +end + +module Input_state = struct + type t = + | Provide + | Wait + | Complete + | Upgrade +end + +module Output_state = struct + type t = + | Ready + | Wait + | Complete + | Upgrade +end type error_handler = ?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit @@ -74,7 +93,7 @@ type t = ; response_body_buffer : Bigstringaf.t ; error_handler : error_handler ; mutable persistent : bool - ; mutable response_state : response_state + ; mutable response_state : Response_state.t ; mutable error_code : [`Ok | error ] } @@ -101,13 +120,15 @@ let response { response_state; _ } = match response_state with | Waiting _ -> None | Streaming(response, _) - | Complete (response) -> Some response + | Upgrade response + | Complete response -> Some response let response_exn { response_state; _ } = match response_state with | Waiting _ -> failwith "httpaf.Reqd.response_exn: response has not started" | Streaming(response, _) - | Complete (response) -> response + | Upgrade response + | Complete response -> response let respond_with_string t response str = if t.error_code <> `Ok then @@ -115,7 +136,7 @@ let respond_with_string t response str = match t.response_state with | Waiting when_done_waiting -> (* XXX(seliopou): check response body length *) - Writer.write_response t.writer response; + Writer.write_response t.writer response; Writer.write_string t.writer str; if t.persistent then t.persistent <- Response.persistent_connection response; @@ -123,6 +144,7 @@ let respond_with_string t response str = done_waiting when_done_waiting | Streaming _ -> failwith "httpaf.Reqd.respond_with_string: response already started" + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.respond_with_string: response already complete" @@ -140,6 +162,7 @@ let respond_with_bigstring t response (bstr:Bigstringaf.t) = done_waiting when_done_waiting | Streaming _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already started" + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.respond_with_bigstring: response already complete" @@ -156,6 +179,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = response_body | Streaming _ -> failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.respond_with_streaming: response already complete" @@ -164,6 +188,19 @@ let respond_with_streaming ?(flush_headers_immediately=false) t response = failwith "httpaf.Reqd.respond_with_streaming: invalid state, currently handling error"; unsafe_respond_with_streaming ~flush_headers_immediately t response +let respond_with_upgrade ?reason t headers = + match t.response_state with + | Waiting when_done_waiting -> + let response = Response.create ?reason ~headers `Switching_protocols in + t.response_state <- Upgrade response; + Body.close_reader t.request_body; + done_waiting when_done_waiting + | Streaming _ -> + failwith "httpaf.Reqd.respond_with_streaming: response already started" + | Upgrade _ + | Complete _ -> + failwith "httpaf.Reqd.respond_with_streaming: response already complete" + let report_error t error = t.persistent <- false; Body.close_reader t.request_body; @@ -187,7 +224,7 @@ let report_error t error = | Streaming(_response, response_body), `Exn _ -> Body.close_writer response_body; Writer.close_and_drain t.writer - | (Complete _ | Streaming _ | Waiting _) , _ -> + | (Complete _ | Upgrade _ | Streaming _ | Waiting _) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) () @@ -216,25 +253,36 @@ let on_more_output_available t f = when_done_waiting := Optional_thunk.some f | Streaming(_, response_body) -> Body.when_ready_to_write response_body f + | Upgrade _ | Complete _ -> failwith "httpaf.Reqd.on_more_output_available: response already complete" let persistent_connection t = t.persistent -let requires_input { request_body; _ } = - not (Body.is_closed request_body) +let input_state t : Input_state.t = + match t.response_state with + | Upgrade _ -> Upgrade + | Waiting _ + | Complete _ + | Streaming _ -> + if Body.is_closed t.request_body + then Complete + else Provide +;; -let requires_output { response_state; _ } = - match response_state with - | Complete _ -> false - | Streaming (_, response_body) -> - not (Body.is_closed response_body) - || Body.has_pending_output response_body - | Waiting _ -> true - -let is_complete t = - not (requires_input t || requires_output t) +let output_state t : Output_state.t = + match t.response_state with + | Complete _ -> Complete + | Upgrade _ -> Upgrade + | Waiting _ -> Wait + | Streaming(_, response_body) -> + if Body.has_pending_output response_body + then Ready + else if Body.is_closed response_body + then Complete + else Wait +;; let flush_request_body t = let request_body = request_body t in diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 7fb9d935..0fbf6dfe 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -197,48 +197,48 @@ let set_error_and_handle ?request t error = let report_exn t exn = set_error_and_handle t (`Exn exn) -let advance_request_queue_if_necessary t = - if is_active t then begin +let rec _next_read_operation t = + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Reader.next t.reader + ) else ( let reqd = current_reqd_exn t in - if Reqd.persistent_connection reqd then begin - if Reqd.is_complete reqd then begin + match Reqd.input_state reqd with + | Provide -> Reader.next t.reader + | Wait -> `Yield + | Complete -> + (match Reqd.output_state reqd with + | Complete -> ignore (Queue.take t.request_queue); - if not (Queue.is_empty t.request_queue) - then t.request_handler (current_reqd_exn t); - wakeup_reader t; - end - end else begin - ignore (Queue.take t.request_queue); - Queue.iter Reqd.close_request_body t.request_queue; - Queue.clear t.request_queue; - Queue.push reqd t.request_queue; - wakeup_writer t; - if Reqd.is_complete reqd - then shutdown t - else if not (Reqd.requires_input reqd) - then shutdown_reader t - end - end else if Reader.is_closed t.reader - then shutdown t - -let _next_read_operation t = - advance_request_queue_if_necessary t; - if is_active t then begin - let reqd = current_reqd_exn t in - if Reqd.requires_input reqd then Reader.next t.reader - else if Reqd.persistent_connection reqd then `Yield - else begin - shutdown_reader t; - Reader.next t.reader - end - end else - Reader.next t.reader + if Reqd.persistent_connection reqd then ( + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); + wakeup_reader t; + _next_read_operation t; + ) else ( + shutdown t; + Reader.next t.reader + ) + | Wait + | Ready -> + if not (Reqd.persistent_connection reqd) then ( + shutdown_reader t; + Reader.next t.reader + ) else `Yield + | Upgrade -> assert false) + | Upgrade -> + assert (Reqd.output_state reqd = Upgrade); + shutdown t; + `Upgrade + ) +;; let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close - | (`Read | `Yield | `Close) as operation -> operation + | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in @@ -269,9 +269,39 @@ let flush_response_body t = ;; let next_write_operation t = - advance_request_queue_if_necessary t; - flush_response_body t; - Writer.next t.writer + if not (is_active t) then ( + if Reader.is_closed t.reader + then shutdown t; + Writer.next t.writer + ) else ( + let reqd = current_reqd_exn t in + match Reqd.output_state reqd with + | Wait -> `Yield + | Ready -> + assert (Reqd.input_state reqd <> Upgrade); + Reqd.flush_response_body reqd; + Writer.next t.writer + | Complete -> + (match Reqd.input_state reqd with + | Complete -> + ignore (Queue.take t.request_queue); + if Reqd.persistent_connection reqd then ( + if not (Queue.is_empty t.request_queue) + then t.request_handler (Queue.peek_exn t.request_queue); + wakeup_reader t + ) else ( + shutdown t + ); + Writer.next t.writer + | Provide + | Wait -> `Yield + | Upgrade -> assert false) + | Upgrade -> + assert (Reqd.input_state reqd = Upgrade); + shutdown t; + `Upgrade(Reqd.request reqd, Reqd.response_exn reqd) + ) +;; let report_write_result t result = Writer.report_result t.writer result @@ -279,11 +309,17 @@ let report_write_result t result = let yield_writer t k = if is_active t then begin let reqd = current_reqd_exn t in - if Reqd.requires_output reqd - then Reqd.on_more_output_available reqd k - else if Reqd.persistent_connection reqd - then on_wakeup_writer t k - else begin shutdown t; k () end + match Reqd.output_state reqd with + | Complete -> + if Reqd.persistent_connection reqd + then on_wakeup_writer t k + else begin + shutdown t; + k () + end + | Wait -> Reqd.on_more_output_available reqd k + | Ready + | Upgrade -> k () end else if Writer.is_closed t.writer then k () else begin on_wakeup_writer t k end From 07429e00f5cfec2a7310eebc7f6b0560f42c8c0e Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 14 Oct 2019 22:18:11 -0400 Subject: [PATCH 2/8] upgrade: fix tests --- lib_test/test_httpaf.ml | 101 ++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 45 deletions(-) diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index 333565b9..d3ce3f87 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -272,46 +272,52 @@ let response_to_string ?body r = Faraday.serialize_to_string f module Read_operation = struct - type t = [ `Read | `Yield | `Close ] + type t = [ `Read | `Yield | `Close | `Upgrade ] + + let pp_hum fmt (t:t) = + let str = + match t with + | `Read -> "Read" + | `Yield -> "Yield" + | `Close -> "Close" + | `Upgrade -> "Upgrade" + in + Format.pp_print_string fmt str + ;; + end - let pp_hum fmt t = - let str = + module Write_operation = struct + type t = + [ `Write of Bigstringaf.t IOVec.t list + | `Upgrade of Request.t * Response.t + | `Yield + | `Close of int ] + + let iovecs_to_string iovecs = + let len = IOVec.lengthv iovecs in + let bytes = Bytes.create len in + let dst_off = ref 0 in + List.iter (fun { IOVec.buffer; off = src_off; len } -> + Bigstringaf.unsafe_blit_to_bytes buffer ~src_off bytes ~dst_off:!dst_off ~len; + dst_off := !dst_off + len) + iovecs; + Bytes.unsafe_to_string bytes + ;; + + let pp_hum fmt t = match t with - | `Read -> "Read" - | `Yield -> "Yield" - | `Close -> "Close" - in - Format.pp_print_string fmt str - ;; -end - -module Write_operation = struct - type t = [ `Write of Bigstringaf.t IOVec.t list | `Yield | `Close of int ] + | `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs) + | `Yield -> Format.pp_print_string fmt "Yield" + | `Close len -> Format.fprintf fmt "Close %i" len + | `Upgrade _ -> Format.pp_print_string fmt "Upgrade" + ;; - let iovecs_to_string iovecs = - let len = IOVec.lengthv iovecs in - let bytes = Bytes.create len in - let dst_off = ref 0 in - List.iter (fun { IOVec.buffer; off = src_off; len } -> - Bigstringaf.unsafe_blit_to_bytes buffer ~src_off bytes ~dst_off:!dst_off ~len; - dst_off := !dst_off + len) - iovecs; - Bytes.unsafe_to_string bytes - ;; - - let pp_hum fmt t = - match t with - | `Write iovecs -> Format.fprintf fmt "Write %S" (iovecs_to_string iovecs) - | `Yield -> Format.pp_print_string fmt "Yield" - | `Close len -> Format.fprintf fmt "Close %i" len - ;; - - let to_write_as_string t = - match t with - | `Write iovecs -> Some (iovecs_to_string iovecs) - | `Close _ | `Yield -> None - ;; -end + let to_write_as_string t = + match t with + | `Write iovecs -> Some (iovecs_to_string iovecs) + | `Close _ | `Yield | `Upgrade _ -> None + ;; + end let write_operation = Alcotest.of_pp Write_operation.pp_hum let read_operation = Alcotest.of_pp Read_operation.pp_hum @@ -423,6 +429,13 @@ module Server_connection = struct Body.close_writer resp_body ;; + let read_eof_empty t = + let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in + Alcotest.(check int) "read_eof with no input returns 0" 0 c; + Alcotest.check read_operation "Shutting down a reader closes it" + `Close (next_read_operation t); + ;; + let test_initial_reader_state () = let t = create default_request_handler in Alcotest.check read_operation "A new reader wants input" @@ -431,15 +444,13 @@ module Server_connection = struct let test_reader_is_closed_after_eof () = let t = create default_request_handler in - let c = read_eof t Bigstringaf.empty ~off:0 ~len:0 in - Alcotest.(check int) "read_eof with no input returns 0" 0 c; + read_eof_empty t; connection_is_shutdown t; let t = create default_request_handler in let c = read t Bigstringaf.empty ~off:0 ~len:0 in Alcotest.(check int) "read with no input returns 0" 0 c; - let c = read_eof t Bigstringaf.empty ~off:0 ~len:0; in - Alcotest.(check int) "read_eof with no input returns 0" 0 c; + read_eof_empty t; connection_is_shutdown t; ;; @@ -832,7 +843,7 @@ module Client_connection = struct let reader_ready t = Alcotest.check read_operation "Reader is ready" - `Read (next_read_operation t :> [`Close | `Read | `Yield]); + `Read (next_read_operation t :> Read_operation.t); ;; let write_string ?(msg="output written") t str = @@ -850,17 +861,17 @@ module Client_connection = struct let writer_yielded t = Alcotest.check write_operation "Writer is in a yield state" - `Yield (next_write_operation t); + `Yield (next_write_operation t :> Write_operation.t); ;; let writer_closed t = Alcotest.check write_operation "Writer is closed" - (`Close 0) (next_write_operation t); + (`Close 0) (next_write_operation t :> Write_operation.t); ;; let connection_is_shutdown t = Alcotest.check read_operation "Reader is closed" - `Close (next_read_operation t :> [`Close | `Read | `Yield]); + `Close (next_read_operation t :> Read_operation.t); writer_closed t; ;; From 1dccf461f3f3cda33aaae55c572bbfdd15232eaa Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 14 Oct 2019 21:00:39 -0400 Subject: [PATCH 3/8] upgrade: add a test --- lib_test/test_httpaf.ml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib_test/test_httpaf.ml b/lib_test/test_httpaf.ml index d3ce3f87..e9a467f6 100644 --- a/lib_test/test_httpaf.ml +++ b/lib_test/test_httpaf.ml @@ -345,6 +345,11 @@ module Server_connection = struct `Read (next_read_operation t); ;; + let read_upgrade t = + Alcotest.check read_operation "Reader is requesting an upgrade" + `Upgrade (next_read_operation t); + ;; + let reader_yielded t = Alcotest.check read_operation "Reader is in a yield state" `Yield (next_read_operation t); @@ -367,6 +372,12 @@ module Server_connection = struct report_write_result t `Closed; ;; + let write_upgrade ?(msg="upgrade written") t request response = + Alcotest.check write_operation msg + (`Upgrade(request, response)) + (next_write_operation t); + ;; + let writer_yielded t = Alcotest.check write_operation "Writer is in a yield state" `Yield (next_write_operation t); @@ -795,6 +806,18 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; writer_closed t; ;; + let test_upgrade () = + let request_handler reqd = + Reqd.respond_with_upgrade reqd Headers.empty + in + let t = create ~error_handler request_handler in + let request = Request.create `GET "/" ~headers:(Headers.of_list [ "Connection", "upgrade" ]) in + let response = Response.create `Switching_protocols in + read_request t request; + write_upgrade t request response; + read_upgrade t; + ;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -813,6 +836,7 @@ Accept-Language: en-US,en;q=0.5\r\n\r\n"; ; "blocked write on chunked encoding", `Quick, test_blocked_write_on_chunked_encoding ; "writer unexpected eof", `Quick, test_unexpected_eof ; "input shrunk", `Quick, test_input_shrunk + ; "upgrade", `Quick, test_upgrade ] end From 11a3960ea4f1239df2d6f942920d2232645c3645 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 14 Oct 2019 19:43:11 -0400 Subject: [PATCH 4/8] upgrade: add support to lwt runtime --- lwt-unix/httpaf_lwt_unix.ml | 37 +++++++++++++++++++++++++++--------- lwt-unix/httpaf_lwt_unix.mli | 8 ++++++++ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index 545fbc6a..cd11cffd 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -34,6 +34,7 @@ open Lwt.Infix +(* Based on the Buffer module in httpaf_async.ml. *) module Buffer : sig type t @@ -103,10 +104,29 @@ let shutdown socket command = try Lwt_unix.shutdown socket command with Unix.Unix_error (Unix.ENOTCONN, _, _) -> () -module Config = Httpaf.Config - module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + module Upgrade = struct + type t = + | Ignore + | Raise + | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + + let to_handler = function + | Ignore -> (fun socket _request _response -> Lwt.async (fun () -> Lwt_unix.close socket)) + | Raise -> + (fun socket _request _response -> + Lwt.async (fun () -> Lwt_unix.close socket); + failwith "Upgrades not supported by server") + | Handle handler -> handler + end + + let create_connection_handler + ?(config=Httpaf.Config.default) + ~upgrade_handler + ~request_handler + ~error_handler + = + let upgrade_handler = Upgrade.to_handler upgrade_handler in fun client_addr socket -> let module Server_connection = Httpaf.Server_connection in let connection = @@ -139,7 +159,7 @@ module Server = struct | `Yield -> Server_connection.yield_reader connection read_loop; Lwt.return_unit - + | `Upgrade -> Lwt.return_unit | `Close -> Lwt.wakeup_later notify_read_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin @@ -167,11 +187,12 @@ module Server = struct writev io_vectors >>= fun result -> Server_connection.report_write_result connection result; write_loop_step () - + | `Upgrade(request, response) -> + upgrade_handler socket request response; + Lwt.return_unit | `Yield -> Server_connection.yield_writer connection write_loop; Lwt.return_unit - | `Close _ -> Lwt.wakeup_later notify_write_loop_exited (); if not (Lwt_unix.state socket = Lwt_unix.Closed) then begin @@ -201,10 +222,8 @@ module Server = struct Lwt.return_unit end - - module Client = struct - let request ?(config=Config.default) socket request ~error_handler ~response_handler = + let request ?(config=Httpaf.Config.default) socket request ~error_handler ~response_handler = let module Client_connection = Httpaf.Client_connection in let request_body, connection = Client_connection.request ~config request ~error_handler ~response_handler in diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index 6625002c..c92e061a 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -39,8 +39,16 @@ open Httpaf to [Lwt_io.establish_server_with_client_socket]. For an example, see [examples/lwt_echo_server.ml]. *) module Server : sig + module Upgrade : sig + type t = + | Ignore + | Raise + | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + end + val create_connection_handler : ?config : Config.t + -> upgrade_handler : Upgrade.t -> request_handler : (Unix.sockaddr -> Server_connection.request_handler) -> error_handler : (Unix.sockaddr -> Server_connection.error_handler) -> Unix.sockaddr From 553adab37fd6067e0b2efd2762dcac6279efdd16 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 14 Oct 2019 19:53:09 -0400 Subject: [PATCH 5/8] upgrade: add support to async runtime --- async/httpaf_async.ml | 27 ++++++++++++++++++++++++++- async/httpaf_async.mli | 10 ++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index 4db6796a..b6492b84 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -90,8 +90,30 @@ let read fd buffer = open Httpaf module Server = struct - let create_connection_handler ?(config=Config.default) ~request_handler ~error_handler = + module Upgrade = struct + type 'a t = + | Ignore + | Raise + | Handle of (([`Active], 'a) Socket.t -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + + let to_handler = function + | Ignore -> (fun socket _request _response -> + don't_wait_for (Fd.close (Socket.fd socket))) + | Raise -> + (fun socket _request _response -> + don't_wait_for (Fd.close (Socket.fd socket)); + failwith "Upgrades not supported by server") + | Handle handler -> handler + end + + let create_connection_handler + ?(config=Config.default) + ~upgrade_handler + ~request_handler + ~error_handler + = fun client_addr socket -> + let upgrade_handler = Upgrade.to_handler upgrade_handler in let fd = Socket.fd socket in let writev = Faraday_async.writev_of_fd fd in let request_handler = request_handler client_addr in @@ -101,6 +123,7 @@ module Server = struct let buffer = Buffer.create config.read_buffer_size in let rec reader_thread () = match Server_connection.next_read_operation conn with + | `Upgrade -> () | `Read -> (* Log.Global.printf "read(%d)%!" (Fd.to_int_exn fd); *) read fd buffer @@ -136,6 +159,8 @@ module Server = struct | `Yield -> (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; + | `Upgrade(request, response) -> + upgrade_handler socket request response | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index f120624d..7d65219b 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -1,11 +1,17 @@ -open! Core open Async - open Httpaf module Server : sig + module Upgrade : sig + type 'a t = + | Ignore + | Raise + | Handle of (([`Active], 'a) Socket.t -> Request.t -> Response.t -> unit) + end + val create_connection_handler : ?config : Config.t + -> upgrade_handler : 'a Upgrade.t -> request_handler : ('a -> Server_connection.request_handler) -> error_handler : ('a -> Server_connection.error_handler) -> ([< Socket.Address.t] as 'a) From 65b5a20e74ec9341e5a4bad60d54ffcf64eb0837 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 14 Oct 2019 20:22:02 -0400 Subject: [PATCH 6/8] upgrade: fix examples --- examples/async/async_echo_post.ml | 2 +- examples/lwt/lwt_echo_post.ml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/async/async_echo_post.ml b/examples/async/async_echo_post.ml index 0f524a2a..2787c469 100644 --- a/examples/async/async_echo_post.ml +++ b/examples/async/async_echo_post.ml @@ -10,7 +10,7 @@ let main port max_accepts_per_batch () = let where_to_listen = Tcp.Where_to_listen.of_port port in Tcp.(Server.create_sock ~on_handler_error:`Raise ~backlog:10_000 ~max_connections:10_000 ~max_accepts_per_batch where_to_listen) - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler ~upgrade_handler:Raise ~request_handler ~error_handler) >>= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; diff --git a/examples/lwt/lwt_echo_post.ml b/examples/lwt/lwt_echo_post.ml index 18307107..603011f7 100644 --- a/examples/lwt/lwt_echo_post.ml +++ b/examples/lwt/lwt_echo_post.ml @@ -12,7 +12,7 @@ let main port = Lwt.async (fun () -> Lwt_io.establish_server_with_client_socket listen_address - (Server.create_connection_handler ~request_handler ~error_handler) + (Server.create_connection_handler ~upgrade_handler:Raise ~request_handler ~error_handler) >|= fun _server -> Stdio.printf "Listening on port %i and echoing POST requests.\n" port; Stdio.printf "To send a POST request, try one of the following\n\n"; From 2c86e474e457337b82ad83868c80870712dec984 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 17 Oct 2019 22:35:25 -0400 Subject: [PATCH 7/8] upgrade: fix lwt runtime The future returned by the ugprade handler will dicatate when the fd will be closed. --- lwt-unix/httpaf_lwt_unix.ml | 7 +++---- lwt-unix/httpaf_lwt_unix.mli | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lwt-unix/httpaf_lwt_unix.ml b/lwt-unix/httpaf_lwt_unix.ml index cd11cffd..30ff18c4 100644 --- a/lwt-unix/httpaf_lwt_unix.ml +++ b/lwt-unix/httpaf_lwt_unix.ml @@ -109,10 +109,10 @@ module Server = struct type t = | Ignore | Raise - | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit Lwt.t) let to_handler = function - | Ignore -> (fun socket _request _response -> Lwt.async (fun () -> Lwt_unix.close socket)) + | Ignore -> (fun socket _request _response -> Lwt_unix.close socket) | Raise -> (fun socket _request _response -> Lwt.async (fun () -> Lwt_unix.close socket); @@ -188,8 +188,7 @@ module Server = struct Server_connection.report_write_result connection result; write_loop_step () | `Upgrade(request, response) -> - upgrade_handler socket request response; - Lwt.return_unit + upgrade_handler socket request response | `Yield -> Server_connection.yield_writer connection write_loop; Lwt.return_unit diff --git a/lwt-unix/httpaf_lwt_unix.mli b/lwt-unix/httpaf_lwt_unix.mli index c92e061a..24d5f7b7 100644 --- a/lwt-unix/httpaf_lwt_unix.mli +++ b/lwt-unix/httpaf_lwt_unix.mli @@ -43,7 +43,7 @@ module Server : sig type t = | Ignore | Raise - | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + | Handle of (Lwt_unix.file_descr -> Httpaf.Request.t -> Httpaf.Response.t -> unit Lwt.t) end val create_connection_handler From a365c6da247cb141821c5f09ef7322f8e6fde693 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 17 Oct 2019 22:37:49 -0400 Subject: [PATCH 8/8] upgrade: fix async runtime The deferred returned by the ugprade handler will dicatate when the fd will be closed. --- async/httpaf_async.ml | 7 +++---- async/httpaf_async.mli | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/async/httpaf_async.ml b/async/httpaf_async.ml index b6492b84..104c673c 100644 --- a/async/httpaf_async.ml +++ b/async/httpaf_async.ml @@ -94,11 +94,10 @@ module Server = struct type 'a t = | Ignore | Raise - | Handle of (([`Active], 'a) Socket.t -> Httpaf.Request.t -> Httpaf.Response.t -> unit) + | Handle of (([`Active], 'a) Socket.t -> Httpaf.Request.t -> Httpaf.Response.t -> unit Deferred.t) let to_handler = function - | Ignore -> (fun socket _request _response -> - don't_wait_for (Fd.close (Socket.fd socket))) + | Ignore -> (fun socket _request _response -> Fd.close (Socket.fd socket)) | Raise -> (fun socket _request _response -> don't_wait_for (Fd.close (Socket.fd socket)); @@ -160,7 +159,7 @@ module Server = struct (* Log.Global.printf "write_yield(%d)%!" (Fd.to_int_exn fd); *) Server_connection.yield_writer conn writer_thread; | `Upgrade(request, response) -> - upgrade_handler socket request response + upgrade_handler socket request response >>> Ivar.fill write_complete | `Close _ -> (* Log.Global.printf "write_close(%d)%!" (Fd.to_int_exn fd); *) Ivar.fill write_complete (); diff --git a/async/httpaf_async.mli b/async/httpaf_async.mli index 7d65219b..a4767e82 100644 --- a/async/httpaf_async.mli +++ b/async/httpaf_async.mli @@ -6,7 +6,7 @@ module Server : sig type 'a t = | Ignore | Raise - | Handle of (([`Active], 'a) Socket.t -> Request.t -> Response.t -> unit) + | Handle of (([`Active], 'a) Socket.t -> Request.t -> Response.t -> unit Deferred.t) end val create_connection_handler