From cdd54de1369a93f69a650297b49d9ad828ddc615 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 8 Oct 2023 11:10:33 -0400 Subject: [PATCH 01/16] Improve printing in rate_limiter tests --- test/test_rate_limiter.ml | 193 +++++++++++++++++++++++--------------- 1 file changed, 119 insertions(+), 74 deletions(-) diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 9911920e..217acc91 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -52,32 +52,38 @@ let%expect_test _ = Rate_limiter.by_headers ~time_source:(Time_source.read_only time_source) in let print () = - print_s - [%sexp - { is_ready : bool = Rate_limiter.is_ready rate_limiter - ; rate_limiter : Rate_limiter.t - ; time : Time_ns.t = Time_source.now time_source - }] + print_endline + (Sexp_pretty.sexp_to_string + [%sexp + { time : Time_ns.t = Time_source.now time_source + ; is_ready : bool = Rate_limiter.is_ready rate_limiter + ; rate_limiter : Rate_limiter.t + }]) in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info ())))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready true) + (rate_limiter ( + By_headers ( + (ready (())) + (time_source ) + (reset_event ()) + (server_side_info ()))))) |}]; (* Initially we can permit one request. *) let%bind () = Rate_limiter.permit_request rate_limiter in print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) (server_side_info ())))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info ()))))) |}]; (* Receiving a response allows us to send another request. *) Rate_limiter.notify_response rate_limiter @@ -86,13 +92,15 @@ let%expect_test _ = print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))))))) |}]; (* Receiving a response for the same reset period will not increase our limit remaining. *) Rate_limiter.notify_response rate_limiter @@ -100,13 +108,15 @@ let%expect_test _ = print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))))))) |}]; (* Moving to the next period increases our remaining limit. *) Rate_limiter.notify_response rate_limiter @@ -114,14 +124,15 @@ let%expect_test _ = print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 10) - (reset_time (1970-01-01 00:20:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready true) + (rate_limiter ( + By_headers ( + (ready (())) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 10) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}]; (* Exhausting the remaining limit causes us to be not-ready. *) let%bind () = Deferred.repeat_until_finished 10 (function @@ -134,25 +145,29 @@ let%expect_test _ = [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 0)) - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z)))))))) - (time (1970-01-01 00:00:00.000000000Z))) |}]; + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}]; (* Advancing the time allows us to send another request. *) let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 20) in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z)))))))) - (time (1970-01-01 00:20:00.000000000Z))) |}]; + ((time (1970-01-01 00:20:00.000000000Z)) + (is_ready true) + (rate_limiter ( + By_headers ( + (ready (())) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}]; (* Advancing past the reset time before we receive a response does not result in a double reset. *) let%bind () = Rate_limiter.permit_request rate_limiter in @@ -164,38 +179,68 @@ let%expect_test _ = [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 0)) - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z)))))))) - (time (1970-01-01 00:20:00.000000000Z))) |}]; + ((time (1970-01-01 00:20:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in print (); [%expect {| - ((is_ready true) - (rate_limiter - (By_headers - ((ready (())) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z)))))))) - (time (1970-01-01 00:30:00.000000000Z))) |}]; + ((time (1970-01-01 00:30:00.000000000Z)) + (is_ready true) + (rate_limiter ( + By_headers ( + (ready (())) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in + print (); + [%expect + {| + ((time (1970-01-01 00:30:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 29) ~limit_remaining:1); + print (); + [%expect + {| + ((time (1970-01-01 00:30:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in print (); [%expect {| - ((is_ready false) - (rate_limiter - (By_headers - ((ready ()) (time_source ) (reset_event ()) - (server_side_info - (((remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z)))))))) - (time (1970-01-01 00:31:00.000000000Z))) |}]; + ((time (1970-01-01 00:31:00.000000000Z)) + (is_ready false) + (rate_limiter ( + By_headers ( + (ready ()) + (time_source ) + (reset_event ()) + (server_side_info (( + (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; return () ;; From 311700a339e22a01c87116e0c7e4a9781de7bee4 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 10 Sep 2023 07:41:15 -0400 Subject: [PATCH 02/16] Eliminate use of Deferred.upon in rate limiter --- reddit_api_async/rate_limiter.ml | 38 ++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index 469b6cd4..fbae4700 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -21,7 +21,7 @@ let wait_until_ready (T ((module S), t)) = S.wait_until_ready t module With_minimum_delay = struct type t = - { ready : (unit, read_write) Mvar.t + { wait_until : (Time_ns.Alternate_sexp.t option, read_write) Mvar.t ; time_source : (Time_source.t[@sexp.opaque]) ; delay : Time_ns.Span.t } @@ -30,20 +30,40 @@ module With_minimum_delay = struct let kind = "With_minimum_delay" let create ~delay ~time_source = - let ready = Mvar.create () in - Mvar.set ready (); - { ready; delay; time_source } + let wait_until = Mvar.create () in + Mvar.set wait_until None; + { wait_until; delay; time_source } ;; - let permit_request { ready; delay; time_source } = - let%bind () = Mvar.take ready in - Deferred.upon (Time_source.after time_source delay) (fun () -> Mvar.set ready ()); + let permit_request { wait_until; delay; time_source } = + let%bind () = + match%bind Mvar.take wait_until with + | None -> return () + | Some wait_until -> Time_source.at time_source wait_until + in + Mvar.set wait_until (Some (Time_ns.add (Time_source.now time_source) delay)); return () ;; let notify_response (_ : t) (_ : Cohttp.Response.t) = () - let is_ready { ready; _ } = not (Mvar.is_empty ready) - let wait_until_ready { ready; _ } = Mvar.value_available ready + + let is_ready { wait_until; time_source; _ } = + match Mvar.peek wait_until with + | None -> false + | Some None -> true + | Some (Some wait_until) -> Time_ns.( >= ) (Time_source.now time_source) wait_until + ;; + + let wait_until_ready { wait_until; time_source; _ } = + let%bind mvar_contents = Mvar.take wait_until in + let%bind () = + match mvar_contents with + | None -> return () + | Some wait_until -> Time_source.at time_source wait_until + in + Mvar.set wait_until mvar_contents; + return () + ;; end module By_headers = struct From 6e0f9586fd8b818e2c6a719fa95c1ef82105baa0 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 10 Sep 2023 11:07:15 -0400 Subject: [PATCH 03/16] Prevent deadlock in combined rate limiter --- reddit_api_async/rate_limiter.ml | 33 ++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index fbae4700..dfa292b2 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -282,13 +282,34 @@ let with_minimum_delay ~time_source ~delay = ;; module Combined = struct - type nonrec t = t list [@@deriving sexp_of] + type nonrec t = + { ts : t list + ; sequencer : unit Throttle.Sequencer.t + } + [@@deriving sexp_of] let kind = "Combined" - let permit_request ts = Deferred.all_unit (List.map ts ~f:permit_request) - let notify_response ts headers = List.iter ts ~f:(fun t -> notify_response t headers) - let is_ready ts = List.for_all ts ~f:is_ready - let wait_until_ready ts = Deferred.all_unit (List.map ts ~f:wait_until_ready) + + let create ts = + let sequencer = Throttle.Sequencer.create () in + { ts; sequencer } + ;; + + let permit_request { ts; sequencer } = + Throttle.enqueue sequencer (fun () -> + Deferred.all_unit (List.map ts ~f:permit_request)) + ;; + + let notify_response { ts; _ } headers = + List.iter ts ~f:(fun t -> notify_response t headers) + ;; + + let is_ready { ts; _ } = List.for_all ts ~f:is_ready + + let wait_until_ready { ts; sequencer } = + Throttle.enqueue sequencer (fun () -> + Deferred.all_unit (List.map ts ~f:wait_until_ready)) + ;; end -let combine ts = T ((module Combined), ts) +let combine ts = T ((module Combined), Combined.create ts) From 2f71073dc4cf6ef2375aa23b6fbcade919436fae Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 10 Sep 2023 15:05:20 -0400 Subject: [PATCH 04/16] Don't use don't_wait_for in retry manager --- reddit_api_async/retry_manager.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reddit_api_async/retry_manager.ml b/reddit_api_async/retry_manager.ml index 9f885623..bd3dd13d 100644 --- a/reddit_api_async/retry_manager.ml +++ b/reddit_api_async/retry_manager.ml @@ -127,10 +127,10 @@ let check_server t = let on_transient_error t = match t.state with - | Waiting_for_issue_resolution _ -> () + | Waiting_for_issue_resolution _ -> return () | Working_normally -> t.state <- Waiting_for_issue_resolution { finished = Ivar.create () }; - don't_wait_for (check_server t) + check_server t ;; let rec call t endpoint = @@ -151,6 +151,6 @@ let rec call t endpoint = "Transient error" (request : Endpoint.Request.t) (response : (_, Endpoint.Error.t Connection.Error.t) Result.t)]; - on_transient_error t; + let%bind () = on_transient_error t in call t endpoint) ;; From e6e38c5ca163a710ea29789597c659eccfc3e9ba Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 8 Oct 2023 11:05:38 -0400 Subject: [PATCH 05/16] Further modifications to rate limiter - removing event --- reddit_api_async/rate_limiter.ml | 123 ++++++++++++++++------------- test/test_rate_limiter.ml | 128 ++++++++++++++++--------------- 2 files changed, 136 insertions(+), 115 deletions(-) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index dfa292b2..be2f4726 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -167,79 +167,90 @@ module By_headers = struct let freshest = Base.Comparable.max compare_by_inferred_age end + module State = struct + type t = + | Created + | Waiting_on_first_request + | Known of Server_side_info.t + [@@deriving sexp_of] + end + type t = - { ready : (unit, read_write) Mvar.t + { mutable state : State.t + ; updated : (unit, read_write) Bvar.t ; time_source : (Time_source.t[@sexp.opaque]) - ; mutable reset_event : ((Nothing.t, unit) Time_source.Event.t[@sexp.opaque]) option - ; mutable server_side_info : Server_side_info.t option } [@@deriving sexp_of] let create ~time_source = - let ready = Mvar.create () in - Mvar.set ready (); - { server_side_info = None; reset_event = None; time_source; ready } + let updated = Bvar.create () in + { time_source; state = Created; updated } ;; - let is_ready { ready; _ } = not (Mvar.is_empty ready) - let wait_until_ready { ready; _ } = Mvar.value_available ready - - let rec schedule_reset_at_time t time = - let schedule_fresh_event () = - t.reset_event - <- Some - (Time_source.Event.run_at - t.time_source - time - (fun () -> - Mvar.set t.ready (); - (* In case something prevents our first request from receiving - a response, we will periodically allow retries. *) - schedule_reset_at_time t (Time_ns.add time Time_ns.Span.minute)) - ()) - in - match t.reset_event with - | None -> schedule_fresh_event () - | Some event -> - let scheduled_time = Time_source.Event.scheduled_at event in - (match Time_ns.( < ) scheduled_time time with - | false -> () - | true -> - (match Time_source.Event.reschedule_at event time with - | Ok -> () - | Previously_aborted _ -> . - | Previously_happened () -> schedule_fresh_event ())) + let is_ready { state; time_source; _ } = + match state with + | Created -> true + | Waiting_on_first_request -> false + | Known { remaining_api_calls; reset_time } -> + remaining_api_calls > 0 || Time_ns.( >= ) (Time_source.now time_source) reset_time ;; + let wait_until_ready { state; time_source; updated } = + Deferred.repeat_until_finished () (fun () -> + match state with + | Created -> return (`Finished ()) + | Waiting_on_first_request -> + let%bind () = Bvar.wait updated in + return (`Repeat ()) + | Known { remaining_api_calls; reset_time } -> + (match + remaining_api_calls > 0 + || Time_ns.( >= ) (Time_source.now time_source) reset_time + with + | true -> return (`Finished ()) + | false -> + let%bind () = + Deferred.any + [ (* TODO reset time needs to actually... reset things*) + Time_source.at time_source reset_time + ; Bvar.wait updated + ] + in + return (`Repeat ()))) + ;; + + (* TODO: Allow retries once per minute? *) + let update_server_side_info t ~new_server_side_info = - t.server_side_info <- Some new_server_side_info; - schedule_reset_at_time t new_server_side_info.reset_time; + t.state <- Known new_server_side_info; + Bvar.broadcast t.updated (); match ( new_server_side_info.remaining_api_calls > 0 , Time_ns.equal new_server_side_info.reset_time Time_ns.epoch ) with - | false, false -> () + | false, false | true, (true | false) -> () | false, true -> [%log.debug Import.log "Rate limit exhausted" ~reset_time:(new_server_side_info.reset_time : Time_ns_unix.t)] - | true, _ -> Mvar.set t.ready () ;; let permit_request t = - let%bind () = Mvar.take t.ready in - (match t.server_side_info with - | None -> () - | Some ({ remaining_api_calls; _ } as server_side_info) -> - (match remaining_api_calls with - | 0 -> () - | n -> - let new_server_side_info = - { server_side_info with remaining_api_calls = n - 1 } - in - update_server_side_info t ~new_server_side_info)); - return () + let%bind () = wait_until_ready t in + match t.state with + | Waiting_on_first_request -> assert false + | Created -> + t.state <- Waiting_on_first_request; + return () + | Known ({ remaining_api_calls; _ } as server_side_info) -> + let new_server_side_info : Server_side_info.t = + match remaining_api_calls with + | 0 -> server_side_info + | n -> { server_side_info with remaining_api_calls = n - 1 } + in + update_server_side_info t ~new_server_side_info; + return () ;; let notify_response t response = @@ -249,12 +260,16 @@ module By_headers = struct (* We assume that, in the absence of ratelimit headers, we must have hit some authentication failure. As a heuristic to avoid getting stuck, we immediately reset [t.ready]. *) - Mvar.set t.ready () + (* TODO: What to do here? *) + t.state <- Created | Some response_server_side_info -> let new_server_side_info = - match t.server_side_info with - | None -> response_server_side_info - | Some server_side_info -> + match t.state with + | Created -> + (* TODO: this could happen if the module is misused *) + assert false + | Waiting_on_first_request -> response_server_side_info + | Known server_side_info -> (match Comparable.lift [%compare: Time_ns.t] diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 217acc91..0ec72fae 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -67,10 +67,7 @@ let%expect_test _ = (is_ready true) (rate_limiter ( By_headers ( - (ready (())) - (time_source ) - (reset_event ()) - (server_side_info ()))))) |}]; + (state Created) (updated (has_any_waiters false)) (time_source ))))) |}]; (* Initially we can permit one request. *) let%bind () = Rate_limiter.permit_request rate_limiter in print (); @@ -80,10 +77,9 @@ let%expect_test _ = (is_ready false) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info ()))))) |}]; + (state Waiting_on_first_request) + (updated (has_any_waiters false)) + (time_source ))))) |}]; (* Receiving a response allows us to send another request. *) Rate_limiter.notify_response rate_limiter @@ -96,11 +92,12 @@ let%expect_test _ = (is_ready false) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:10:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; (* Receiving a response for the same reset period will not increase our limit remaining. *) Rate_limiter.notify_response rate_limiter @@ -112,11 +109,12 @@ let%expect_test _ = (is_ready false) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:10:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; (* Moving to the next period increases our remaining limit. *) Rate_limiter.notify_response rate_limiter @@ -128,11 +126,12 @@ let%expect_test _ = (is_ready true) (rate_limiter ( By_headers ( - (ready (())) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 10) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 10) + (reset_time (1970-01-01 00:20:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; (* Exhausting the remaining limit causes us to be not-ready. *) let%bind () = Deferred.repeat_until_finished 10 (function @@ -149,11 +148,12 @@ let%expect_test _ = (is_ready false) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:20:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; (* Advancing the time allows us to send another request. *) let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 20) in print (); @@ -163,11 +163,12 @@ let%expect_test _ = (is_ready true) (rate_limiter ( By_headers ( - (ready (())) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:20:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; (* Advancing past the reset time before we receive a response does not result in a double reset. *) let%bind () = Rate_limiter.permit_request rate_limiter in @@ -183,11 +184,12 @@ let%expect_test _ = (is_ready false) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:30:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in print (); [%expect @@ -196,24 +198,26 @@ let%expect_test _ = (is_ready true) (rate_limiter ( By_headers ( - (ready (())) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:30:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in print (); [%expect {| ((time (1970-01-01 00:30:00.000000000Z)) - (is_ready false) + (is_ready true) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:30:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 29) ~limit_remaining:1); @@ -221,26 +225,28 @@ let%expect_test _ = [%expect {| ((time (1970-01-01 00:30:00.000000000Z)) - (is_ready false) + (is_ready true) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:30:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in print (); [%expect {| ((time (1970-01-01 00:31:00.000000000Z)) - (is_ready false) + (is_ready true) (rate_limiter ( By_headers ( - (ready ()) - (time_source ) - (reset_event ()) - (server_side_info (( - (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}]; + (state ( + Known ( + (remaining_api_calls 0) + (reset_time (1970-01-01 00:30:00.000000000Z))))) + (updated (has_any_waiters false)) + (time_source ))))) |}]; return () ;; From 8d3cb5faa65fada9bddacbfe153136805f2d0b35 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Wed, 10 Jan 2024 19:46:09 -0500 Subject: [PATCH 06/16] Rate limiter: simplify handling of reset time --- reddit_api_async/rate_limiter.ml | 74 ++++++++++++++++++------------- test/test_comment_fields.ml | 4 +- test/test_links_and_comments.ml | 10 +++-- test/test_rate_limiter.ml | 41 +++++++++-------- test/test_set_subreddit_sticky.ml | 2 +- 5 files changed, 76 insertions(+), 55 deletions(-) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index be2f4726..4074dfd8 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -76,8 +76,7 @@ module By_headers = struct } [@@deriving sexp, fields] - let snap_to_nearest_minute time = - let interval = Time_ns.Span.minute in + let snap_to_nearest interval time = let base = Time_ns.epoch in let candidates = [ Time_ns.prev_multiple ~can_equal_before:true ~interval ~base ~before:time () @@ -95,7 +94,8 @@ module By_headers = struct let%expect_test _ = List.iter [ "2020-11-30 18:48:01.02Z"; "2020-11-30 18:47:59.02Z" ] ~f:(fun time -> let time = Time_ns_unix.of_string time in - print_s [%sexp (snap_to_nearest_minute time : Time_ns.Alternate_sexp.t)]); + print_s + [%sexp (snap_to_nearest Time_ns.Span.minute time : Time_ns.Alternate_sexp.t)]); [%expect {| "2020-11-30 18:48:00Z" "2020-11-30 18:48:00Z" |}]; @@ -143,7 +143,10 @@ module By_headers = struct let%bind relative_reset_time = get_header "X-Ratelimit-Reset" >>| Int.of_string >>| Time_ns.Span.of_int_sec in - Some (snap_to_nearest_minute (Time_ns.add server_time relative_reset_time)) + Some + (snap_to_nearest + Time_ns.Span.minute + (Time_ns.add server_time relative_reset_time)) in Some { remaining_api_calls; reset_time } ;; @@ -165,13 +168,26 @@ module By_headers = struct ;; let freshest = Base.Comparable.max compare_by_inferred_age + + let state_at_start_of_window ~representative_time = + let reset_time = + Time_ns.next_multiple + ~can_equal_after:false + ~interval:(Time_ns.Span.of_int_min 10) + ~base:Time_ns.epoch + ~after:representative_time + () + in + let remaining_api_calls = 996 in + { remaining_api_calls; reset_time } + ;; end module State = struct type t = | Created | Waiting_on_first_request - | Known of Server_side_info.t + | Consuming_rate_limit of Server_side_info.t [@@deriving sexp_of] end @@ -191,38 +207,38 @@ module By_headers = struct match state with | Created -> true | Waiting_on_first_request -> false - | Known { remaining_api_calls; reset_time } -> + | Consuming_rate_limit { remaining_api_calls; reset_time } -> remaining_api_calls > 0 || Time_ns.( >= ) (Time_source.now time_source) reset_time ;; - let wait_until_ready { state; time_source; updated } = + let wait_until_ready t = Deferred.repeat_until_finished () (fun () -> - match state with + match t.state with | Created -> return (`Finished ()) | Waiting_on_first_request -> - let%bind () = Bvar.wait updated in + let%bind () = Bvar.wait t.updated in return (`Repeat ()) - | Known { remaining_api_calls; reset_time } -> - (match - remaining_api_calls > 0 - || Time_ns.( >= ) (Time_source.now time_source) reset_time - with - | true -> return (`Finished ()) + | Consuming_rate_limit { remaining_api_calls; reset_time } -> + let now = Time_source.now t.time_source in + (* TODO: Move above? *) + (match Time_ns.( >= ) now reset_time with + | true -> + t.state + <- Consuming_rate_limit + (Server_side_info.state_at_start_of_window ~representative_time:now); + return (`Finished ()) | false -> - let%bind () = - Deferred.any - [ (* TODO reset time needs to actually... reset things*) - Time_source.at time_source reset_time - ; Bvar.wait updated - ] - in - return (`Repeat ()))) + (match remaining_api_calls > 0 with + | true -> return (`Finished ()) + | false -> + let%bind () = Time_source.at t.time_source reset_time in + return (`Repeat ())))) ;; (* TODO: Allow retries once per minute? *) let update_server_side_info t ~new_server_side_info = - t.state <- Known new_server_side_info; + t.state <- Consuming_rate_limit new_server_side_info; Bvar.broadcast t.updated (); match ( new_server_side_info.remaining_api_calls > 0 @@ -239,11 +255,11 @@ module By_headers = struct let permit_request t = let%bind () = wait_until_ready t in match t.state with - | Waiting_on_first_request -> assert false + | Waiting_on_first_request -> (* TODO is this impossible? *) assert false | Created -> t.state <- Waiting_on_first_request; return () - | Known ({ remaining_api_calls; _ } as server_side_info) -> + | Consuming_rate_limit ({ remaining_api_calls; _ } as server_side_info) -> let new_server_side_info : Server_side_info.t = match remaining_api_calls with | 0 -> server_side_info @@ -265,11 +281,9 @@ module By_headers = struct | Some response_server_side_info -> let new_server_side_info = match t.state with - | Created -> - (* TODO: this could happen if the module is misused *) - assert false + | Created -> raise_s [%message "[notify_response] called before [permit_request]"] | Waiting_on_first_request -> response_server_side_info - | Known server_side_info -> + | Consuming_rate_limit server_side_info -> (match Comparable.lift [%compare: Time_ns.t] diff --git a/test/test_comment_fields.ml b/test/test_comment_fields.ml index 26b14bf9..33eb52ba 100644 --- a/test/test_comment_fields.ml +++ b/test/test_comment_fields.ml @@ -27,7 +27,9 @@ let%expect_test "comment_fields" = Set.symmetric_diff keys_from_comment_page keys_from_info_page |> Sequence.to_list in print_s [%sexp (diff : (string, string) Either.t list)]; - [%expect {| ((First depth)) |}]; + [%expect {| + ("Rate limit is resetting"(old_remaining_api_calls 995)) + ((First depth)) |}]; print_s [%sexp (Thing.Comment.depth first_comment : int option)]; [%expect {| (0) |}]; print_s diff --git a/test/test_links_and_comments.ml b/test/test_links_and_comments.ml index 45a48c5e..9c5a2a62 100644 --- a/test/test_links_and_comments.ml +++ b/test/test_links_and_comments.ml @@ -27,7 +27,7 @@ let%expect_test "unsave" = let%bind () = Connection.call_exn connection (Endpoint.unsave ~id) in (* Unsave is idempotent *) let%bind () = Connection.call_exn connection (Endpoint.unsave ~id) in - [%expect {| |}]; + [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; @@ -41,7 +41,7 @@ let%expect_test "send_replies" = let%bind () = Connection.call_exn connection (Endpoint.send_replies ~id ~enabled:false) in - [%expect]; + [%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; @@ -54,7 +54,7 @@ let%expect_test "set_contest_mode" = let%bind () = Connection.call_exn connection (Endpoint.set_contest_mode ~link ~enabled:false) in - [%expect]; + [%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; @@ -86,6 +86,8 @@ let%expect_test "vote" = let%bind () = Connection.call_exn connection (Endpoint.vote () ~target ~direction:Up) in - [%expect]; + [%expect{| + ("Rate limit is resetting"(old_remaining_api_calls 995)) + ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 0ec72fae..e75f7649 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -93,7 +93,7 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( + Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))) (updated (has_any_waiters false)) @@ -110,7 +110,7 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( + Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))) (updated (has_any_waiters false)) @@ -127,7 +127,7 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( + Consuming_rate_limit ( (remaining_api_calls 10) (reset_time (1970-01-01 00:20:00.000000000Z))))) (updated (has_any_waiters false)) @@ -141,6 +141,7 @@ let%expect_test _ = return (`Repeat (n - 1))) in print (); + (* TODO It's not actually resetting: It's just exhausted. *) [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 0)) @@ -149,13 +150,16 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( + Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))) (updated (has_any_waiters false)) (time_source ))))) |}]; + let ready_deferred = Rate_limiter.permit_request rate_limiter in + [%expect {| |}]; (* Advancing the time allows us to send another request. *) let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 20) in + let%bind () = ready_deferred in print (); [%expect {| @@ -164,9 +168,9 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( - (remaining_api_calls 0) - (reset_time (1970-01-01 00:20:00.000000000Z))))) + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time (1970-01-01 00:30:00.000000000Z))))) (updated (has_any_waiters false)) (time_source ))))) |}]; (* Advancing past the reset time before we receive a response does not result @@ -179,13 +183,12 @@ let%expect_test _ = print (); [%expect {| - ("Rate limit is resetting"(old_remaining_api_calls 0)) ((time (1970-01-01 00:20:00.000000000Z)) (is_ready false) (rate_limiter ( By_headers ( (state ( - Known ( + Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))) (updated (has_any_waiters false)) @@ -199,7 +202,7 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( + Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))) (updated (has_any_waiters false)) @@ -213,9 +216,9 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( - (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z))))) + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time (1970-01-01 00:40:00.000000000Z))))) (updated (has_any_waiters false)) (time_source ))))) |}]; Rate_limiter.notify_response @@ -229,9 +232,9 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( - (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z))))) + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time (1970-01-01 00:40:00.000000000Z))))) (updated (has_any_waiters false)) (time_source ))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in @@ -243,9 +246,9 @@ let%expect_test _ = (rate_limiter ( By_headers ( (state ( - Known ( - (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z))))) + Consuming_rate_limit ( + (remaining_api_calls 995) + (reset_time (1970-01-01 00:40:00.000000000Z))))) (updated (has_any_waiters false)) (time_source ))))) |}]; return () diff --git a/test/test_set_subreddit_sticky.ml b/test/test_set_subreddit_sticky.ml index 16ad0998..6587374a 100644 --- a/test/test_set_subreddit_sticky.ml +++ b/test/test_set_subreddit_sticky.ml @@ -19,6 +19,6 @@ let%expect_test "set_subreddit_sticky" = connection (Endpoint.set_subreddit_sticky () ~link ~sticky_state:Unsticky) in - [%expect]; + [%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; From 4615b695f7cd82da75a999805a6b93c9c1fe326f Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Fri, 12 Jan 2024 21:57:08 -0500 Subject: [PATCH 07/16] Rate limiter: track received_response directly on state --- reddit_api_async/rate_limiter.ml | 27 +++++++++++++-------------- test/test_comment_fields.ml | 3 ++- test/test_links_and_comments.ml | 7 ++++--- test/test_rate_limiter.ml | 16 +++------------- test/test_set_subreddit_sticky.ml | 2 +- 5 files changed, 23 insertions(+), 32 deletions(-) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index 4074dfd8..454f75dc 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -186,27 +186,23 @@ module By_headers = struct module State = struct type t = | Created - | Waiting_on_first_request + | Waiting_on_first_request of { received_response : unit Ivar.t } | Consuming_rate_limit of Server_side_info.t [@@deriving sexp_of] end type t = { mutable state : State.t - ; updated : (unit, read_write) Bvar.t ; time_source : (Time_source.t[@sexp.opaque]) } [@@deriving sexp_of] - let create ~time_source = - let updated = Bvar.create () in - { time_source; state = Created; updated } - ;; + let create ~time_source = { time_source; state = Created } let is_ready { state; time_source; _ } = match state with | Created -> true - | Waiting_on_first_request -> false + | Waiting_on_first_request _ -> false | Consuming_rate_limit { remaining_api_calls; reset_time } -> remaining_api_calls > 0 || Time_ns.( >= ) (Time_source.now time_source) reset_time ;; @@ -215,12 +211,11 @@ module By_headers = struct Deferred.repeat_until_finished () (fun () -> match t.state with | Created -> return (`Finished ()) - | Waiting_on_first_request -> - let%bind () = Bvar.wait t.updated in + | Waiting_on_first_request { received_response } -> + let%bind () = Ivar.read received_response in return (`Repeat ()) | Consuming_rate_limit { remaining_api_calls; reset_time } -> let now = Time_source.now t.time_source in - (* TODO: Move above? *) (match Time_ns.( >= ) now reset_time with | true -> t.state @@ -238,8 +233,11 @@ module By_headers = struct (* TODO: Allow retries once per minute? *) let update_server_side_info t ~new_server_side_info = + (match t.state with + | State.Created | State.Consuming_rate_limit _ -> () + | State.Waiting_on_first_request { received_response } -> + Ivar.fill_if_empty received_response ()); t.state <- Consuming_rate_limit new_server_side_info; - Bvar.broadcast t.updated (); match ( new_server_side_info.remaining_api_calls > 0 , Time_ns.equal new_server_side_info.reset_time Time_ns.epoch ) @@ -255,9 +253,10 @@ module By_headers = struct let permit_request t = let%bind () = wait_until_ready t in match t.state with - | Waiting_on_first_request -> (* TODO is this impossible? *) assert false + | Waiting_on_first_request _ -> (* TODO is this impossible? *) assert false | Created -> - t.state <- Waiting_on_first_request; + let received_response = Ivar.create () in + t.state <- Waiting_on_first_request { received_response }; return () | Consuming_rate_limit ({ remaining_api_calls; _ } as server_side_info) -> let new_server_side_info : Server_side_info.t = @@ -282,7 +281,7 @@ module By_headers = struct let new_server_side_info = match t.state with | Created -> raise_s [%message "[notify_response] called before [permit_request]"] - | Waiting_on_first_request -> response_server_side_info + | Waiting_on_first_request _ -> response_server_side_info | Consuming_rate_limit server_side_info -> (match Comparable.lift diff --git a/test/test_comment_fields.ml b/test/test_comment_fields.ml index 33eb52ba..13a92289 100644 --- a/test/test_comment_fields.ml +++ b/test/test_comment_fields.ml @@ -27,7 +27,8 @@ let%expect_test "comment_fields" = Set.symmetric_diff keys_from_comment_page keys_from_info_page |> Sequence.to_list in print_s [%sexp (diff : (string, string) Either.t list)]; - [%expect {| + [%expect + {| ("Rate limit is resetting"(old_remaining_api_calls 995)) ((First depth)) |}]; print_s [%sexp (Thing.Comment.depth first_comment : int option)]; diff --git a/test/test_links_and_comments.ml b/test/test_links_and_comments.ml index 9c5a2a62..cdd311f9 100644 --- a/test/test_links_and_comments.ml +++ b/test/test_links_and_comments.ml @@ -41,7 +41,7 @@ let%expect_test "send_replies" = let%bind () = Connection.call_exn connection (Endpoint.send_replies ~id ~enabled:false) in - [%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; @@ -54,7 +54,7 @@ let%expect_test "set_contest_mode" = let%bind () = Connection.call_exn connection (Endpoint.set_contest_mode ~link ~enabled:false) in - [%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; @@ -86,7 +86,8 @@ let%expect_test "vote" = let%bind () = Connection.call_exn connection (Endpoint.vote () ~target ~direction:Up) in - [%expect{| + [%expect + {| ("Rate limit is resetting"(old_remaining_api_calls 995)) ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index e75f7649..3116698b 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -67,7 +67,8 @@ let%expect_test _ = (is_ready true) (rate_limiter ( By_headers ( - (state Created) (updated (has_any_waiters false)) (time_source ))))) |}]; + (state Created) + (time_source ))))) |}]; (* Initially we can permit one request. *) let%bind () = Rate_limiter.permit_request rate_limiter in print (); @@ -77,8 +78,7 @@ let%expect_test _ = (is_ready false) (rate_limiter ( By_headers ( - (state Waiting_on_first_request) - (updated (has_any_waiters false)) + (state (Waiting_on_first_request (received_response Empty))) (time_source ))))) |}]; (* Receiving a response allows us to send another request. *) Rate_limiter.notify_response @@ -96,7 +96,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; (* Receiving a response for the same reset period will not increase our limit remaining. *) Rate_limiter.notify_response @@ -113,7 +112,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; (* Moving to the next period increases our remaining limit. *) Rate_limiter.notify_response @@ -130,7 +128,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 10) (reset_time (1970-01-01 00:20:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; (* Exhausting the remaining limit causes us to be not-ready. *) let%bind () = @@ -153,7 +150,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; let ready_deferred = Rate_limiter.permit_request rate_limiter in [%expect {| |}]; @@ -171,7 +167,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 995) (reset_time (1970-01-01 00:30:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; (* Advancing past the reset time before we receive a response does not result in a double reset. *) @@ -191,7 +186,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in print (); @@ -205,7 +199,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in print (); @@ -219,7 +212,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 995) (reset_time (1970-01-01 00:40:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; Rate_limiter.notify_response rate_limiter @@ -235,7 +227,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 995) (reset_time (1970-01-01 00:40:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in print (); @@ -249,7 +240,6 @@ let%expect_test _ = Consuming_rate_limit ( (remaining_api_calls 995) (reset_time (1970-01-01 00:40:00.000000000Z))))) - (updated (has_any_waiters false)) (time_source ))))) |}]; return () ;; diff --git a/test/test_set_subreddit_sticky.ml b/test/test_set_subreddit_sticky.ml index 6587374a..7a1746fe 100644 --- a/test/test_set_subreddit_sticky.ml +++ b/test/test_set_subreddit_sticky.ml @@ -19,6 +19,6 @@ let%expect_test "set_subreddit_sticky" = connection (Endpoint.set_subreddit_sticky () ~link ~sticky_state:Unsticky) in - [%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; return ()) ;; From 55c9ccbe1c5964055f233d5e5eb4e2b83ee425d5 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sat, 13 Jan 2024 12:14:19 -0500 Subject: [PATCH 08/16] Rate limiter: clear up remaining todos --- reddit_api_async/rate_limiter.ml | 8 ++++---- test/test_rate_limiter.ml | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index 454f75dc..7a520584 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -230,8 +230,6 @@ module By_headers = struct return (`Repeat ())))) ;; - (* TODO: Allow retries once per minute? *) - let update_server_side_info t ~new_server_side_info = (match t.state with | State.Created | State.Consuming_rate_limit _ -> () @@ -253,7 +251,10 @@ module By_headers = struct let permit_request t = let%bind () = wait_until_ready t in match t.state with - | Waiting_on_first_request _ -> (* TODO is this impossible? *) assert false + | Waiting_on_first_request _ -> + raise_s + [%message + "Unexpectedly in [Waiting_on_first_request] state after [wait_until_ready]."] | Created -> let received_response = Ivar.create () in t.state <- Waiting_on_first_request { received_response }; @@ -275,7 +276,6 @@ module By_headers = struct (* We assume that, in the absence of ratelimit headers, we must have hit some authentication failure. As a heuristic to avoid getting stuck, we immediately reset [t.ready]. *) - (* TODO: What to do here? *) t.state <- Created | Some response_server_side_info -> let new_server_side_info = diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 3116698b..c88092e7 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -117,9 +117,11 @@ let%expect_test _ = Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 10) ~limit_remaining:10); + let%bind () = Log.Global.flushed () in print (); [%expect {| + ("Rate limit is resetting"(old_remaining_api_calls 0)) ((time (1970-01-01 00:00:00.000000000Z)) (is_ready true) (rate_limiter ( @@ -138,10 +140,8 @@ let%expect_test _ = return (`Repeat (n - 1))) in print (); - (* TODO It's not actually resetting: It's just exhausted. *) [%expect {| - ("Rate limit is resetting"(old_remaining_api_calls 0)) ((time (1970-01-01 00:00:00.000000000Z)) (is_ready false) (rate_limiter ( From a619f34ab98f48683db3201964c0041dd30bcfac Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Wed, 24 Jan 2024 19:24:07 -0500 Subject: [PATCH 09/16] Split out synchronous aspects of rate limiter to a separate module in reddit_api_kernel --- reddit_api_async/connection.ml | 26 +- reddit_api_async/rate_limiter.ml | 384 ++++--------------------- reddit_api_async/rate_limiter.mli | 4 +- reddit_api_async/reddit_api_async.ml | 6 +- reddit_api_kernel/rate_limiter.ml | 261 +++++++++++++++++ reddit_api_kernel/rate_limiter.mli | 17 ++ reddit_api_kernel/reddit_api_kernel.ml | 1 + test/test_rate_limiter.ml | 120 ++++---- 8 files changed, 420 insertions(+), 399 deletions(-) create mode 100644 reddit_api_kernel/rate_limiter.ml create mode 100644 reddit_api_kernel/rate_limiter.mli diff --git a/reddit_api_async/connection.ml b/reddit_api_async/connection.ml index 432d63df..e25074f2 100644 --- a/reddit_api_async/connection.ml +++ b/reddit_api_async/connection.ml @@ -1,6 +1,8 @@ open! Core open! Async +module Rate_limiter' = Rate_limiter open Reddit_api_kernel +module Rate_limiter = Rate_limiter' module Credentials = struct module Password = struct @@ -382,20 +384,19 @@ type t = T : (module T with type t = 't) * 't -> t let sexp_of_t (T ((module T), t)) = T.sexp_of_t t -let all_rate_limiters ~time_source = - Rate_limiter.combine - [ Rate_limiter.by_headers ~time_source - ; Rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) ~time_source +let all_rate_limiters () = + let module Synchronous_rate_limiter = Reddit_api_kernel.Rate_limiter in + Synchronous_rate_limiter.combine + [ Synchronous_rate_limiter.by_headers () + ; Synchronous_rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) ] ;; let create credentials ~user_agent = - T - ( (module Local) - , Local.create - credentials - ~user_agent - ~rate_limiter:(all_rate_limiters ~time_source:(Time_source.wall_clock ())) ) + let rate_limiter = + Rate_limiter.of_synchronous (all_rate_limiters ()) (Time_source.wall_clock ()) + in + T ((module Local), Local.create credentials ~user_agent ~rate_limiter) ;; let get ?sequence (T ((module T), t)) = T.get ?sequence t @@ -869,6 +870,9 @@ module For_testing = struct | true -> reading filename placeholders | false -> recording filename placeholders in + let rate_limiter = + Rate_limiter.of_synchronous (all_rate_limiters ()) Cassette.time_source + in let connection = T ( (module Local) @@ -876,7 +880,7 @@ module For_testing = struct (module Cassette) credentials ~time_source:Cassette.time_source - ~rate_limiter:(all_rate_limiters ~time_source:Cassette.time_source) ) + ~rate_limiter ) in Monitor.protect (fun () -> f connection) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index 7a520584..ca354803 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -1,343 +1,55 @@ open! Core open! Async - -module type S = sig - type t [@@deriving sexp_of] - - val kind : string - val permit_request : t -> unit Deferred.t - val notify_response : t -> Cohttp.Response.t -> unit - val is_ready : t -> bool - val wait_until_ready : t -> unit Deferred.t -end - -type t = T : (module S with type t = 't) * 't -> t - -let sexp_of_t (T ((module S), t)) : Sexp.t = List [ Atom S.kind; [%sexp_of: S.t] t ] -let permit_request (T ((module S), t)) = S.permit_request t -let notify_response (T ((module S), t)) = S.notify_response t -let is_ready (T ((module S), t)) = S.is_ready t -let wait_until_ready (T ((module S), t)) = S.wait_until_ready t - -module With_minimum_delay = struct - type t = - { wait_until : (Time_ns.Alternate_sexp.t option, read_write) Mvar.t - ; time_source : (Time_source.t[@sexp.opaque]) - ; delay : Time_ns.Span.t - } - [@@deriving sexp_of] - - let kind = "With_minimum_delay" - - let create ~delay ~time_source = - let wait_until = Mvar.create () in - Mvar.set wait_until None; - { wait_until; delay; time_source } - ;; - - let permit_request { wait_until; delay; time_source } = - let%bind () = - match%bind Mvar.take wait_until with - | None -> return () - | Some wait_until -> Time_source.at time_source wait_until - in - Mvar.set wait_until (Some (Time_ns.add (Time_source.now time_source) delay)); - return () - ;; - - let notify_response (_ : t) (_ : Cohttp.Response.t) = () - - let is_ready { wait_until; time_source; _ } = - match Mvar.peek wait_until with - | None -> false - | Some None -> true - | Some (Some wait_until) -> Time_ns.( >= ) (Time_source.now time_source) wait_until - ;; - - let wait_until_ready { wait_until; time_source; _ } = - let%bind mvar_contents = Mvar.take wait_until in - let%bind () = - match mvar_contents with - | None -> return () - | Some wait_until -> Time_source.at time_source wait_until - in - Mvar.set wait_until mvar_contents; - return () - ;; -end - -module By_headers = struct - let kind = "By_headers" - - module Server_side_info = struct - type t = - { remaining_api_calls : int - ; reset_time : Time_ns_unix.t - } - [@@deriving sexp, fields] - - let snap_to_nearest interval time = - let base = Time_ns.epoch in - let candidates = - [ Time_ns.prev_multiple ~can_equal_before:true ~interval ~base ~before:time () - ; Time_ns.next_multiple ~can_equal_after:false ~interval ~base ~after:time () - ] - in - List.min_elt - candidates - ~compare: - (Comparable.lift Time_ns.Span.compare ~f:(fun time' -> - Time_ns.abs_diff time time')) - |> Option.value_exn - ;; - - let%expect_test _ = - List.iter [ "2020-11-30 18:48:01.02Z"; "2020-11-30 18:47:59.02Z" ] ~f:(fun time -> - let time = Time_ns_unix.of_string time in - print_s - [%sexp (snap_to_nearest Time_ns.Span.minute time : Time_ns.Alternate_sexp.t)]); - [%expect {| - "2020-11-30 18:48:00Z" - "2020-11-30 18:48:00Z" |}]; - return () - ;; - - let parse_http_header_date date_string = - Scanf.sscanf - date_string - "%3s, %2d %3s %4d %2d:%2d:%2d GMT" - (fun day_of_week d month y hr min sec -> - let day_of_week = Day_of_week.of_string day_of_week in - let month = Month.of_string month in - let date = Date.create_exn ~y ~m:month ~d in - (match Day_of_week.equal day_of_week (Date.day_of_week date) with - | true -> () - | false -> - raise_s - [%message - "HTTP response: Day of week did not match parsed date" - (day_of_week : Day_of_week.t) - (date : Date.t) - (date_string : string)]); - let ofday = Time_ns.Ofday.create ~hr ~min ~sec () in - Time_ns.of_date_ofday date ofday ~zone:Time_ns_unix.Zone.utc) - ;; - - let%expect_test _ = - print_s - [%sexp - (parse_http_header_date "Wed, 21 Oct 2015 07:28:00 GMT" - : Time_ns.Alternate_sexp.t)]; - [%expect {| "2015-10-21 07:28:00Z" |}]; - return () - ;; - - let t_of_headers headers = - let open Option.Let_syntax in - let get_header header = Cohttp.Header.get headers header in - let%bind remaining_api_calls = - get_header "X-Ratelimit-Remaining" >>| Float.of_string >>| Int.of_float - in - let%bind reset_time = - let%bind server_time = get_header "Date" >>| parse_http_header_date in - let%bind relative_reset_time = - get_header "X-Ratelimit-Reset" >>| Int.of_string >>| Time_ns.Span.of_int_sec - in - Some - (snap_to_nearest - Time_ns.Span.minute - (Time_ns.add server_time relative_reset_time)) - in - Some { remaining_api_calls; reset_time } - ;; - - let compare_by_inferred_age = - (* We use reset time instead of the "Date" header because we might have - sent some requests for which we have yet to receive a response. In that - case, the most authoritative picture of our remaining requests is not - the most recent response, but rather our previous state adjusted by the - number of requests we've sent. - - We therefore prefer the state with fewer requests remaining when two - states are for the same reset period. - *) - Comparable.lexicographic - [ Comparable.lift Time_ns.compare ~f:reset_time - ; Comparable.reverse (Comparable.lift compare ~f:remaining_api_calls) - ] - ;; - - let freshest = Base.Comparable.max compare_by_inferred_age - - let state_at_start_of_window ~representative_time = - let reset_time = - Time_ns.next_multiple - ~can_equal_after:false - ~interval:(Time_ns.Span.of_int_min 10) - ~base:Time_ns.epoch - ~after:representative_time - () - in - let remaining_api_calls = 996 in - { remaining_api_calls; reset_time } - ;; - end - - module State = struct - type t = - | Created - | Waiting_on_first_request of { received_response : unit Ivar.t } - | Consuming_rate_limit of Server_side_info.t - [@@deriving sexp_of] - end - - type t = - { mutable state : State.t - ; time_source : (Time_source.t[@sexp.opaque]) - } - [@@deriving sexp_of] - - let create ~time_source = { time_source; state = Created } - - let is_ready { state; time_source; _ } = - match state with - | Created -> true - | Waiting_on_first_request _ -> false - | Consuming_rate_limit { remaining_api_calls; reset_time } -> - remaining_api_calls > 0 || Time_ns.( >= ) (Time_source.now time_source) reset_time - ;; - - let wait_until_ready t = - Deferred.repeat_until_finished () (fun () -> - match t.state with - | Created -> return (`Finished ()) - | Waiting_on_first_request { received_response } -> - let%bind () = Ivar.read received_response in - return (`Repeat ()) - | Consuming_rate_limit { remaining_api_calls; reset_time } -> - let now = Time_source.now t.time_source in - (match Time_ns.( >= ) now reset_time with - | true -> - t.state - <- Consuming_rate_limit - (Server_side_info.state_at_start_of_window ~representative_time:now); - return (`Finished ()) - | false -> - (match remaining_api_calls > 0 with - | true -> return (`Finished ()) - | false -> - let%bind () = Time_source.at t.time_source reset_time in - return (`Repeat ())))) - ;; - - let update_server_side_info t ~new_server_side_info = - (match t.state with - | State.Created | State.Consuming_rate_limit _ -> () - | State.Waiting_on_first_request { received_response } -> - Ivar.fill_if_empty received_response ()); - t.state <- Consuming_rate_limit new_server_side_info; - match - ( new_server_side_info.remaining_api_calls > 0 - , Time_ns.equal new_server_side_info.reset_time Time_ns.epoch ) - with - | false, false | true, (true | false) -> () - | false, true -> - [%log.debug - Import.log - "Rate limit exhausted" - ~reset_time:(new_server_side_info.reset_time : Time_ns_unix.t)] - ;; - - let permit_request t = - let%bind () = wait_until_ready t in - match t.state with - | Waiting_on_first_request _ -> - raise_s - [%message - "Unexpectedly in [Waiting_on_first_request] state after [wait_until_ready]."] - | Created -> - let received_response = Ivar.create () in - t.state <- Waiting_on_first_request { received_response }; - return () - | Consuming_rate_limit ({ remaining_api_calls; _ } as server_side_info) -> - let new_server_side_info : Server_side_info.t = - match remaining_api_calls with - | 0 -> server_side_info - | n -> { server_side_info with remaining_api_calls = n - 1 } - in - update_server_side_info t ~new_server_side_info; - return () - ;; - - let notify_response t response = - let headers = Cohttp.Response.headers response in - match Server_side_info.t_of_headers headers with - | None -> - (* We assume that, in the absence of ratelimit headers, we must have hit - some authentication failure. As a heuristic to avoid getting stuck, we - immediately reset [t.ready]. *) - t.state <- Created - | Some response_server_side_info -> - let new_server_side_info = - match t.state with - | Created -> raise_s [%message "[notify_response] called before [permit_request]"] - | Waiting_on_first_request _ -> response_server_side_info - | Consuming_rate_limit server_side_info -> - (match - Comparable.lift - [%compare: Time_ns.t] - ~f:Server_side_info.reset_time - server_side_info - response_server_side_info - |> Ordering.of_int - with - | Greater | Equal -> () - | Less -> - [%log.debug - Import.log - "Rate limit is resetting" - ~old_remaining_api_calls:(server_side_info.remaining_api_calls : int)]); - Server_side_info.freshest server_side_info response_server_side_info - in - update_server_side_info t ~new_server_side_info - ;; -end - -let by_headers ~time_source = T ((module By_headers), By_headers.create ~time_source) - -let with_minimum_delay ~time_source ~delay = - T ((module With_minimum_delay), With_minimum_delay.create ~time_source ~delay) +module Synchronous_rate_limiter = Reddit_api_kernel.Rate_limiter + +type t = + { state : Synchronous_rate_limiter.t + ; response_received : (unit, read_write) Bvar.t + ; time_source : (Time_source.t[@sexp.opaque]) + } +[@@deriving sexp_of] + +let of_synchronous state time_source = + let response_received = Bvar.create () in + { state; response_received; time_source } ;; -module Combined = struct - type nonrec t = - { ts : t list - ; sequencer : unit Throttle.Sequencer.t - } - [@@deriving sexp_of] - - let kind = "Combined" - - let create ts = - let sequencer = Throttle.Sequencer.create () in - { ts; sequencer } - ;; - - let permit_request { ts; sequencer } = - Throttle.enqueue sequencer (fun () -> - Deferred.all_unit (List.map ts ~f:permit_request)) - ;; - - let notify_response { ts; _ } headers = - List.iter ts ~f:(fun t -> notify_response t headers) - ;; +let is_ready t = + let now = Time_source.now t.time_source in + match Synchronous_rate_limiter.wait_until t.state with + | Now -> true + | Check_after_receiving_response -> false + | After time -> Time_ns.( >= ) now time +;; - let is_ready { ts; _ } = List.for_all ts ~f:is_ready +let wait_until_ready t = + Deferred.repeat_until_finished () (fun () -> + match Synchronous_rate_limiter.wait_until t.state with + | Now -> return (`Finished ()) + | After time -> + (match Time_ns.( >= ) (Time_source.now t.time_source) time with + | true -> return (`Finished ()) + | false -> + let%bind () = Time_source.at t.time_source time in + return (`Repeat ())) + | Check_after_receiving_response -> + let%bind () = Bvar.wait t.response_received in + return (`Repeat ())) +;; - let wait_until_ready { ts; sequencer } = - Throttle.enqueue sequencer (fun () -> - Deferred.all_unit (List.map ts ~f:wait_until_ready)) - ;; -end +let permit_request t = + Deferred.repeat_until_finished () (fun () -> + let%bind () = wait_until_ready t in + match is_ready t with + | false -> return (`Repeat ()) + | true -> + Synchronous_rate_limiter.sent_request_unchecked + t.state + ~now:(Time_source.now t.time_source); + return (`Finished ())) +;; -let combine ts = T ((module Combined), Combined.create ts) +let notify_response t response = + Synchronous_rate_limiter.received_response t.state response; + Bvar.broadcast t.response_received () +;; diff --git a/reddit_api_async/rate_limiter.mli b/reddit_api_async/rate_limiter.mli index 56a8126d..b22875e1 100644 --- a/reddit_api_async/rate_limiter.mli +++ b/reddit_api_async/rate_limiter.mli @@ -3,9 +3,7 @@ open! Async type t [@@deriving sexp_of] -val by_headers : time_source:Time_source.t -> t -val with_minimum_delay : time_source:Time_source.t -> delay:Time_ns.Span.t -> t -val combine : t list -> t +val of_synchronous : Reddit_api_kernel.Rate_limiter.t -> Time_source.t -> t val permit_request : t -> unit Deferred.t val notify_response : t -> Cohttp.Response.t -> unit val is_ready : t -> bool diff --git a/reddit_api_async/reddit_api_async.ml b/reddit_api_async/reddit_api_async.ml index 861b7325..a70dcb69 100644 --- a/reddit_api_async/reddit_api_async.ml +++ b/reddit_api_async/reddit_api_async.ml @@ -1,7 +1,11 @@ +open struct + module Rate_limiter' = Rate_limiter +end + include Reddit_api_kernel module Connection = Connection module Iter_comments = Iter_comments -module Rate_limiter = Rate_limiter +module Rate_limiter = Rate_limiter' module Retry_manager = Retry_manager module Stream = Stream diff --git a/reddit_api_kernel/rate_limiter.ml b/reddit_api_kernel/rate_limiter.ml new file mode 100644 index 00000000..b1a5e02f --- /dev/null +++ b/reddit_api_kernel/rate_limiter.ml @@ -0,0 +1,261 @@ +open! Core + +module When_to_send = struct + type t = + | Now + | Check_after_receiving_response + | After of Time_ns.t +end + +module type Basic = sig + type t [@@deriving sexp_of] + + val kind : string + val wait_until : t -> When_to_send.t + val sent_request_unchecked : t -> now:Time_ns.t -> unit + val received_response : t -> Cohttp.Response.t -> unit +end + +type t = T : (module Basic with type t = 't) * 't -> t + +let sexp_of_t (T ((module S), t)) : Sexp.t = List [ Atom S.kind; [%sexp_of: S.t] t ] +let wait_until (T ((module S), t)) = S.wait_until t +let sent_request_unchecked (T ((module S), t)) = S.sent_request_unchecked t +let received_response (T ((module S), t)) = S.received_response t + +module With_minimum_delay = struct + type t = + { mutable last_request : Time_ns.Alternate_sexp.t option + ; delay : Time_ns.Span.t + } + [@@deriving sexp_of] + + let kind = "With_minimum_delay" + let create ~delay = { last_request = None; delay } + + let wait_until t : When_to_send.t = + match t.last_request with + | None -> Now + | Some time -> After (Time_ns.add time t.delay) + ;; + + let sent_request_unchecked t ~now = t.last_request <- Some (Time_ns.add now t.delay) + let received_response (_ : t) (_ : Cohttp.Response.t) = () +end + +module By_headers = struct + let kind = "By_headers" + + module Server_side_info = struct + type t = + { remaining_api_calls : int + ; reset_time : Time_ns.Alternate_sexp.t + } + [@@deriving sexp, fields] + + let snap_to_nearest interval time = + let base = Time_ns.epoch in + let candidates = + [ Time_ns.prev_multiple ~can_equal_before:true ~interval ~base ~before:time () + ; Time_ns.next_multiple ~can_equal_after:false ~interval ~base ~after:time () + ] + in + List.min_elt + candidates + ~compare: + (Comparable.lift Time_ns.Span.compare ~f:(fun time' -> + Time_ns.abs_diff time time')) + |> Option.value_exn + ;; + + let%expect_test _ = + List.iter [ "2020-11-30 18:48:01.02Z"; "2020-11-30 18:47:59.02Z" ] ~f:(fun time -> + let time = Time_ns.of_string_with_utc_offset time in + print_s + [%sexp (snap_to_nearest Time_ns.Span.minute time : Time_ns.Alternate_sexp.t)]); + [%expect {| + "2020-11-30 18:48:00Z" + "2020-11-30 18:48:00Z" |}] + ;; + + let parse_http_header_date date_string = + Scanf.sscanf + date_string + "%3s, %2d %3s %4d %2d:%2d:%2d GMT" + (fun day_of_week d month y hr min sec -> + let day_of_week = Day_of_week.of_string day_of_week in + let month = Month.of_string month in + let date = Date.create_exn ~y ~m:month ~d in + (match Day_of_week.equal day_of_week (Date.day_of_week date) with + | true -> () + | false -> + raise_s + [%message + "HTTP response: Day of week did not match parsed date" + (day_of_week : Day_of_week.t) + (date : Date.t) + (date_string : string)]); + let ofday = Time_ns.Ofday.create ~hr ~min ~sec () in + Time_ns.of_date_ofday date ofday ~zone:Time_float.Zone.utc) + ;; + + let%expect_test _ = + print_s + [%sexp + (parse_http_header_date "Wed, 21 Oct 2015 07:28:00 GMT" + : Time_ns.Alternate_sexp.t)]; + [%expect {| "2015-10-21 07:28:00Z" |}] + ;; + + let t_of_headers headers = + let open Option.Let_syntax in + let get_header header = Cohttp.Header.get headers header in + let%bind remaining_api_calls = + get_header "X-Ratelimit-Remaining" >>| Float.of_string >>| Int.of_float + in + let%bind reset_time = + let%bind server_time = get_header "Date" >>| parse_http_header_date in + let%bind relative_reset_time = + get_header "X-Ratelimit-Reset" >>| Int.of_string >>| Time_ns.Span.of_int_sec + in + Some + (snap_to_nearest + Time_ns.Span.minute + (Time_ns.add server_time relative_reset_time)) + in + Some { remaining_api_calls; reset_time } + ;; + + let compare_by_inferred_age = + (* We use reset time instead of the "Date" header because we might have + sent some requests for which we have yet to receive a response. In that + case, the most authoritative picture of our remaining requests is not + the most recent response, but rather our previous state adjusted by the + number of requests we've sent. + + We therefore prefer the state with fewer requests remaining when two + states are for the same reset period. + *) + Comparable.lexicographic + [ Comparable.lift Time_ns.compare ~f:reset_time + ; Comparable.reverse (Comparable.lift compare ~f:remaining_api_calls) + ] + ;; + + let freshest = Base.Comparable.max compare_by_inferred_age + + let state_at_start_of_window ~representative_time = + let reset_time = + Time_ns.next_multiple + ~can_equal_after:false + ~interval:(Time_ns.Span.of_int_min 10) + ~base:Time_ns.epoch + ~after:representative_time + () + in + let remaining_api_calls = 996 in + { remaining_api_calls; reset_time } + ;; + end + + module State = struct + type t = + | Created + | Waiting_on_first_request + | Consuming_rate_limit of Server_side_info.t + [@@deriving sexp_of] + end + + type t = State.t ref [@@deriving sexp_of] + + let create () = ref State.Created + + let wait_until (t : t) : When_to_send.t = + match !t with + | Created -> Now + | Waiting_on_first_request -> Check_after_receiving_response + | Consuming_rate_limit { remaining_api_calls; reset_time } -> + (match remaining_api_calls > 0 with + | true -> Now + | false -> After reset_time) + ;; + + let sent_request_unchecked (t : t) ~now = + let new_state : State.t = + match !t with + | Created -> Waiting_on_first_request + | Waiting_on_first_request -> + raise_s + [%message + "[sent_request_unchecked] illegally called in [Waiting_on_first_request] \ + state."] + | Consuming_rate_limit server_side_info -> + let base_server_side_info = + match Time_ns.( <= ) server_side_info.reset_time now with + | false -> server_side_info + | true -> Server_side_info.state_at_start_of_window ~representative_time:now + in + Consuming_rate_limit + { base_server_side_info with + remaining_api_calls = base_server_side_info.remaining_api_calls - 1 + } + in + t := new_state + ;; + + let received_response (t : t) response = + let headers = Cohttp.Response.headers response in + let new_state : State.t = + match Server_side_info.t_of_headers headers with + | None -> + (* We assume that, in the absence of ratelimit headers, we must have hit + some authentication failure. As a heuristic to avoid getting stuck, we + immediately reset [t.ready]. *) + Created + | Some response_server_side_info -> + (match !t with + | Created -> + raise_s [%message "[received_response] called before [sent_request_unchecked]."] + | Waiting_on_first_request -> Consuming_rate_limit response_server_side_info + | Consuming_rate_limit server_side_info -> + Consuming_rate_limit + (Server_side_info.freshest server_side_info response_server_side_info)) + in + t := new_state + ;; +end + +let by_headers () = T ((module By_headers), By_headers.create ()) + +let with_minimum_delay ~delay = + T ((module With_minimum_delay), With_minimum_delay.create ~delay) +;; + +module Combined = struct + type nonrec t = t list [@@deriving sexp_of] + + let kind = "Combined" + let create ts = ts + + let wait_until ts = + match + List.map ts ~f:wait_until + |> List.max_elt ~compare:(fun a b -> + match a, b with + | Now, _ -> -1 + | _, Now -> 1 + | After a, After b -> Time_ns.compare a b + | _, _ -> 0) + with + | Some v -> v + | None -> Now + ;; + + let sent_request_unchecked ts ~now = List.iter ts ~f:(sent_request_unchecked ~now) + + let received_response ts response = + List.iter ts ~f:(fun t -> received_response t response) + ;; +end + +let combine ts = T ((module Combined), Combined.create ts) diff --git a/reddit_api_kernel/rate_limiter.mli b/reddit_api_kernel/rate_limiter.mli new file mode 100644 index 00000000..8cfdb54d --- /dev/null +++ b/reddit_api_kernel/rate_limiter.mli @@ -0,0 +1,17 @@ +open! Core + +module When_to_send : sig + type t = + | Now + | Check_after_receiving_response + | After of Time_ns.t +end + +type t [@@deriving sexp_of] + +val by_headers : unit -> t +val with_minimum_delay : delay:Time_ns.Span.t -> t +val combine : t list -> t +val wait_until : t -> When_to_send.t +val sent_request_unchecked : t -> now:Time_ns.t -> unit +val received_response : t -> Cohttp.Response.t -> unit diff --git a/reddit_api_kernel/reddit_api_kernel.ml b/reddit_api_kernel/reddit_api_kernel.ml index 79a36002..0f5e2f4c 100644 --- a/reddit_api_kernel/reddit_api_kernel.ml +++ b/reddit_api_kernel/reddit_api_kernel.ml @@ -8,6 +8,7 @@ module Listing = Listing module Mod_action = Mod_action module Moderator_report = Moderator_report module Modmail = Modmail +module Rate_limiter = Rate_limiter module Relationship = Relationship module Stylesheet = Stylesheet module Submit_text = Submit_text diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index c88092e7..f8131c69 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -49,7 +49,9 @@ let%expect_test _ = let ( ^: ) hr min = Time_ns.add Time_ns.epoch (Time_ns.Span.create ~hr ~min ()) in let time_source = Time_source.create ~now:(00 ^: 00) () in let rate_limiter = - Rate_limiter.by_headers ~time_source:(Time_source.read_only time_source) + Reddit_api_async.Rate_limiter.of_synchronous + (Reddit_api_kernel.Rate_limiter.by_headers ()) + (Time_source.read_only time_source) in let print () = print_endline @@ -66,9 +68,9 @@ let%expect_test _ = ((time (1970-01-01 00:00:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state Created) - (time_source ))))) |}]; + (state (By_headers Created)) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Initially we can permit one request. *) let%bind () = Rate_limiter.permit_request rate_limiter in print (); @@ -77,13 +79,26 @@ let%expect_test _ = ((time (1970-01-01 00:00:00.000000000Z)) (is_ready false) (rate_limiter ( - By_headers ( - (state (Waiting_on_first_request (received_response Empty))) - (time_source ))))) |}]; + (state (By_headers Waiting_on_first_request)) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Receiving a response allows us to send another request. *) Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 00) ~limit_remaining:1); + print (); + [%expect + {| + ((time (1970-01-01 00:00:00.000000000Z)) + (is_ready true) + (rate_limiter ( + (state ( + By_headers ( + Consuming_rate_limit ( + (remaining_api_calls 1) + (reset_time (1970-01-01 00:10:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in print (); [%expect @@ -91,12 +106,13 @@ let%expect_test _ = ((time (1970-01-01 00:00:00.000000000Z)) (is_ready false) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:10:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:10:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Receiving a response for the same reset period will not increase our limit remaining. *) Rate_limiter.notify_response rate_limiter @@ -107,12 +123,13 @@ let%expect_test _ = ((time (1970-01-01 00:00:00.000000000Z)) (is_ready false) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:10:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:10:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Moving to the next period increases our remaining limit. *) Rate_limiter.notify_response rate_limiter @@ -121,16 +138,16 @@ let%expect_test _ = print (); [%expect {| - ("Rate limit is resetting"(old_remaining_api_calls 0)) ((time (1970-01-01 00:00:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 10) - (reset_time (1970-01-01 00:20:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:20:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Exhausting the remaining limit causes us to be not-ready. *) let%bind () = Deferred.repeat_until_finished 10 (function @@ -145,12 +162,13 @@ let%expect_test _ = ((time (1970-01-01 00:00:00.000000000Z)) (is_ready false) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:20:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:20:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let ready_deferred = Rate_limiter.permit_request rate_limiter in [%expect {| |}]; (* Advancing the time allows us to send another request. *) @@ -162,12 +180,13 @@ let%expect_test _ = ((time (1970-01-01 00:20:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:30:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:30:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; (* Advancing past the reset time before we receive a response does not result in a double reset. *) let%bind () = Rate_limiter.permit_request rate_limiter in @@ -181,12 +200,13 @@ let%expect_test _ = ((time (1970-01-01 00:20:00.000000000Z)) (is_ready false) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:30:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in print (); [%expect @@ -194,12 +214,13 @@ let%expect_test _ = ((time (1970-01-01 00:30:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:30:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in print (); [%expect @@ -207,12 +228,13 @@ let%expect_test _ = ((time (1970-01-01 00:30:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:40:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:40:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; Rate_limiter.notify_response rate_limiter (build_header ~server_time:(00 ^: 29) ~limit_remaining:1); @@ -222,12 +244,13 @@ let%expect_test _ = ((time (1970-01-01 00:30:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:40:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:40:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in print (); [%expect @@ -235,11 +258,12 @@ let%expect_test _ = ((time (1970-01-01 00:31:00.000000000Z)) (is_ready true) (rate_limiter ( - By_headers ( - (state ( + (state ( + By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:40:00.000000000Z))))) - (time_source ))))) |}]; + (reset_time (1970-01-01 00:40:00.000000000Z)))))) + (response_received (has_any_waiters false)) + (time_source )))) |}]; return () ;; From 4884d498f94a30ad653c4593d4c76f025456a91f Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Wed, 24 Jan 2024 19:50:06 -0500 Subject: [PATCH 10/16] Make synchronous rate limiter immutable --- reddit_api_async/connection.ml | 2 +- reddit_api_async/rate_limiter.ml | 11 +-- reddit_api_kernel/rate_limiter.ml | 113 ++++++++++++++--------------- reddit_api_kernel/rate_limiter.mli | 6 +- test/test_comment_fields.ml | 4 +- test/test_links_and_comments.ml | 11 +-- test/test_rate_limiter.ml | 24 +++--- test/test_set_subreddit_sticky.ml | 2 +- 8 files changed, 81 insertions(+), 92 deletions(-) diff --git a/reddit_api_async/connection.ml b/reddit_api_async/connection.ml index e25074f2..4053b45d 100644 --- a/reddit_api_async/connection.ml +++ b/reddit_api_async/connection.ml @@ -387,7 +387,7 @@ let sexp_of_t (T ((module T), t)) = T.sexp_of_t t let all_rate_limiters () = let module Synchronous_rate_limiter = Reddit_api_kernel.Rate_limiter in Synchronous_rate_limiter.combine - [ Synchronous_rate_limiter.by_headers () + [ Synchronous_rate_limiter.by_headers ; Synchronous_rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) ] ;; diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index ca354803..da428aa6 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -3,7 +3,7 @@ open! Async module Synchronous_rate_limiter = Reddit_api_kernel.Rate_limiter type t = - { state : Synchronous_rate_limiter.t + { mutable state : Synchronous_rate_limiter.t ; response_received : (unit, read_write) Bvar.t ; time_source : (Time_source.t[@sexp.opaque]) } @@ -43,13 +43,14 @@ let permit_request t = match is_ready t with | false -> return (`Repeat ()) | true -> - Synchronous_rate_limiter.sent_request_unchecked - t.state - ~now:(Time_source.now t.time_source); + t.state + <- Synchronous_rate_limiter.sent_request_unchecked + t.state + ~now:(Time_source.now t.time_source); return (`Finished ())) ;; let notify_response t response = - Synchronous_rate_limiter.received_response t.state response; + t.state <- Synchronous_rate_limiter.received_response t.state response; Bvar.broadcast t.response_received () ;; diff --git a/reddit_api_kernel/rate_limiter.ml b/reddit_api_kernel/rate_limiter.ml index b1a5e02f..bf061373 100644 --- a/reddit_api_kernel/rate_limiter.ml +++ b/reddit_api_kernel/rate_limiter.ml @@ -12,20 +12,26 @@ module type Basic = sig val kind : string val wait_until : t -> When_to_send.t - val sent_request_unchecked : t -> now:Time_ns.t -> unit - val received_response : t -> Cohttp.Response.t -> unit + val sent_request_unchecked : t -> now:Time_ns.t -> t + val received_response : t -> Cohttp.Response.t -> t end type t = T : (module Basic with type t = 't) * 't -> t let sexp_of_t (T ((module S), t)) : Sexp.t = List [ Atom S.kind; [%sexp_of: S.t] t ] let wait_until (T ((module S), t)) = S.wait_until t -let sent_request_unchecked (T ((module S), t)) = S.sent_request_unchecked t -let received_response (T ((module S), t)) = S.received_response t + +let sent_request_unchecked (T (((module M) as m), t)) ~now = + T (m, M.sent_request_unchecked t ~now) +;; + +let received_response (T (((module M) as m), t)) response = + T (m, M.received_response t response) +;; module With_minimum_delay = struct type t = - { mutable last_request : Time_ns.Alternate_sexp.t option + { last_request : Time_ns.Alternate_sexp.t option ; delay : Time_ns.Span.t } [@@deriving sexp_of] @@ -39,8 +45,8 @@ module With_minimum_delay = struct | Some time -> After (Time_ns.add time t.delay) ;; - let sent_request_unchecked t ~now = t.last_request <- Some (Time_ns.add now t.delay) - let received_response (_ : t) (_ : Cohttp.Response.t) = () + let sent_request_unchecked t ~now = { t with last_request = Some now } + let received_response t (_ : Cohttp.Response.t) = t end module By_headers = struct @@ -158,20 +164,14 @@ module By_headers = struct ;; end - module State = struct - type t = - | Created - | Waiting_on_first_request - | Consuming_rate_limit of Server_side_info.t - [@@deriving sexp_of] - end - - type t = State.t ref [@@deriving sexp_of] - - let create () = ref State.Created + type t = + | Created + | Waiting_on_first_request + | Consuming_rate_limit of Server_side_info.t + [@@deriving sexp_of] - let wait_until (t : t) : When_to_send.t = - match !t with + let wait_until t : When_to_send.t = + match t with | Created -> Now | Waiting_on_first_request -> Check_after_receiving_response | Consuming_rate_limit { remaining_api_calls; reset_time } -> @@ -180,52 +180,45 @@ module By_headers = struct | false -> After reset_time) ;; - let sent_request_unchecked (t : t) ~now = - let new_state : State.t = - match !t with - | Created -> Waiting_on_first_request - | Waiting_on_first_request -> - raise_s - [%message - "[sent_request_unchecked] illegally called in [Waiting_on_first_request] \ - state."] - | Consuming_rate_limit server_side_info -> - let base_server_side_info = - match Time_ns.( <= ) server_side_info.reset_time now with - | false -> server_side_info - | true -> Server_side_info.state_at_start_of_window ~representative_time:now - in - Consuming_rate_limit - { base_server_side_info with - remaining_api_calls = base_server_side_info.remaining_api_calls - 1 - } - in - t := new_state + let sent_request_unchecked t ~now = + match t with + | Created -> Waiting_on_first_request + | Waiting_on_first_request -> + raise_s + [%message + "[sent_request_unchecked] illegally called in [Waiting_on_first_request] state."] + | Consuming_rate_limit server_side_info -> + let base_server_side_info = + match Time_ns.( <= ) server_side_info.reset_time now with + | false -> server_side_info + | true -> Server_side_info.state_at_start_of_window ~representative_time:now + in + Consuming_rate_limit + { base_server_side_info with + remaining_api_calls = base_server_side_info.remaining_api_calls - 1 + } ;; - let received_response (t : t) response = + let received_response t response = let headers = Cohttp.Response.headers response in - let new_state : State.t = - match Server_side_info.t_of_headers headers with - | None -> - (* We assume that, in the absence of ratelimit headers, we must have hit + match Server_side_info.t_of_headers headers with + | None -> + (* We assume that, in the absence of ratelimit headers, we must have hit some authentication failure. As a heuristic to avoid getting stuck, we immediately reset [t.ready]. *) - Created - | Some response_server_side_info -> - (match !t with - | Created -> - raise_s [%message "[received_response] called before [sent_request_unchecked]."] - | Waiting_on_first_request -> Consuming_rate_limit response_server_side_info - | Consuming_rate_limit server_side_info -> - Consuming_rate_limit - (Server_side_info.freshest server_side_info response_server_side_info)) - in - t := new_state + Created + | Some response_server_side_info -> + (match t with + | Created -> + raise_s [%message "[received_response] called before [sent_request_unchecked]."] + | Waiting_on_first_request -> Consuming_rate_limit response_server_side_info + | Consuming_rate_limit server_side_info -> + Consuming_rate_limit + (Server_side_info.freshest server_side_info response_server_side_info)) ;; end -let by_headers () = T ((module By_headers), By_headers.create ()) +let by_headers = T ((module By_headers), Created) let with_minimum_delay ~delay = T ((module With_minimum_delay), With_minimum_delay.create ~delay) @@ -251,10 +244,10 @@ module Combined = struct | None -> Now ;; - let sent_request_unchecked ts ~now = List.iter ts ~f:(sent_request_unchecked ~now) + let sent_request_unchecked ts ~now = List.map ts ~f:(sent_request_unchecked ~now) let received_response ts response = - List.iter ts ~f:(fun t -> received_response t response) + List.map ts ~f:(fun t -> received_response t response) ;; end diff --git a/reddit_api_kernel/rate_limiter.mli b/reddit_api_kernel/rate_limiter.mli index 8cfdb54d..07dfa58a 100644 --- a/reddit_api_kernel/rate_limiter.mli +++ b/reddit_api_kernel/rate_limiter.mli @@ -9,9 +9,9 @@ end type t [@@deriving sexp_of] -val by_headers : unit -> t +val by_headers : t val with_minimum_delay : delay:Time_ns.Span.t -> t val combine : t list -> t val wait_until : t -> When_to_send.t -val sent_request_unchecked : t -> now:Time_ns.t -> unit -val received_response : t -> Cohttp.Response.t -> unit +val sent_request_unchecked : t -> now:Time_ns.t -> t +val received_response : t -> Cohttp.Response.t -> t diff --git a/test/test_comment_fields.ml b/test/test_comment_fields.ml index 13a92289..06c815ef 100644 --- a/test/test_comment_fields.ml +++ b/test/test_comment_fields.ml @@ -27,9 +27,7 @@ let%expect_test "comment_fields" = Set.symmetric_diff keys_from_comment_page keys_from_info_page |> Sequence.to_list in print_s [%sexp (diff : (string, string) Either.t list)]; - [%expect - {| - ("Rate limit is resetting"(old_remaining_api_calls 995)) + [%expect {| ((First depth)) |}]; print_s [%sexp (Thing.Comment.depth first_comment : int option)]; [%expect {| (0) |}]; diff --git a/test/test_links_and_comments.ml b/test/test_links_and_comments.ml index cdd311f9..5afda00b 100644 --- a/test/test_links_and_comments.ml +++ b/test/test_links_and_comments.ml @@ -27,7 +27,7 @@ let%expect_test "unsave" = let%bind () = Connection.call_exn connection (Endpoint.unsave ~id) in (* Unsave is idempotent *) let%bind () = Connection.call_exn connection (Endpoint.unsave ~id) in - [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| |}]; return ()) ;; @@ -41,7 +41,7 @@ let%expect_test "send_replies" = let%bind () = Connection.call_exn connection (Endpoint.send_replies ~id ~enabled:false) in - [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| |}]; return ()) ;; @@ -54,7 +54,7 @@ let%expect_test "set_contest_mode" = let%bind () = Connection.call_exn connection (Endpoint.set_contest_mode ~link ~enabled:false) in - [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| |}]; return ()) ;; @@ -86,9 +86,6 @@ let%expect_test "vote" = let%bind () = Connection.call_exn connection (Endpoint.vote () ~target ~direction:Up) in - [%expect - {| - ("Rate limit is resetting"(old_remaining_api_calls 995)) - ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| |}]; return ()) ;; diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index f8131c69..3c937533 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -50,7 +50,7 @@ let%expect_test _ = let time_source = Time_source.create ~now:(00 ^: 00) () in let rate_limiter = Reddit_api_async.Rate_limiter.of_synchronous - (Reddit_api_kernel.Rate_limiter.by_headers ()) + Reddit_api_kernel.Rate_limiter.by_headers (Time_source.read_only time_source) in let print () = @@ -96,7 +96,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 1) - (reset_time (1970-01-01 00:10:00.000000000Z)))))) + (reset_time "1970-01-01 00:10:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in @@ -110,7 +110,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:10:00.000000000Z)))))) + (reset_time "1970-01-01 00:10:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; (* Receiving a response for the same reset period will not increase our limit remaining. *) @@ -127,7 +127,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:10:00.000000000Z)))))) + (reset_time "1970-01-01 00:10:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; (* Moving to the next period increases our remaining limit. *) @@ -145,7 +145,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 10) - (reset_time (1970-01-01 00:20:00.000000000Z)))))) + (reset_time "1970-01-01 00:20:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; (* Exhausting the remaining limit causes us to be not-ready. *) @@ -166,7 +166,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:20:00.000000000Z)))))) + (reset_time "1970-01-01 00:20:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; let ready_deferred = Rate_limiter.permit_request rate_limiter in @@ -184,7 +184,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:30:00.000000000Z)))))) + (reset_time "1970-01-01 00:30:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; (* Advancing past the reset time before we receive a response does not result @@ -204,7 +204,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z)))))) + (reset_time "1970-01-01 00:30:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in @@ -218,7 +218,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 0) - (reset_time (1970-01-01 00:30:00.000000000Z)))))) + (reset_time "1970-01-01 00:30:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; let%bind () = Rate_limiter.permit_request rate_limiter in @@ -232,7 +232,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:40:00.000000000Z)))))) + (reset_time "1970-01-01 00:40:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; Rate_limiter.notify_response @@ -248,7 +248,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:40:00.000000000Z)))))) + (reset_time "1970-01-01 00:40:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in @@ -262,7 +262,7 @@ let%expect_test _ = By_headers ( Consuming_rate_limit ( (remaining_api_calls 995) - (reset_time (1970-01-01 00:40:00.000000000Z)))))) + (reset_time "1970-01-01 00:40:00Z"))))) (response_received (has_any_waiters false)) (time_source )))) |}]; return () diff --git a/test/test_set_subreddit_sticky.ml b/test/test_set_subreddit_sticky.ml index 7a1746fe..65618085 100644 --- a/test/test_set_subreddit_sticky.ml +++ b/test/test_set_subreddit_sticky.ml @@ -19,6 +19,6 @@ let%expect_test "set_subreddit_sticky" = connection (Endpoint.set_subreddit_sticky () ~link ~sticky_state:Unsticky) in - [%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}]; + [%expect {| |}]; return ()) ;; From f283126c9ae07d80f5aeaa9b45e33ffc30f65412 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Wed, 24 Jan 2024 20:04:49 -0500 Subject: [PATCH 11/16] Rename synchronous Rate_limiter to Synchronous_rate_limiter --- reddit_api_async/connection.ml | 3 --- reddit_api_async/rate_limiter.ml | 2 +- reddit_api_async/rate_limiter.mli | 2 +- reddit_api_kernel/reddit_api_kernel.ml | 2 +- .../{rate_limiter.ml => synchronous_rate_limiter.ml} | 0 .../{rate_limiter.mli => synchronous_rate_limiter.mli} | 0 test/test_rate_limiter.ml | 2 +- 7 files changed, 4 insertions(+), 7 deletions(-) rename reddit_api_kernel/{rate_limiter.ml => synchronous_rate_limiter.ml} (100%) rename reddit_api_kernel/{rate_limiter.mli => synchronous_rate_limiter.mli} (100%) diff --git a/reddit_api_async/connection.ml b/reddit_api_async/connection.ml index 4053b45d..150e756d 100644 --- a/reddit_api_async/connection.ml +++ b/reddit_api_async/connection.ml @@ -1,8 +1,6 @@ open! Core open! Async -module Rate_limiter' = Rate_limiter open Reddit_api_kernel -module Rate_limiter = Rate_limiter' module Credentials = struct module Password = struct @@ -385,7 +383,6 @@ type t = T : (module T with type t = 't) * 't -> t let sexp_of_t (T ((module T), t)) = T.sexp_of_t t let all_rate_limiters () = - let module Synchronous_rate_limiter = Reddit_api_kernel.Rate_limiter in Synchronous_rate_limiter.combine [ Synchronous_rate_limiter.by_headers ; Synchronous_rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index da428aa6..b62bf7a9 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -1,6 +1,6 @@ open! Core open! Async -module Synchronous_rate_limiter = Reddit_api_kernel.Rate_limiter +module Synchronous_rate_limiter = Reddit_api_kernel.Synchronous_rate_limiter type t = { mutable state : Synchronous_rate_limiter.t diff --git a/reddit_api_async/rate_limiter.mli b/reddit_api_async/rate_limiter.mli index b22875e1..8de057ac 100644 --- a/reddit_api_async/rate_limiter.mli +++ b/reddit_api_async/rate_limiter.mli @@ -3,7 +3,7 @@ open! Async type t [@@deriving sexp_of] -val of_synchronous : Reddit_api_kernel.Rate_limiter.t -> Time_source.t -> t +val of_synchronous : Reddit_api_kernel.Synchronous_rate_limiter.t -> Time_source.t -> t val permit_request : t -> unit Deferred.t val notify_response : t -> Cohttp.Response.t -> unit val is_ready : t -> bool diff --git a/reddit_api_kernel/reddit_api_kernel.ml b/reddit_api_kernel/reddit_api_kernel.ml index 0f5e2f4c..f5934086 100644 --- a/reddit_api_kernel/reddit_api_kernel.ml +++ b/reddit_api_kernel/reddit_api_kernel.ml @@ -8,7 +8,6 @@ module Listing = Listing module Mod_action = Mod_action module Moderator_report = Moderator_report module Modmail = Modmail -module Rate_limiter = Rate_limiter module Relationship = Relationship module Stylesheet = Stylesheet module Submit_text = Submit_text @@ -16,6 +15,7 @@ module Subreddit_name = Subreddit_name module Subreddit_rules = Subreddit_rules module Subreddit_settings = Subreddit_settings module Subreddit_traffic = Subreddit_traffic +module Synchronous_rate_limiter = Synchronous_rate_limiter module Thing = Thing module Thing_kind = Thing_kind module Uri_with_string_sexp = Uri_with_string_sexp diff --git a/reddit_api_kernel/rate_limiter.ml b/reddit_api_kernel/synchronous_rate_limiter.ml similarity index 100% rename from reddit_api_kernel/rate_limiter.ml rename to reddit_api_kernel/synchronous_rate_limiter.ml diff --git a/reddit_api_kernel/rate_limiter.mli b/reddit_api_kernel/synchronous_rate_limiter.mli similarity index 100% rename from reddit_api_kernel/rate_limiter.mli rename to reddit_api_kernel/synchronous_rate_limiter.mli diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 3c937533..6521c80a 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -50,7 +50,7 @@ let%expect_test _ = let time_source = Time_source.create ~now:(00 ^: 00) () in let rate_limiter = Reddit_api_async.Rate_limiter.of_synchronous - Reddit_api_kernel.Rate_limiter.by_headers + Reddit_api_kernel.Synchronous_rate_limiter.by_headers (Time_source.read_only time_source) in let print () = From 6b8f22707f2a82c19ce933662411c73d70302455 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 11 Feb 2024 16:31:53 -0500 Subject: [PATCH 12/16] Connection.all_rate_limters doesn't need to be a thunk --- reddit_api_async/connection.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/reddit_api_async/connection.ml b/reddit_api_async/connection.ml index 150e756d..8841d75a 100644 --- a/reddit_api_async/connection.ml +++ b/reddit_api_async/connection.ml @@ -382,7 +382,7 @@ type t = T : (module T with type t = 't) * 't -> t let sexp_of_t (T ((module T), t)) = T.sexp_of_t t -let all_rate_limiters () = +let all_rate_limiters = Synchronous_rate_limiter.combine [ Synchronous_rate_limiter.by_headers ; Synchronous_rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) @@ -391,7 +391,7 @@ let all_rate_limiters () = let create credentials ~user_agent = let rate_limiter = - Rate_limiter.of_synchronous (all_rate_limiters ()) (Time_source.wall_clock ()) + Rate_limiter.of_synchronous all_rate_limiters (Time_source.wall_clock ()) in T ((module Local), Local.create credentials ~user_agent ~rate_limiter) ;; @@ -868,7 +868,7 @@ module For_testing = struct | false -> recording filename placeholders in let rate_limiter = - Rate_limiter.of_synchronous (all_rate_limiters ()) Cassette.time_source + Rate_limiter.of_synchronous all_rate_limiters Cassette.time_source in let connection = T From be2765cf011c88eb62c2663e1023caeb609dd5dc Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 11 Feb 2024 16:33:52 -0500 Subject: [PATCH 13/16] Remove unnecessary anonymous open in reddit_api_async wrapper module --- reddit_api_async/reddit_api_async.ml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/reddit_api_async/reddit_api_async.ml b/reddit_api_async/reddit_api_async.ml index a70dcb69..861b7325 100644 --- a/reddit_api_async/reddit_api_async.ml +++ b/reddit_api_async/reddit_api_async.ml @@ -1,11 +1,7 @@ -open struct - module Rate_limiter' = Rate_limiter -end - include Reddit_api_kernel module Connection = Connection module Iter_comments = Iter_comments -module Rate_limiter = Rate_limiter' +module Rate_limiter = Rate_limiter module Retry_manager = Retry_manager module Stream = Stream From 60030813800443a7038830701cf8bf3298d4dcd6 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 11 Feb 2024 16:49:15 -0500 Subject: [PATCH 14/16] Add a little documentation to synchronous_rate_limiter.mli --- reddit_api_kernel/synchronous_rate_limiter.mli | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/reddit_api_kernel/synchronous_rate_limiter.mli b/reddit_api_kernel/synchronous_rate_limiter.mli index 07dfa58a..7eadec9b 100644 --- a/reddit_api_kernel/synchronous_rate_limiter.mli +++ b/reddit_api_kernel/synchronous_rate_limiter.mli @@ -9,9 +9,20 @@ end type t [@@deriving sexp_of] +(** {1 Constructors} *) + val by_headers : t val with_minimum_delay : delay:Time_ns.Span.t -> t val combine : t list -> t -val wait_until : t -> When_to_send.t + +(** {1 Events} *) + +(** [sent_request_unchecked] should called immediately after sending a request. + It is the caller's responsibility to first call {!wait_until}. *) val sent_request_unchecked : t -> now:Time_ns.t -> t + val received_response : t -> Cohttp.Response.t -> t + +(** {1 Accessors} *) + +val wait_until : t -> When_to_send.t From ce39caa1e11a21e59df6859a0ae5a43797592255 Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sun, 11 Feb 2024 16:57:18 -0500 Subject: [PATCH 15/16] Rename Synchronous_rate_limiter => Rate_limiter_state_machine --- reddit_api_async/connection.ml | 10 +++++----- reddit_api_async/rate_limiter.ml | 14 +++++++------- reddit_api_async/rate_limiter.mli | 6 +++++- ...te_limiter.ml => rate_limiter_state_machine.ml} | 0 ..._limiter.mli => rate_limiter_state_machine.mli} | 0 reddit_api_kernel/reddit_api_kernel.ml | 2 +- test/test_rate_limiter.ml | 4 ++-- 7 files changed, 20 insertions(+), 16 deletions(-) rename reddit_api_kernel/{synchronous_rate_limiter.ml => rate_limiter_state_machine.ml} (100%) rename reddit_api_kernel/{synchronous_rate_limiter.mli => rate_limiter_state_machine.mli} (100%) diff --git a/reddit_api_async/connection.ml b/reddit_api_async/connection.ml index 8841d75a..bffc0d8b 100644 --- a/reddit_api_async/connection.ml +++ b/reddit_api_async/connection.ml @@ -383,15 +383,15 @@ type t = T : (module T with type t = 't) * 't -> t let sexp_of_t (T ((module T), t)) = T.sexp_of_t t let all_rate_limiters = - Synchronous_rate_limiter.combine - [ Synchronous_rate_limiter.by_headers - ; Synchronous_rate_limiter.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) + Rate_limiter_state_machine.combine + [ Rate_limiter_state_machine.by_headers + ; Rate_limiter_state_machine.with_minimum_delay ~delay:(Time_ns.Span.of_int_ms 100) ] ;; let create credentials ~user_agent = let rate_limiter = - Rate_limiter.of_synchronous all_rate_limiters (Time_source.wall_clock ()) + Rate_limiter.of_state_machine all_rate_limiters (Time_source.wall_clock ()) in T ((module Local), Local.create credentials ~user_agent ~rate_limiter) ;; @@ -868,7 +868,7 @@ module For_testing = struct | false -> recording filename placeholders in let rate_limiter = - Rate_limiter.of_synchronous all_rate_limiters Cassette.time_source + Rate_limiter.of_state_machine all_rate_limiters Cassette.time_source in let connection = T diff --git a/reddit_api_async/rate_limiter.ml b/reddit_api_async/rate_limiter.ml index b62bf7a9..2e238cf7 100644 --- a/reddit_api_async/rate_limiter.ml +++ b/reddit_api_async/rate_limiter.ml @@ -1,22 +1,22 @@ open! Core open! Async -module Synchronous_rate_limiter = Reddit_api_kernel.Synchronous_rate_limiter +module Rate_limiter_state_machine = Reddit_api_kernel.Rate_limiter_state_machine type t = - { mutable state : Synchronous_rate_limiter.t + { mutable state : Rate_limiter_state_machine.t ; response_received : (unit, read_write) Bvar.t ; time_source : (Time_source.t[@sexp.opaque]) } [@@deriving sexp_of] -let of_synchronous state time_source = +let of_state_machine state time_source = let response_received = Bvar.create () in { state; response_received; time_source } ;; let is_ready t = let now = Time_source.now t.time_source in - match Synchronous_rate_limiter.wait_until t.state with + match Rate_limiter_state_machine.wait_until t.state with | Now -> true | Check_after_receiving_response -> false | After time -> Time_ns.( >= ) now time @@ -24,7 +24,7 @@ let is_ready t = let wait_until_ready t = Deferred.repeat_until_finished () (fun () -> - match Synchronous_rate_limiter.wait_until t.state with + match Rate_limiter_state_machine.wait_until t.state with | Now -> return (`Finished ()) | After time -> (match Time_ns.( >= ) (Time_source.now t.time_source) time with @@ -44,13 +44,13 @@ let permit_request t = | false -> return (`Repeat ()) | true -> t.state - <- Synchronous_rate_limiter.sent_request_unchecked + <- Rate_limiter_state_machine.sent_request_unchecked t.state ~now:(Time_source.now t.time_source); return (`Finished ())) ;; let notify_response t response = - t.state <- Synchronous_rate_limiter.received_response t.state response; + t.state <- Rate_limiter_state_machine.received_response t.state response; Bvar.broadcast t.response_received () ;; diff --git a/reddit_api_async/rate_limiter.mli b/reddit_api_async/rate_limiter.mli index 8de057ac..15c737aa 100644 --- a/reddit_api_async/rate_limiter.mli +++ b/reddit_api_async/rate_limiter.mli @@ -3,7 +3,11 @@ open! Async type t [@@deriving sexp_of] -val of_synchronous : Reddit_api_kernel.Synchronous_rate_limiter.t -> Time_source.t -> t +val of_state_machine + : Reddit_api_kernel.Rate_limiter_state_machine.t + -> Time_source.t + -> t + val permit_request : t -> unit Deferred.t val notify_response : t -> Cohttp.Response.t -> unit val is_ready : t -> bool diff --git a/reddit_api_kernel/synchronous_rate_limiter.ml b/reddit_api_kernel/rate_limiter_state_machine.ml similarity index 100% rename from reddit_api_kernel/synchronous_rate_limiter.ml rename to reddit_api_kernel/rate_limiter_state_machine.ml diff --git a/reddit_api_kernel/synchronous_rate_limiter.mli b/reddit_api_kernel/rate_limiter_state_machine.mli similarity index 100% rename from reddit_api_kernel/synchronous_rate_limiter.mli rename to reddit_api_kernel/rate_limiter_state_machine.mli diff --git a/reddit_api_kernel/reddit_api_kernel.ml b/reddit_api_kernel/reddit_api_kernel.ml index f5934086..6c34da72 100644 --- a/reddit_api_kernel/reddit_api_kernel.ml +++ b/reddit_api_kernel/reddit_api_kernel.ml @@ -8,6 +8,7 @@ module Listing = Listing module Mod_action = Mod_action module Moderator_report = Moderator_report module Modmail = Modmail +module Rate_limiter_state_machine = Rate_limiter_state_machine module Relationship = Relationship module Stylesheet = Stylesheet module Submit_text = Submit_text @@ -15,7 +16,6 @@ module Subreddit_name = Subreddit_name module Subreddit_rules = Subreddit_rules module Subreddit_settings = Subreddit_settings module Subreddit_traffic = Subreddit_traffic -module Synchronous_rate_limiter = Synchronous_rate_limiter module Thing = Thing module Thing_kind = Thing_kind module Uri_with_string_sexp = Uri_with_string_sexp diff --git a/test/test_rate_limiter.ml b/test/test_rate_limiter.ml index 6521c80a..b6b492fc 100644 --- a/test/test_rate_limiter.ml +++ b/test/test_rate_limiter.ml @@ -49,8 +49,8 @@ let%expect_test _ = let ( ^: ) hr min = Time_ns.add Time_ns.epoch (Time_ns.Span.create ~hr ~min ()) in let time_source = Time_source.create ~now:(00 ^: 00) () in let rate_limiter = - Reddit_api_async.Rate_limiter.of_synchronous - Reddit_api_kernel.Synchronous_rate_limiter.by_headers + Reddit_api_async.Rate_limiter.of_state_machine + Reddit_api_kernel.Rate_limiter_state_machine.by_headers (Time_source.read_only time_source) in let print () = From 92e7052ebf08137f550e3fa000b0383b350ac60d Mon Sep 17 00:00:00 2001 From: Levi Roth Date: Sat, 2 Mar 2024 09:08:21 -0500 Subject: [PATCH 16/16] Update changelog for rate limiter rewrite --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index fd5b4b6d..e4a6d8c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,7 +17,6 @@ - The `Log.t` initially has no output, but can have its output set by the application. - Log when `Retry_manager` retries. -- Log, at debug level, when rate limit is exhausted and when it resets. - Rename `Non_transient_error` to `Permanent_error`. - Rename `Json_object.Util.time` => `time_sec_since_epoch`. - Require Jane Street libraries >= v0.16.0. @@ -29,6 +28,9 @@ - Change `Iter_comments` to return a pipe. - Fix a crash when replying to messages by introducing the new `Endpoint.reply_to_message`. +- Revamp rate limiter. The rate limiter is now built on top of a synchronous + state machine with a small async wrapper. We eliminate various async + background jobs. ## Removed