Skip to content

Commit

Permalink
Chunked-encoding error reporting: async scenario (inhabitedtype#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro authored Dec 2, 2020
1 parent 71ee236 commit 909297d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ Unreleased
the Async runtimes ([#69](https://github.com/anmonteiro/httpaf/pull/69))
- httpaf: call error handler on read EOF if the entire body hasn't been
received ([#75](https://github.com/anmonteiro/httpaf/pull/75))
- httpaf: Abort (chunked) responses correctly if an error is reported
([#84](https://github.com/anmonteiro/httpaf/pull/84)),
([#86](https://github.com/anmonteiro/httpaf/pull/86))

httpaf (upstream) 0.6.6
--------------
Expand Down
11 changes: 8 additions & 3 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type error =
type error_handler =
?request:Request.t -> error -> (Headers.t -> [`write] Body.t) -> unit

module Reader = Parse.Reader
module Writer = Serialize.Writer

(* XXX(seliopou): The current design assumes that a new [Reqd.t] will be
Expand Down Expand Up @@ -65,6 +66,7 @@ module Writer = Serialize.Writer
type t =
{ request : Request.t
; request_body : [`read] Body.t
; reader : Reader.request
; writer : Writer.t
; response_body_buffer : Bigstringaf.t
; error_handler : error_handler
Expand All @@ -73,9 +75,10 @@ type t =
; mutable error_code : [`Ok | error ]
}

let create error_handler request request_body writer response_body_buffer =
let create error_handler request request_body reader writer response_body_buffer =
{ request
; request_body
; reader
; writer
; response_body_buffer
; error_handler
Expand Down Expand Up @@ -215,10 +218,12 @@ let report_error t error =
failwith "httpaf.Reqd.report_exn: NYI"
| Streaming (_response, response_body), `Ok ->
Body.set_non_chunked response_body;
Body.close_writer response_body
Body.close_writer response_body;
Reader.wakeup t.reader;
| Streaming (_response, response_body), `Exn _ ->
Body.close_writer response_body;
Writer.close_and_drain t.writer
Writer.close_and_drain t.writer;
Reader.wakeup t.reader;
| (Fixed _ | Streaming _ | Upgrade _ | Waiting) , _ ->
(* XXX(seliopou): Once additional logging support is added, log the error
* in case it is not spurious. *)
Expand Down
7 changes: 4 additions & 3 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ let create ?(config=Config.default) ?(error_handler=default_error_handler) reque
let writer = Writer.create ~buffer_size:response_buffer_size () in
let request_queue = Queue.create () in
let response_body_buffer = Bigstringaf.create response_body_buffer_size in
let handler request request_body =
let rec reader = lazy (Reader.request handler)
and handler request request_body =
let reqd =
Reqd.create error_handler request request_body writer response_body_buffer
Reqd.create error_handler request request_body (Lazy.force reader) writer response_body_buffer
in
Queue.push reqd request_queue;
in
{ reader = Reader.request handler
{ reader = Lazy.force reader
; writer
; response_body_buffer
; request_handler = request_handler
Expand Down
35 changes: 33 additions & 2 deletions lib_test/test_server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,36 @@ let test_errored_content_length_streaming_response () =
connection_is_shutdown t;
;;

let test_errored_chunked_streaming_response_async () =
let reader_woken_up = ref false in
let writer_woken_up = ref false in
let continue = ref (fun () -> ()) in
let request = Request.create `GET "/" in
let response =
Response.create `OK
~headers:(Headers.of_list ["Transfer-encoding", "chunked"])
in

let request_handler reqd =
let request_body = Reqd.request_body reqd in
Body.close_reader request_body;
let body = Reqd.respond_with_streaming reqd response in
Body.write_string body "hello";
continue := (fun () ->
Reqd.report_exn reqd (Failure "heh"));
in

let t = create request_handler in
read_request t request;
write_response t response ~body:"5\r\nhello\r\n";
yield_reader t (fun () -> reader_woken_up := true);
yield_writer t (fun () -> writer_woken_up := true);
!continue ();
Alcotest.(check bool) "Reader woken up" true !reader_woken_up;
Alcotest.(check bool) "Writer woken up" true !writer_woken_up;
connection_is_shutdown t;
;;

let tests =
[ "initial reader state" , `Quick, test_initial_reader_state
; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof
Expand Down Expand Up @@ -1662,6 +1692,7 @@ let tests =
; "reader EOF race condition causes state machine to issue writer yield", `Quick, test_race_condition_writer_issues_yield_after_reader_eof
; "multiple requests in single read", `Quick, test_multiple_requests_in_single_read
; "multiple async requests in single read", `Quick, test_multiple_async_requests_in_single_read
; "chunked-encoding streaming error test", `Quick, test_errored_chunked_streaming_response
; "content-length streaming error test", `Quick, test_errored_content_length_streaming_response
; "chunked-encoding streaming error", `Quick, test_errored_chunked_streaming_response
; "content-length streaming error", `Quick, test_errored_content_length_streaming_response
; "chunked-encoding async streaming error", `Quick, test_errored_chunked_streaming_response_async
]

0 comments on commit 909297d

Please sign in to comment.