diff --git a/src/core/lwt_stream.ml b/src/core/lwt_stream.ml index cb7539405..d2450f2b7 100644 --- a/src/core/lwt_stream.ml +++ b/src/core/lwt_stream.ml @@ -649,25 +649,27 @@ let rec junk_while_s_rec node f s = let junk_while_s f s = junk_while_s_rec s.node f s -let rec junk_old_rec node s = +let rec junk_available_rec node s = if node == !(s.last) then let thread = feed s in match Lwt.state thread with | Lwt.Return _ -> - junk_old_rec node s + junk_available_rec node s | Lwt.Fail exn -> - Lwt.fail exn + raise exn | Lwt.Sleep -> - Lwt.return_unit + () else match node.data with | Some _ -> consume s node; - junk_old_rec node.next s + junk_available_rec node.next s | None -> - Lwt.return_unit + () + +let junk_available s = junk_available_rec s.node s -let junk_old s = junk_old_rec s.node s +let junk_old s = Lwt.return (junk_available s) let rec get_available_rec node acc s = if node == !(s.last) then diff --git a/src/core/lwt_stream.mli b/src/core/lwt_stream.mli index 4f74483cf..0ca934a0d 100644 --- a/src/core/lwt_stream.mli +++ b/src/core/lwt_stream.mli @@ -234,8 +234,8 @@ val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.t (** [junk_while f st] removes all elements at the beginning of the streams which satisfy [f]. *) -val junk_old : 'a t -> unit Lwt.t -(** [junk_old st] removes all elements that are ready to be read +val junk_available : 'a t -> unit +(** [junk_available st] removes all elements that are ready to be read without yielding from [st]. *) val get_available : 'a t -> 'a list @@ -263,6 +263,11 @@ val closed : 'a t -> unit Lwt.t @since 2.6.0 *) +(** {3 Deprecated} *) + +val junk_old : 'a t -> unit Lwt.t [@@deprecated "Use junk_available instead"] +(** @deprecated [junk_old st] is [Lwt.return (junk_available st)]. *) + (** {2 Stream transversal} *) (** Note: all the following functions are destructive. diff --git a/test/core/test_lwt_stream.ml b/test/core/test_lwt_stream.ml index 3e5193dc8..fb2133730 100644 --- a/test/core/test_lwt_stream.ml +++ b/test/core/test_lwt_stream.ml @@ -258,6 +258,36 @@ let suite = suite "lwt_stream" [ Lwt_stream.last_new stream >>= fun x -> return (x = 3)); + test_direct "junk_available" + (fun () -> + let s, push = Lwt_stream.create () in + let b0 = Lwt_stream.get_available s = [] in + let () = Lwt_stream.junk_available s in + let b1 = Lwt_stream.get_available s = [] in + let () = push (Some 1); push (Some 2); push (Some 4) in + let () = Lwt_stream.junk_available s in + let b2 = Lwt_stream.get_available s = [] in + let () = push (Some 66); push (Some 77); push (Some 99) in + let () = Lwt_stream.junk_available s in + let b3 = Lwt_stream.get_available s = [] in + b0 && b1 && b2 && b3); + + test "junk_old" + (fun () -> + let open Lwt.Syntax in + let s, push = Lwt_stream.create () in + let b0 = Lwt_stream.get_available s = [] in + let* () = Lwt_stream.junk_old s in + let b1 = Lwt_stream.get_available s = [] in + let () = push (Some 1); push (Some 2); push (Some 4) in + let* () = Lwt_stream.junk_old s in + let b2 = Lwt_stream.get_available s = [] in + let () = push (Some 66); push (Some 77); push (Some 99) in + let* () = Lwt_stream.junk_old s in + let b3 = Lwt_stream.get_available s = [] in + Lwt.return (b0 && b1 && b2 && b3)) + [@ocaml.alert "-deprecated"]; + test "cancel push stream 1" (fun () -> let stream, _ = Lwt_stream.create () in