diff --git a/CHANGES.md b/CHANGES.md index fd5b4b6d..e4a6d8c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,7 +17,6 @@ - The `Log.t` initially has no output, but can have its output set by the application. - Log when `Retry_manager` retries. -- Log, at debug level, when rate limit is exhausted and when it resets. - Rename `Non_transient_error` to `Permanent_error`. - Rename `Json_object.Util.time` => `time_sec_since_epoch`. - Require Jane Street libraries >= v0.16.0. @@ -29,6 +28,9 @@ - Change `Iter_comments` to return a pipe. - Fix a crash when replying to messages by introducing the new `Endpoint.reply_to_message`. +- Revamp rate limiter. The rate limiter is now built on top of a synchronous + state machine with a small async wrapper. We eliminate various async + background jobs. ## Removed diff --git a/reddit_api_async/connection.ml b/reddit_api_async/connection.ml index 432d63df..bffc0d8b 100644 --- a/reddit_api_async/connection.ml +++ b/reddit_api_async/connection.ml @@ -382,20 +382,18 @@ type t = T : (module T with type t = 't) * 't -> t let sexp_of_t (T ((module T), t)) = T.sexp_of_t t -let all_rate_limiters ~time_source = - Rate_limiter.combine - [ Rate_limiter.by_headers ~time_source - ; Rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) ~time_source +let all_rate_limiters = + Rate_limiter_state_machine.combine + [ Rate_limiter_state_machine.by_headers + ; Rate_limiter_state_machine.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) ] ;; let create credentials ~user_agent = - T - ( (module Local) - , Local.create - credentials - ~user_agent - ~rate_limiter:(all_rate_limiters ~time_source:(Time_source.wall_clock ())) ) + let rate_limiter = + Rate_limiter.of_state_machine all_rate_limiters (Time_source.wall_clock ()) + in + T ((module Local), Local.create credentials ~user_agent ~rate_limiter) ;; let get ?sequence (T ((module T), t)) = T.get ?sequence t @@ -869,6 +867,9 @@ module For_testing = struct | true -> reading filename placeholders | false -> recording filename placeholders in + let rate_limiter = + Rate_limiter.of_state_machine all_rate_limiters Cassette.time_source + in let connection = T ( (module Local) @@ -876,7 +877,7 @@ module For_testing = struct (module Cassette) credentials ~time_source:Cassette.time_source - ~rate_limiter:(all_rate_limiters ~time_source:Cassette.time_source) ) + ~rate_limiter ) in Monitor.protect (fun () -> f connection) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index 469b6cd4..2e238cf7 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -1,274 +1,56 @@ open! Core open! Async +module Rate_limiter_state_machine = Reddit_api_kernel.Rate_limiter_state_machine + +type t = + { mutable state : Rate_limiter_state_machine.t + ; response_received : (unit, read_write) Bvar.t + ; time_source : (Time_source.t[@sexp.opaque]) + } +[@@deriving sexp_of] + +let of_state_machine state time_source = + let response_received = Bvar.create () in + { state; response_received; time_source } +;; -module type S = sig - type t [@@deriving sexp_of] - - val kind : string - val permit_request : t -> unit Deferred.t - val notify_response : t -> Cohttp.Response.t -> unit - val is_ready : t -> bool - val wait_until_ready : t -> unit Deferred.t -end - -type t = T : (module S with type t = 't) * 't -> t - -let sexp_of_t (T ((module S), t)) : Sexp.t = List [ Atom S.kind; [%sexp_of: S.t] t ] -let permit_request (T ((module S), t)) = S.permit_request t -let notify_response (T ((module S), t)) = S.notify_response t -let is_ready (T ((module S), t)) = S.is_ready t -let wait_until_ready (T ((module S), t)) = S.wait_until_ready t - -module With_minimum_delay = struct - type t = - { ready : (unit, read_write) Mvar.t - ; time_source : (Time_source.t[@sexp.opaque]) - ; delay : Time_ns.Span.t - } - [@@deriving sexp_of] - - let kind = "With_minimum_delay" - - let create ~delay ~time_source = - let ready = Mvar.create () in - Mvar.set ready (); - { ready; delay; time_source } - ;; - - let permit_request { ready; delay; time_source } = - let%bind () = Mvar.take ready in - Deferred.upon (Time_source.after time_source delay) (fun () -> Mvar.set ready ()); - return () - ;; - - let notify_response (_ : t) (_ : Cohttp.Response.t) = () - let is_ready { ready; _ } = not (Mvar.is_empty ready) - let wait_until_ready { ready; _ } = Mvar.value_available ready -end - -module By_headers = struct - let kind = "By_headers" - - module Server_side_info = struct - type t = - { remaining_api_calls : int - ; reset_time : Time_ns_unix.t - } - [@@deriving sexp, fields] - - let snap_to_nearest_minute time = - let interval = Time_ns.Span.minute in - let base = Time_ns.epoch in - let candidates = - [ Time_ns.prev_multiple ~can_equal_before:true ~interval ~base ~before:time () - ; Time_ns.next_multiple ~can_equal_after:false ~interval ~base ~after:time () - ] - in - List.min_elt - candidates - ~compare: - (Comparable.lift Time_ns.Span.compare ~f:(fun time' -> - Time_ns.abs_diff time time')) - |> Option.value_exn - ;; - - let%expect_test _ = - List.iter [ "2020-11-30 18:48:01.02Z"; "2020-11-30 18:47:59.02Z" ] ~f:(fun time -> - let time = Time_ns_unix.of_string time in - print_s [%sexp (snap_to_nearest_minute time : Time_ns.Alternate_sexp.t)]); - [%expect {| - "2020-11-30 18:48:00Z" - "2020-11-30 18:48:00Z" |}]; - return () - ;; - - let parse_http_header_date date_string = - Scanf.sscanf - date_string - "%3s, %2d %3s %4d %2d:%2d:%2d GMT" - (fun day_of_week d month y hr min sec -> - let day_of_week = Day_of_week.of_string day_of_week in - let month = Month.of_string month in - let date = Date.create_exn ~y ~m:month ~d in - (match Day_of_week.equal day_of_week (Date.day_of_week date) with - | true -> () - | false -> - raise_s - [%message - "HTTP response: Day of week did not match parsed date" - (day_of_week : Day_of_week.t) - (date : Date.t) - (date_string : string)]); - let ofday = Time_ns.Ofday.create ~hr ~min ~sec () in - Time_ns.of_date_ofday date ofday ~zone:Time_ns_unix.Zone.utc) - ;; - - let%expect_test _ = - print_s - [%sexp - (parse_http_header_date "Wed, 21 Oct 2015 07:28:00 GMT" - : Time_ns.Alternate_sexp.t)]; - [%expect {| "2015-10-21 07:28:00Z" |}]; - return () - ;; - - let t_of_headers headers = - let open Option.Let_syntax in - let get_header header = Cohttp.Header.get headers header in - let%bind remaining_api_calls = - get_header "X-Ratelimit-Remaining" >>| Float.of_string >>| Int.of_float - in - let%bind reset_time = - let%bind server_time = get_header "Date" >>| parse_http_header_date in - let%bind relative_reset_time = - get_header "X-Ratelimit-Reset" >>| Int.of_string >>| Time_ns.Span.of_int_sec - in - Some (snap_to_nearest_minute (Time_ns.add server_time relative_reset_time)) - in - Some { remaining_api_calls; reset_time } - ;; - - let compare_by_inferred_age = - (* We use reset time instead of the "Date" header because we might have - sent some requests for which we have yet to receive a response. In that - case, the most authoritative picture of our remaining requests is not - the most recent response, but rather our previous state adjusted by the - number of requests we've sent. - - We therefore prefer the state with fewer requests remaining when two - states are for the same reset period. - *) - Comparable.lexicographic - [ Comparable.lift Time_ns.compare ~f:reset_time - ; Comparable.reverse (Comparable.lift compare ~f:remaining_api_calls) - ] - ;; - - let freshest = Base.Comparable.max compare_by_inferred_age - end - - type t = - { ready : (unit, read_write) Mvar.t - ; time_source : (Time_source.t[@sexp.opaque]) - ; mutable reset_event : ((Nothing.t, unit) Time_source.Event.t[@sexp.opaque]) option - ; mutable server_side_info : Server_side_info.t option - } - [@@deriving sexp_of] - - let create ~time_source = - let ready = Mvar.create () in - Mvar.set ready (); - { server_side_info = None; reset_event = None; time_source; ready } - ;; +let is_ready t = + let now = Time_source.now t.time_source in + match Rate_limiter_state_machine.wait_until t.state with + | Now -> true + | Check_after_receiving_response -> false + | After time -> Time_ns.( >= ) now time +;; - let is_ready { ready; _ } = not (Mvar.is_empty ready) - let wait_until_ready { ready; _ } = Mvar.value_available ready +let wait_until_ready t = + Deferred.repeat_until_finished () (fun () -> + match Rate_limiter_state_machine.wait_until t.state with + | Now -> return (`Finished ()) + | After time -> + (match Time_ns.( >= ) (Time_source.now t.time_source) time with + | true -> return (`Finished ()) + | false -> + let%bind () = Time_source.at t.time_source time in + return (`Repeat ())) + | Check_after_receiving_response -> + let%bind () = Bvar.wait t.response_received in + return (`Repeat ())) +;; - let rec schedule_reset_at_time t time = - let schedule_fresh_event () = - t.reset_event - <- Some - (Time_source.Event.run_at - t.time_source - time - (fun () -> - Mvar.set t.ready (); - (* In case something prevents our first request from receiving - a response, we will periodically allow retries. *) - schedule_reset_at_time t (Time_ns.add time Time_ns.Span.minute)) - ()) - in - match t.reset_event with - | None -> schedule_fresh_event () - | Some event -> - let scheduled_time = Time_source.Event.scheduled_at event in - (match Time_ns.( < ) scheduled_time time with - | false -> () +let permit_request t = + Deferred.repeat_until_finished () (fun () -> + let%bind () = wait_until_ready t in + match is_ready t with + | false -> return (`Repeat ()) | true -> - (match Time_source.Event.reschedule_at event time with - | Ok -> () - | Previously_aborted _ -> . - | Previously_happened () -> schedule_fresh_event ())) - ;; - - let update_server_side_info t ~new_server_side_info = - t.server_side_info <- Some new_server_side_info; - schedule_reset_at_time t new_server_side_info.reset_time; - match - ( new_server_side_info.remaining_api_calls > 0 - , Time_ns.equal new_server_side_info.reset_time Time_ns.epoch ) - with - | false, false -> () - | false, true -> - [%log.debug - Import.log - "Rate limit exhausted" - ~reset_time:(new_server_side_info.reset_time : Time_ns_unix.t)] - | true, _ -> Mvar.set t.ready () - ;; - - let permit_request t = - let%bind () = Mvar.take t.ready in - (match t.server_side_info with - | None -> () - | Some ({ remaining_api_calls; _ } as server_side_info) -> - (match remaining_api_calls with - | 0 -> () - | n -> - let new_server_side_info = - { server_side_info with remaining_api_calls = n - 1 } - in - update_server_side_info t ~new_server_side_info)); - return () - ;; - - let notify_response t response = - let headers = Cohttp.Response.headers response in - match Server_side_info.t_of_headers headers with - | None -> - (* We assume that, in the absence of ratelimit headers, we must have hit - some authentication failure. As a heuristic to avoid getting stuck, we - immediately reset [t.ready]. *) - Mvar.set t.ready () - | Some response_server_side_info -> - let new_server_side_info = - match t.server_side_info with - | None -> response_server_side_info - | Some server_side_info -> - (match - Comparable.lift - [%compare: Time_ns.t] - ~f:Server_side_info.reset_time - server_side_info - response_server_side_info - |> Ordering.of_int - with - | Greater | Equal -> () - | Less -> - [%log.debug - Import.log - "Rate limit is resetting" - ~old_remaining_api_calls:(server_side_info.remaining_api_calls : int)]); - Server_side_info.freshest server_side_info response_server_side_info - in - update_server_side_info t ~new_server_side_info - ;; -end - -let by_headers ~time_source = T ((module By_headers), By_headers.create ~time_source) - -let with_minimum_delay ~time_source ~delay = - T ((module With_minimum_delay), With_minimum_delay.create ~time_source ~delay) + t.state + <- Rate_limiter_state_machine.sent_request_unchecked + t.state + ~now:(Time_source.now t.time_source); + return (`Finished ())) ;; -module Combined = struct - type nonrec t = t list [@@deriving sexp_of] - - let kind = "Combined" - let permit_request ts = Deferred.all_unit (List.map ts ~f:permit_request) - let notify_response ts headers = List.iter ts ~f:(fun t -> notify_response t headers) - let is_ready ts = List.for_all ts ~f:is_ready - let wait_until_ready ts = Deferred.all_unit (List.map ts ~f:wait_until_ready) -end - -let combine ts = T ((module Combined), ts) +let notify_response t response = + t.state <- Rate_limiter_state_machine.received_response t.state response; + Bvar.broadcast t.response_received () +;; diff --git a/reddit_api_async/rate_limiter.mli b/reddit_api_async/rate_limiter.mli index 56a8126d..15c737aa 100644 --- a/reddit_api_async/rate_limiter.mli +++ b/reddit_api_async/rate_limiter.mli @@ -3,9 +3,11 @@ open! Async type t [@@deriving sexp_of] -val by_headers : time_source:Time_source.t -> t -val with_minimum_delay : time_source:Time_source.t -> delay:Time_ns.Span.t -> t -val combine : t list -> t +val of_state_machine + : Reddit_api_kernel.Rate_limiter_state_machine.t + -> Time_source.t + -> t + val permit_request : t -> unit Deferred.t val notify_response : t -> Cohttp.Response.t -> unit val is_ready : t -> bool diff --git a/reddit_api_async/retry_manager.ml b/reddit_api_async/retry_manager.ml index 9f885623..bd3dd13d 100644 --- a/reddit_api_async/retry_manager.ml +++ b/reddit_api_async/retry_manager.ml @@ -127,10 +127,10 @@ let check_server t = let on_transient_error t = match t.state with - | Waiting_for_issue_resolution _ -> () + | Waiting_for_issue_resolution _ -> return () | Working_normally -> t.state <- Waiting_for_issue_resolution { finished = Ivar.create () }; - don't_wait_for (check_server t) + check_server t ;; let rec call t endpoint = @@ -151,6 +151,6 @@ let rec call t endpoint = "Transient error" (request : Endpoint.Request.t) (response : (_, Endpoint.Error.t Connection.Error.t) Result.t)]; - on_transient_error t; + let%bind () = on_transient_error t in call t endpoint) ;; diff --git a/reddit_api_kernel/rate_limiter_state_machine.ml b/reddit_api_kernel/rate_limiter_state_machine.ml new file mode 100644 index 00000000..bf061373 --- /dev/null +++ b/reddit_api_kernel/rate_limiter_state_machine.ml @@ -0,0 +1,254 @@ +open! Core + +module When_to_send = struct + type t = + | Now + | Check_after_receiving_response + | After of Time_ns.t +end + +module type Basic = sig + type t [@@deriving sexp_of] + + val kind : string + val wait_until : t -> When_to_send.t + val sent_request_unchecked : t -> now:Time_ns.t -> t + val received_response : t -> Cohttp.Response.t -> t +end + +type t = T : (module Basic with type t = 't) * 't -> t + +let sexp_of_t (T ((module S), t)) : Sexp.t = List [ Atom S.kind; [%sexp_of: S.t] t ] +let wait_until (T ((module S), t)) = S.wait_until t + +let sent_request_unchecked (T (((module M) as m), t)) ~now = + T (m, M.sent_request_unchecked t ~now) +;; + +let received_response (T (((module M) as m), t)) response = + T (m, M.received_response t response) +;; + +module With_minimum_delay = struct + type t = + { last_request : Time_ns.Alternate_sexp.t option + ; delay : Time_ns.Span.t + } + [@@deriving sexp_of] + + let kind = "With_minimum_delay" + let create ~delay = { last_request = None; delay } + + let wait_until t : When_to_send.t = + match t.last_request with + | None -> Now + | Some time -> After (Time_ns.add time t.delay) + ;; + + let sent_request_unchecked t ~now = { t with last_request = Some now } + let received_response t (_ : Cohttp.Response.t) = t +end + +module By_headers = struct + let kind = "By_headers" + + module Server_side_info = struct + type t = + { remaining_api_calls : int + ; reset_time : Time_ns.Alternate_sexp.t + } + [@@deriving sexp, fields] + + let snap_to_nearest interval time = + let base = Time_ns.epoch in + let candidates = + [ Time_ns.prev_multiple ~can_equal_before:true ~interval ~base ~before:time () + ; Time_ns.next_multiple ~can_equal_after:false ~interval ~base ~after:time () + ] + in + List.min_elt + candidates + ~compare: + (Comparable.lift Time_ns.Span.compare ~f:(fun time' -> + Time_ns.abs_diff time time')) + |> Option.value_exn + ;; + + let%expect_test _ = + List.iter [ "2020-11-30 18:48:01.02Z"; "2020-11-30 18:47:59.02Z" ] ~f:(fun time -> + let time = Time_ns.of_string_with_utc_offset time in + print_s + [%sexp (snap_to_nearest Time_ns.Span.minute time : Time_ns.Alternate_sexp.t)]); + [%expect {| + "2020-11-30 18:48:00Z" + "2020-11-30 18:48:00Z" |}] + ;; + + let parse_http_header_date date_string = + Scanf.sscanf + date_string + "%3s, %2d %3s %4d %2d:%2d:%2d GMT" + (fun day_of_week d month y hr min sec -> + let day_of_week = Day_of_week.of_string day_of_week in + let month = Month.of_string month in + let date = Date.create_exn ~y ~m:month ~d in + (match Day_of_week.equal day_of_week (Date.day_of_week date) with + | true -> () + | false -> + raise_s + [%message + "HTTP response: Day of week did not match parsed date" + (day_of_week : Day_of_week.t) + (date : Date.t) + (date_string : string)]); + let ofday = Time_ns.Ofday.create ~hr ~min ~sec () in + Time_ns.of_date_ofday date ofday ~zone:Time_float.Zone.utc) + ;; + + let%expect_test _ = + print_s + [%sexp + (parse_http_header_date "Wed, 21 Oct 2015 07:28:00 GMT" + : Time_ns.Alternate_sexp.t)]; + [%expect {| "2015-10-21 07:28:00Z" |}] + ;; + + let t_of_headers headers = + let open Option.Let_syntax in + let get_header header = Cohttp.Header.get headers header in + let%bind remaining_api_calls = + get_header "X-Ratelimit-Remaining" >>| Float.of_string >>| Int.of_float + in + let%bind reset_time = + let%bind server_time = get_header "Date" >>| parse_http_header_date in + let%bind relative_reset_time = + get_header "X-Ratelimit-Reset" >>| Int.of_string >>| Time_ns.Span.of_int_sec + in + Some + (snap_to_nearest + Time_ns.Span.minute + (Time_ns.add server_time relative_reset_time)) + in + Some { remaining_api_calls; reset_time } + ;; + + let compare_by_inferred_age = + (* We use reset time instead of the "Date" header because we might have + sent some requests for which we have yet to receive a response. In that + case, the most authoritative picture of our remaining requests is not + the most recent response, but rather our previous state adjusted by the + number of requests we've sent. + + We therefore prefer the state with fewer requests remaining when two + states are for the same reset period. + *) + Comparable.lexicographic + [ Comparable.lift Time_ns.compare ~f:reset_time + ; Comparable.reverse (Comparable.lift compare ~f:remaining_api_calls) + ] + ;; + + let freshest = Base.Comparable.max compare_by_inferred_age + + let state_at_start_of_window ~representative_time = + let reset_time = + Time_ns.next_multiple + ~can_equal_after:false + ~interval:(Time_ns.Span.of_int_min 10) + ~base:Time_ns.epoch + ~after:representative_time + () + in + let remaining_api_calls = 996 in + { remaining_api_calls; reset_time } + ;; + end + + type t = + | Created + | Waiting_on_first_request + | Consuming_rate_limit of Server_side_info.t + [@@deriving sexp_of] + + let wait_until t : When_to_send.t = + match t with + | Created -> Now + | Waiting_on_first_request -> Check_after_receiving_response + | Consuming_rate_limit { remaining_api_calls; reset_time } -> + (match remaining_api_calls > 0 with + | true -> Now + | false -> After reset_time) + ;; + + let sent_request_unchecked t ~now = + match t with + | Created -> Waiting_on_first_request + | Waiting_on_first_request -> + raise_s + [%message + "[sent_request_unchecked] illegally called in [Waiting_on_first_request] state."] + | Consuming_rate_limit server_side_info -> + let base_server_side_info = + match Time_ns.( <= ) server_side_info.reset_time now with + | false -> server_side_info + | true -> Server_side_info.state_at_start_of_window ~representative_time:now + in + Consuming_rate_limit + { base_server_side_info with + remaining_api_calls = base_server_side_info.remaining_api_calls - 1 + } + ;; + + let received_response t response = + let headers = Cohttp.Response.headers response in + match Server_side_info.t_of_headers headers with + | None -> + (* We assume that, in the absence of ratelimit headers, we must have hit + some authentication failure. As a heuristic to avoid getting stuck, we + immediately reset [t.ready]. *) + Created + | Some response_server_side_info -> + (match t with + | Created -> + raise_s [%message "[received_response] called before [sent_request_unchecked]."] + | Waiting_on_first_request -> Consuming_rate_limit response_server_side_info + | Consuming_rate_limit server_side_info -> + Consuming_rate_limit + (Server_side_info.freshest server_side_info response_server_side_info)) + ;; +end + +let by_headers = T ((module By_headers), Created) + +let with_minimum_delay ~delay = + T ((module With_minimum_delay), With_minimum_delay.create ~delay) +;; + +module Combined = struct + type nonrec t = t list [@@deriving sexp_of] + + let kind = "Combined" + let create ts = ts + + let wait_until ts = + match + List.map ts ~f:wait_until + |> List.max_elt ~compare:(fun a b -> + match a, b with + | Now, _ -> -1 + | _, Now -> 1 + | After a, After b -> Time_ns.compare a b + | _, _ -> 0) + with + | Some v -> v + | None -> Now + ;; + + let sent_request_unchecked ts ~now = List.map ts ~f:(sent_request_unchecked ~now) + + let received_response ts response = + List.map ts ~f:(fun t -> received_response t response) + ;; +end + +let combine ts = T ((module Combined), Combined.create ts) diff --git a/reddit_api_kernel/rate_limiter_state_machine.mli b/reddit_api_kernel/rate_limiter_state_machine.mli new file mode 100644 index 00000000..7eadec9b --- /dev/null +++ b/reddit_api_kernel/rate_limiter_state_machine.mli @@ -0,0 +1,28 @@ +open! Core + +module When_to_send : sig + type t = + | Now + | Check_after_receiving_response + | After of Time_ns.t +end + +type t [@@deriving sexp_of] + +(** {1 Constructors} *) + +val by_headers : t +val with_minimum_delay : delay:Time_ns.Span.t -> t +val combine : t list -> t + +(** {1 Events} *) + +(** [sent_request_unchecked] should called immediately after sending a request. + It is the caller's responsibility to first call {!wait_until}. *) +val sent_request_unchecked : t -> now:Time_ns.t -> t + +val received_response : t -> Cohttp.Response.t -> t + +(** {1 Accessors} *) + +val wait_until : t -> When_to_send.t diff --git a/reddit_api_kernel/reddit_api_kernel.ml b/reddit_api_kernel/reddit_api_kernel.ml index 79a36002..6c34da72 100644 --- a/reddit_api_kernel/reddit_api_kernel.ml +++ b/reddit_api_kernel/reddit_api_kernel.ml @@ -8,6 +8,7 @@ module Listing = Listing module Mod_action = Mod_action module Moderator_report = Moderator_report module Modmail = Modmail +module Rate_limiter_state_machine = Rate_limiter_state_machine module Relationship = Relationship module Stylesheet = Stylesheet module Submit_text = Submit_text diff --git a/test/test_comment_fields.ml b/test/test_comment_fields.ml index 26b14bf9..06c815ef 100644 --- a/test/test_comment_fields.ml +++ b/test/test_comment_fields.ml @@ -27,7 +27,8 @@ let%expect_test "comment_fields" = Set.symmetric_diff keys_from_comment_page keys_from_info_page |> Sequence.to_list in print_s [%sexp (diff : (string, string) Either.t list)]; - [%expect {| ((First depth)) |}]; + [%expect {| + ((First depth)) |}]; print_s [%sexp (Thing.Comment.depth first_comment : int option)]; [%expect {| (0) |}]; print_s diff --git a/test/test_links_and_comments.ml b/test/test_links_and_comments.ml index 45a48c5e..5afda00b 100644 --- a/test/test_links_and_comments.ml +++ b/test/test_links_and_comments.ml @@ -41,7 +41,7 @@ let%expect_test "send_replies" = let%bind () = Connection.call_exn connection (Endpoint.send_replies ~id ~enabled:false) in - [%expect]; + [%expect {| |}]; return ()) ;; @@ -54,7 +54,7 @@ let%expect_test "set_contest_mode" = let%bind () = Connection.call_exn connection (Endpoint.set_contest_mode ~link ~enabled:false) in - [%expect]; + [%expect {| |}]; return ()) ;; @@ -86,6 +86,6 @@ let%expect_test "vote" = let%bind () = Connection.call_exn connection (Endpoint.vote () ~target ~direction:Up) in - [%expect]; + [%expect {| |}]; return ()) ;; diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 9911920e..b6b492fc 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -49,50 +49,70 @@ let%expect_test _ = let ( ^: ) hr min = Time_ns.add Time_ns.epoch (Time_ns.Span.create ~hr ~min ()) in let time_source = Time_source.create ~now:(00 ^: 00) () in let rate_limiter = - Rate_limiter.by_headers ~time_source:(Time_source.read_only time_source) + Reddit_api_async.Rate_limiter.of_state_machine + Reddit_api_kernel.Rate_limiter_state_machine.by_headers + (Time_source.read_only time_source) in let print () = - print_s - [%sexp - { is_ready : bool = Rate_limiter.is_ready rate_limiter - ; rate_limiter : Rate_limiter.t - ; time : Time_ns.t = Time_source.now time_source - }] + print_endline + (Sexp_pretty.sexp_to_string + [%sexp + { time : Time_ns.t = Time_source.now time_source + ; is_ready : bool = Rate_limiter.is_ready rate_limiter + ; rate_limiter : Rate_limiter.t + }]) in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info ())))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state (By_headers Created)) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Initially we can permit one request. *) let%bind () = Rate_limiter.permit_request rate_limiter in print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) (server_side_info ())))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + (state (By_headers Waiting_on_first_request)) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Receiving a response allows us to send another request. *) Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 00) ~limit_remaining:1); + print (); + [%expect + {| + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 1) + (reset_time "1970-01-01 00:10:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 0) + (reset_time "1970-01-01 00:10:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Receiving a response for the same reset period will not increase our limit remaining. *) Rate_limiter.notify_response rate_limiter @@ -100,28 +120,34 @@ let%expect_test _ = print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 0) + (reset_time "1970-01-01 00:10:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Moving to the next period increases our remaining limit. *) Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 10) ~limit_remaining:10); + let%bind () = Log.Global.flushed () in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 10) - (reset_time (1970-01-01 00:20:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 10) + (reset_time "1970-01-01 00:20:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Exhausting the remaining limit causes us to be not-ready. *) let%bind () = Deferred.repeat_until_finished 10 (function @@ -133,26 +159,34 @@ let%expect_test _ = print (); [%expect {| - ("Rate limit is resetting"(old_remaining_api_calls 0)) - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 0) + (reset_time "1970-01-01 00:20:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; + let ready_deferred = Rate_limiter.permit_request rate_limiter in + [%expect {| |}]; (* Advancing the time allows us to send another request. *) let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 20) in + let%bind () = ready_deferred in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z)))))))) - (time (1970-01-01 00:20:00.000000000Z))) |}]; + ((time (1970-01-01 00:20:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time "1970-01-01 00:30:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Advancing past the reset time before we receive a response does not result in a double reset. *) let%bind () = Rate_limiter.permit_request rate_limiter in @@ -163,39 +197,73 @@ let%expect_test _ = print (); [%expect {| - ("Rate limit is resetting"(old_remaining_api_calls 0)) - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z)))))))) - (time (1970-01-01 00:20:00.000000000Z))) |}]; + ((time (1970-01-01 00:20:00.000000000Z)) + (is_ready false) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 0) + (reset_time "1970-01-01 00:30:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z)))))))) - (time (1970-01-01 00:30:00.000000000Z))) |}]; + ((time (1970-01-01 00:30:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 0) + (reset_time "1970-01-01 00:30:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in + print (); + [%expect + {| + ((time (1970-01-01 00:30:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time "1970-01-01 00:40:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 29) ~limit_remaining:1); + print (); + [%expect + {| + ((time (1970-01-01 00:30:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time "1970-01-01 00:40:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z)))))))) - (time (1970-01-01 00:31:00.000000000Z))) |}]; + ((time (1970-01-01 00:31:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time "1970-01-01 00:40:00Z"))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; return () ;; diff --git a/test/test_set_subreddit_sticky.ml b/test/test_set_subreddit_sticky.ml index 16ad0998..65618085 100644 --- a/test/test_set_subreddit_sticky.ml +++ b/test/test_set_subreddit_sticky.ml @@ -19,6 +19,6 @@ let%expect_test "set_subreddit_sticky" = connection (Endpoint.set_subreddit_sticky () ~link ~sticky_state:Unsticky) in - [%expect]; + [%expect {| |}]; return ()) ;;