diff --git a/CHANGES.md b/CHANGES.md index d90921be..a65528ce 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -73,6 +73,9 @@ Unreleased the Async runtimes ([#69](https://github.com/anmonteiro/httpaf/pull/69)) - httpaf: call error handler on read EOF if the entire body hasn't been received ([#75](https://github.com/anmonteiro/httpaf/pull/75)) +- httpaf: Abort (chunked) responses correctly if an error is reported + ([#84](https://github.com/anmonteiro/httpaf/pull/84)), + ([#86](https://github.com/anmonteiro/httpaf/pull/86)) httpaf (upstream) 0.6.6 -------------- diff --git a/lib/reqd.ml b/lib/reqd.ml index 4a6ae12f..f4b495ee 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -37,6 +37,7 @@ type error = type error_handler = ?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit +module Reader = Parse.Reader module Writer = Serialize.Writer (* XXX(seliopou): The current design assumes that a new [Reqd.t] will be @@ -65,6 +66,7 @@ module Writer = Serialize.Writer type t = { request : Request.t ; request_body : [`read] Body.t + ; reader : Reader.request ; writer : Writer.t ; response_body_buffer : Bigstringaf.t ; error_handler : error_handler @@ -73,9 +75,10 @@ type t = ; mutable error_code : [`Ok | error ] } -let create error_handler request request_body writer response_body_buffer = +let create error_handler request request_body reader writer response_body_buffer = { request ; request_body + ; reader ; writer ; response_body_buffer ; error_handler @@ -215,10 +218,12 @@ let report_error t error = failwith "httpaf.Reqd.report_exn: NYI" | Streaming (_response, response_body), `Ok -> Body.set_non_chunked response_body; - Body.close_writer response_body + Body.close_writer response_body; + Reader.wakeup t.reader; | Streaming (_response, response_body), `Exn _ -> Body.close_writer response_body; - Writer.close_and_drain t.writer + Writer.close_and_drain t.writer; + Reader.wakeup t.reader; | (Fixed _ | Streaming _ | Upgrade _ | Waiting) , _ -> (* XXX(seliopou): Once additional logging support is added, log the error * in case it is not spurious. *) diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 718664f8..b92a1ce6 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -110,13 +110,14 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque let writer = Writer.create ~buffer_size:response_buffer_size () in let request_queue = Queue.create () in let response_body_buffer = Bigstringaf.create response_body_buffer_size in - let handler request request_body = + let rec reader = lazy (Reader.request handler) + and handler request request_body = let reqd = - Reqd.create error_handler request request_body writer response_body_buffer + Reqd.create error_handler request request_body (Lazy.force reader) writer response_body_buffer in Queue.push reqd request_queue; in - { reader = Reader.request handler + { reader = Lazy.force reader ; writer ; response_body_buffer ; request_handler = request_handler diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 30bfbc33..7a80c630 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -1608,6 +1608,36 @@ let test_errored_content_length_streaming_response () = connection_is_shutdown t; ;; +let test_errored_chunked_streaming_response_async () = + let reader_woken_up = ref false in + let writer_woken_up = ref false in + let continue = ref (fun () -> ()) in + let request = Request.create `GET "/" in + let response = + Response.create `OK + ~headers:(Headers.of_list ["Transfer-encoding", "chunked"]) + in + + let request_handler reqd = + let request_body = Reqd.request_body reqd in + Body.close_reader request_body; + let body = Reqd.respond_with_streaming reqd response in + Body.write_string body "hello"; + continue := (fun () -> + Reqd.report_exn reqd (Failure "heh")); + in + + let t = create request_handler in + read_request t request; + write_response t response ~body:"5\r\nhello\r\n"; + yield_reader t (fun () -> reader_woken_up := true); + yield_writer t (fun () -> writer_woken_up := true); + !continue (); + Alcotest.(check bool) "Reader woken up" true !reader_woken_up; + Alcotest.(check bool) "Writer woken up" true !writer_woken_up; + connection_is_shutdown t; +;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -1662,6 +1692,7 @@ let tests = ; "reader EOF race condition causes state machine to issue writer yield", `Quick, test_race_condition_writer_issues_yield_after_reader_eof ; "multiple requests in single read", `Quick, test_multiple_requests_in_single_read ; "multiple async requests in single read", `Quick, test_multiple_async_requests_in_single_read - ; "chunked-encoding streaming error test", `Quick, test_errored_chunked_streaming_response - ; "content-length streaming error test", `Quick, test_errored_content_length_streaming_response + ; "chunked-encoding streaming error", `Quick, test_errored_chunked_streaming_response + ; "content-length streaming error", `Quick, test_errored_content_length_streaming_response + ; "chunked-encoding async streaming error", `Quick, test_errored_chunked_streaming_response_async ]