diff --git a/dune-project b/dune-project index 956137c17..b233ce5e1 100644 --- a/dune-project +++ b/dune-project @@ -12,6 +12,17 @@ (source (github ocsigen/lwt)) (documentation "https://ocsigen.org/lwt") +(package + (name lwt_retry) + (synopsis "Utilities for retrying Lwt computations") + (authors "Shon Feder") + (maintainers + "Raphaël Proust " + "Shon Feder ") + (depends + (ocaml (>= 4.08)) + (lwt (>= 5.3.0)))) + (package (name lwt_ppx) (synopsis "PPX syntax for Lwt, providing something similar to async/await from JavaScript") diff --git a/lwt_retry.opam b/lwt_retry.opam new file mode 100644 index 000000000..bf8413e56 --- /dev/null +++ b/lwt_retry.opam @@ -0,0 +1,31 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "Utilities for retrying Lwt computations" +maintainer: [ + "Raphaël Proust " "Shon Feder " +] +authors: ["Shon Feder"] +license: "MIT" +homepage: "https://github.com/ocsigen/lwt" +doc: "https://ocsigen.org/lwt" +bug-reports: "https://github.com/ocsigen/lwt/issues" +depends: [ + "dune" {>= "2.0"} + "ocaml" {>= "4.08"} + "lwt" {>= "5.3.0"} +] +build: [ + ["dune" "subst"] {pinned} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/ocsigen/lwt.git" diff --git a/src/retry/dune b/src/retry/dune new file mode 100644 index 000000000..2a09d3d6e --- /dev/null +++ b/src/retry/dune @@ -0,0 +1,19 @@ +(* -*- tuareg -*- *) + +let preprocess = + match Sys.getenv "BISECT_ENABLE" with + | "yes" -> "(preprocess (pps bisect_ppx))" + | _ -> "" + | exception _ -> "" + +let () = Jbuild_plugin.V1.send @@ {| + +(library + (public_name lwt_retry) + (synopsis "A utility for retrying Lwt computations") + (wrapped false) + (libraries lwt lwt.unix) + |} ^ preprocess ^ {| + (flags (:standard -w +A))) + +|} diff --git a/src/retry/lwt_retry.ml b/src/retry/lwt_retry.ml new file mode 100644 index 000000000..e2f9cf581 --- /dev/null +++ b/src/retry/lwt_retry.ml @@ -0,0 +1,67 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + + + +open Lwt.Syntax + +let default_sleep_duration n' = + let base_sleep_time = 2.0 in + let n = Int.to_float n' in + n *. base_sleep_time *. Float.pow 2.0 n + +type ('retry, 'fatal) error = + [ `Retry of 'retry + | `Fatal of 'fatal + ] + +let pp_opaque fmt _ = Format.fprintf fmt "" + +let pp_error ?(retry = pp_opaque) ?(fatal = pp_opaque) fmt err = + match err with + | `Retry r -> Format.fprintf fmt "`Retry %a" retry r + | `Fatal f -> Format.fprintf fmt "`Fatal %a" fatal f + +let equal_error ~retry ~fatal a b = + match a, b with + | `Retry a', `Retry b' -> retry a' b' + | `Fatal a', `Fatal b' -> fatal a' b' + | _ -> false + +type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error * int) result + +let on_error + (f : unit -> ('ok, ('retry, 'fatal) error) result Lwt.t) + : ('ok, 'retry, 'fatal) attempt Lwt_stream.t + = + let i = ref 0 in + let stop = ref false in + Lwt_stream.from begin fun () -> + incr i; + if !stop then + Lwt.return None + else + let+ result = f () in + match result with + | Error (`Retry _ as retry) -> Some (Error (retry, !i)) + | Error (`Fatal _ as fatal) -> stop := true; Some (Error (fatal, !i)) + | Ok _ as ok -> stop := true; Some ok + end + +let with_sleep ?(duration=default_sleep_duration) (attempts : _ attempt Lwt_stream.t) : _ attempt Lwt_stream.t = + attempts + |> Lwt_stream.map_s begin function + | Ok _ as ok -> Lwt.return ok + | Error (_, n) as err -> + let* () = Lwt_unix.sleep @@ duration n in + Lwt.return err + end + +let n_times n attempts = + if n < 0 then invalid_arg "Lwt_retry.n_times: n must be non-negative"; + (* The first attempt is a try, and re-tries start counting from n + 1 *) + let retries = n + 1 in + let+ attempts = Lwt_stream.nget retries attempts in + match List.rev attempts with + | last :: _ -> last + | _ -> failwith "Lwt_retry.n_times: impossible" diff --git a/src/retry/lwt_retry.mli b/src/retry/lwt_retry.mli new file mode 100644 index 000000000..8cf47f785 --- /dev/null +++ b/src/retry/lwt_retry.mli @@ -0,0 +1,157 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + + + +(** Utilities for retrying Lwt computations + + These utilities are useful for dealing with failure-prone computations that + are expected to succeed after some number of repeated attempts. E.g., + + {[ + let flaky_computation () = match try_to_get_resource () with + | Flaky_error msg -> Error (`Retry msg) + | Fatal_error err -> Error (`Fatal err) + | Success result -> Ok result + + let error_tolerant_computation () = + Lwt_retry.(flaky_computation + |> on_error (* Retry when [`Retry]able results are produced. *) + |> with_sleep (* Add a delay between attempts, with an exponential backoff. *) + |> n_times 10 (* Try up to 10 times, so long as errors are retryable. *) + ) + ]} + + This library provides a few combinators, but retry attempts are produced on + demand in an {!type:Lwt_stream.t}, and they can be consumed and traversed + using the {!module:Lwt_stream} functions directly. *) + +type ('retry, 'fatal) error = + [ `Retry of 'retry + | `Fatal of 'fatal + ] +(** The type of errors that a retryable computation can produce. + + - [`Retry r] when [r] represents an error that can be retried. + - [`Fatal f] when [f] represents an error that cannot be retried. *) + +type ('ok, 'retry, 'fatal) attempt = ('ok, ('retry, 'fatal) error * int) result +(** A [('ok, 'retry, 'fatal) attempt] is the [result] of a retryable computation, + with its the erroneous results enumerated. + + - [Ok v] is a successfully computed value [v] + - [Error (err, n)] is the {!type:error} [err] produced on the [n]th + attempt + + The enumeration of attempts is 1-based, because making 0 attempts means + making no attempts all, making 1 attempt means {i trying} once, and (when + [i>0]) making [n] attempts means trying once and then {i retrying} up to + [n-1] times. *) + +val pp_error : + ?retry:(Format.formatter -> 'retry -> unit) -> + ?fatal:(Format.formatter -> 'fatal -> unit) -> + Format.formatter -> ('retry, 'fatal) error -> unit +(** [pp_error ~retry ~fatal] is a pretty printer for {!type:error}s that formats + fatal and retryable errors according to the provided printers. + + If a printers is not provided, values of the type are represented as + [""]. *) + +val equal_error : + retry:('retry -> 'retry -> bool) -> + fatal:('fatal -> 'fatal -> bool) -> + ('retry, 'fatal) error -> + ('retry, 'fatal) error -> + bool + +val on_error : + (unit -> ('ok, ('retry, 'fatal) error) result Lwt.t) -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t +(** [Lwt_retry.on_error f] is a stream of attempts to compute [f], with attempts + made on demand. Attempts will be added to the stream when results are + requested until the computation either succeeds or produces a fatal error. + + Examples + + {[ + # let success () = Lwt.return_ok ();; + val success : unit -> (unit, 'a) result Lwt.t = + # Lwt_retry.(success |> on_error) |> Lwt_stream.to_list;; + - : (unit, 'a, 'b) Lwt_retry.attempt list = [Ok ()] + + # let fatal_failure () = Lwt.return_error (`Fatal ());; + val fatal_failure : unit -> ('a, [> `Fatal of unit ]) result Lwt.t = + # Lwt_retry.(fatal_failure |> on_error) |> Lwt_stream.to_list;; + - : ('a, 'b, unit) Lwt_retry.attempt list = [Error (`Fatal (), 1)] + + # let retryable_error () = Lwt.return_error (`Retry ());; + val retryable_error : unit -> ('a, [> `Retry of unit ]) result Lwt.t = + # Lwt_retry.(retryable_error |> on_error) |> Lwt_stream.nget 3;; + - : ('a, unit, 'b) Lwt_retry.attempt list = + [Error (`Retry (), 1); Error (`Retry (), 2); Error (`Retry (), 3)] + ]}*) + +val with_sleep : + ?duration:(int -> float) -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t +(** [with_sleep ~duration attempts] is the stream of [attempts] with a sleep of + [duration n] seconds added before computing each [n]th retryable attempt. + + @param duration the optional sleep duration calculation, defaulting to + {!val:default_sleep_duration}. + + Examples + + {[ + # let f () = Lwt.return_error (`Retry ());; + # let attempts_with_sleeps = Lwt_retry.(f |> on_error |> with_sleep);; + + # Lwt_stream.get attempts_with_sleeps;; + (* computed immediately *) + Some (Error (`Retry (), 1)) + + # Lwt_stream.get attempts_with_sleeps;; + (* computed after 3 seconds *) + Some (Error (`Retry (), 2)) + + # Lwt_stream.get attempts_with_sleeps;; + (* computed after 9 seconds *) + Some (Error (`Retry (), 3)) + + (* a stream with a constant 1s sleep between attempts *) + # let attempts_with_constant_sleeps = + Lwt_retry.(f |> on_error |> with_sleep ~duration:(fun _ -> 1.0));; + ]} *) + +val default_sleep_duration : int -> float +(** [default_sleep_duration n] is an exponential backoff computed as [n] * 2 * + (2 ^ [n]), which gives the sequence [ [0.; 4.; 16.; 48.; 128.; 320.; 768.; + 1792.; ...] ]. *) + +val n_times : + int -> + ('ok, 'retry, 'fatal) attempt Lwt_stream.t -> + ('ok, 'retry, 'fatal) attempt Lwt.t +(** [n_times n attempts] is [Ok v] if one of the [attempts] succeeds within [n] + retries (or [n+1] attempts), [Error (`Fatal f, n+1)] if any of the attempts + results in the fatal error, or [Error (`Retry r, n+1)] if all [n] retries are + exhausted and the [n+1]th attempt results in a retry error. + + In particular [n_times 0 attempts] will *try* 1 attempt but *re-try* 0, so + it is guaranteed to produce some result. + + [n_times] forces up to [n] elements of the on-demand stream of attempts. + + Examples + + {[ + # let f () = + let i = ref 0 in + fun () -> Lwt.return_error (if !i < 3 then (incr i; `Retry ()) else `Fatal "error!");; + # Lwt_retry.(f () |> on_error |> n_times 0);; + Error (`Retry (), 1) + # Lwt_retry.(f () |> on_error |> n_times 4);; + Error (`Fatal "error!", 3) + ]} *) diff --git a/test/retry/dune b/test/retry/dune new file mode 100644 index 000000000..f07c559aa --- /dev/null +++ b/test/retry/dune @@ -0,0 +1,4 @@ +(test + (name main) + (package lwt_retry) + (libraries lwttester lwt_retry)) diff --git a/test/retry/main.ml b/test/retry/main.ml new file mode 100644 index 000000000..22a78998b --- /dev/null +++ b/test/retry/main.ml @@ -0,0 +1,158 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + + + +open Test +open Lwt.Infix +open Lwt.Syntax + +module Retry = Lwt_retry + +let pp = Retry.pp_error ~retry:Format.pp_print_float ~fatal:Format.pp_print_int + +let suite = suite "lwt_retry" [ + test_direct "can format retries outcomes" + (fun () -> + Format.asprintf "%a" pp (`Retry 3.0) = "`Retry 3."); + + test_direct "can format fatal outcomes" + (fun () -> + Format.asprintf "%a" pp (`Fatal 42) = "`Fatal 42"); + + test_direct "can format with default printer" + (fun () -> + Format.asprintf "%a" (fun x -> Retry.pp_error x) (`Fatal 42) + = + "`Fatal "); + + test "success without retry" + (fun () -> + let strm = + Retry.on_error (fun () -> Lwt.return_ok 42) + in + let* actual = Lwt_stream.next strm in + assert (actual = Ok 42); + (* ensure the post condition of an empty stream *) + Lwt_stream.is_empty strm); + + test "does not run extra attempts" + (fun () -> + let count = ref 0 in + let strm = + Retry.on_error (fun () -> + incr count; + Lwt.return_ok 42) + in + let* actual = Lwt_stream.next strm in + assert (actual = Ok 42); + (* Force another attempt on the stream *) + let+ _ = Lwt_stream.is_empty strm in + (* We should have run 1 and only 1 attempt, + or else the execution logic is wrong. *) + !count = 1); + + test "just retries" (fun () -> + let strm = + Retry.on_error (fun () -> Lwt.return_error (`Retry ())) + in + let retry_attempts = 5 in + let expected_retries = List.init retry_attempts (fun i -> Error (`Retry (), i + 1)) in + let+ actual_retries = Lwt_stream.nget retry_attempts strm in + actual_retries = expected_retries); + + test "retries before fatal error" (fun () -> + let retries_before_fatal = 3 in + let i = ref 0 in + let strm = Retry.on_error + (fun () -> + if !i < retries_before_fatal then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_error (`Fatal ())) + in + let* n_retry_errors = Lwt_stream.nget retries_before_fatal strm >|= List.length in + assert (n_retry_errors = retries_before_fatal); + let* fatal_error = Lwt_stream.next strm in + assert (fatal_error = Error (`Fatal (), retries_before_fatal + 1)); + (* ensure the post condition of an empty stream *) + Lwt_stream.is_empty strm); + + test "retries before success" (fun () -> + let retries_before_fatal = 3 in + let i = ref 0 in + let strm = Retry.on_error (fun () -> + if !i < retries_before_fatal then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_ok () + ) + in + let* n_retry_errors = Lwt_stream.nget retries_before_fatal strm >|= List.length in + assert (n_retry_errors = retries_before_fatal); + let* success = Lwt_stream.next strm in + assert (success = Ok ()); + (* ensure the post condition of an empty stream *) + Lwt_stream.is_empty strm); + + test "[n_times 0] runs one attempt" (fun () -> + let operation () = Lwt.return_error (`Retry ()) in + let+ attempt = Retry.(operation |> on_error |> n_times 0) in + attempt = Error (`Retry (), 1)); + + test "n_times gives up on a fatal error" (fun () -> + let i = ref 0 in + let operation () = + if !i < 3 then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_error (`Fatal ()) + in + let+ fatal_error = Retry.(operation |> on_error |> n_times 5) in + fatal_error = Error (`Fatal (), 4)); + + test "n_times gives a retry error when exhausted" (fun () -> + let retries = 5 in + let operation () = Lwt.return_error (`Retry ()) in + let+ result = Retry.(operation |> on_error |> n_times retries) in + result = Error (`Retry (), retries + 1)); + + test "n_times is ok on success" (fun () -> + let i = ref 0 in + let operation () = + if !i < 3 then ( + incr i; + Lwt.return_error (`Retry ()) + ) else + Lwt.return_ok () + in + let+ success = Retry.(operation |> on_error |> n_times 5) in + success = Ok ()); + + test_direct "n_times on negative raises Invalid_argument" (fun () -> + let invalid_negative_retries = -5 in + let operation () = Lwt.return_error (`Retry ()) in + let attempts = Retry.(operation |> on_error) in + try + let _ = Retry.(attempts |> n_times invalid_negative_retries) in + false (* We failed to raise the invalid argument exception *) + with + Invalid_argument _ -> true); + + (* test that the sleeps actually throttle computations as desired *) + test "with_sleep really does sleep" (fun () -> + let duration _ = 0.01 in + let operation () = Lwt.return_error (`Retry ()) in + (* If [with_sleep] is removed the test fails, as expected *) + let retries = Retry.(operation |> on_error |> with_sleep ~duration |> n_times 5) in + (* We will expect the [racing_operation] to complete before the retries with_sleep *) + let racing_operation = Lwt_unix.sleep (duration ()) >|= Result.ok in + let+ actual = Lwt.choose [racing_operation; retries] in + actual = Ok ()); + ] + +let () = + Test.run "retry" [suite]