Skip to content

Commit

Permalink
fix(async): do not read pipe to check if its empty
Browse files Browse the repository at this point in the history
this isn't safe as it drops chunks from the user

Signed-off-by: Rudi Grinberg <[email protected]>

<!-- ps-id: 52adccd6-eeda-4f37-a864-bf0738915158 -->
  • Loading branch information
rgrinberg committed Jun 30, 2024
1 parent f66a84d commit 1c96f58
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 51 deletions.
15 changes: 2 additions & 13 deletions cohttp-async/src/body.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,8 @@ let drain = function #B.t -> return () | `Pipe p -> Pipe.drain p

let is_empty (body : t) =
match body with
| #B.t as body -> return (B.is_empty body)
| `Pipe pipe -> (
Deferred.repeat_until_finished () @@ fun () ->
Pipe.values_available pipe >>= function
| `Eof -> return (`Finished true)
| `Ok -> (
match Pipe.peek pipe with
| None -> return (`Finished true)
| Some "" -> (
Pipe.read pipe >>| function
| `Eof -> `Finished true
| `Ok _ -> `Repeat ())
| Some _ -> return (`Finished false)))
| #B.t as body -> if B.is_empty body then `True else `False
| `Pipe _ -> `Unknown

let to_pipe = function
| `Empty -> Pipe.of_list []
Expand Down
2 changes: 1 addition & 1 deletion cohttp-async/src/body.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type t = [ Cohttp.Body.t | `Pipe of string Pipe.Reader.t ] [@@deriving sexp_of]
include Cohttp.S.Body with type t := t

val drain : t -> unit Deferred.t
val is_empty : t -> bool Deferred.t
val is_empty : t -> [ `True | `False | `Unknown ]
val to_string : t -> string Deferred.t
val to_string_list : t -> string list Deferred.t
val to_pipe : t -> string Pipe.Reader.t
Expand Down
23 changes: 12 additions & 11 deletions cohttp-async/src/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,18 @@ let call ?interrupt ?ssl_config ?headers ?(chunked = false) ?(body = `Empty)
Body.Private.disable_chunked_encoding body >>| fun (body, body_length) ->
( Cohttp.Request.make_for_client ?headers ~chunked ~body_length meth uri,
body )
| true -> (
Body.is_empty body >>| function
| true ->
(* Don't used chunked encoding with an empty body *)
( Cohttp.Request.make_for_client ?headers ~chunked:false
~body_length:0L meth uri,
body )
| false ->
(* Use chunked encoding if there is a body *)
(Cohttp.Request.make_for_client ?headers ~chunked:true meth uri, body)
))
| true ->
Deferred.return
(match Body.is_empty body with
| `True ->
(* Don't used chunked encoding with an empty body *)
( Cohttp.Request.make_for_client ?headers ~chunked:false
~body_length:0L meth uri,
body )
| `Unknown | `False ->
(* Use chunked encoding if there is a body *)
( Cohttp.Request.make_for_client ?headers ~chunked:true meth uri,
body )))
>>= fun (req, body) -> request ?interrupt ?ssl_config ~body ~uri req

let get ?interrupt ?ssl_config ?headers uri =
Expand Down
26 changes: 0 additions & 26 deletions cohttp-async/test/test_async_integration.ml
Original file line number Diff line number Diff line change
Expand Up @@ -101,39 +101,13 @@ let ts =
Body.to_string body >>| fun body ->
assert_equal ~printer "expert 2" body
in
let check_body_empty_status () =
let is_empty = Cohttp_async.Body.is_empty in
let tests =
[
("empty pipe", Pipe.of_list [], true);
("pipe with elements", Pipe.of_list [ "foo"; "bar" ], false);
( "pipe with empty items at the beginning",
Pipe.of_list [ ""; "baz" ],
false );
("Pipe with empty strings", Pipe.of_list [ ""; ""; "" ], true);
]
in
Deferred.List.iter ~how:`Sequential tests
~f:(fun (msg, pipe, expected) ->
is_empty (`Pipe pipe) >>| fun real ->
assert_equal ~msg expected real)
>>= fun () ->
let b = Pipe.of_list [ ""; ""; "foo"; "bar" ] in
is_empty (`Pipe b) >>= fun _ ->
Pipe.to_list b >>| fun real ->
let msg =
"Checking if pipe is empty consumes all leading empty strings"
in
assert_equal ~msg [ "foo"; "bar" ] real
in
[
("empty chunk test", empty_chunk);
("large response", large_response);
("large request", large_request);
("pipelined chunk test", pipelined_chunk);
("large chunked response", large_chunked_response);
("expert response", expert_pipelined);
("check body is_empty status for pipes", check_body_empty_status);
])

let () =
Expand Down

0 comments on commit 1c96f58

Please sign in to comment.