Skip to content

Commit

Permalink
Further modifications to rate limiter - removing event
Browse files Browse the repository at this point in the history
  • Loading branch information
leviroth committed Oct 8, 2023
1 parent 2f71073 commit e6e38c5
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 115 deletions.
123 changes: 69 additions & 54 deletions reddit_api_async/rate_limiter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -167,79 +167,90 @@ module By_headers = struct
let freshest = Base.Comparable.max compare_by_inferred_age
end

module State = struct
type t =
| Created
| Waiting_on_first_request
| Known of Server_side_info.t
[@@deriving sexp_of]
end

type t =
{ ready : (unit, read_write) Mvar.t
{ mutable state : State.t
; updated : (unit, read_write) Bvar.t
; time_source : (Time_source.t[@sexp.opaque])
; mutable reset_event : ((Nothing.t, unit) Time_source.Event.t[@sexp.opaque]) option
; mutable server_side_info : Server_side_info.t option
}
[@@deriving sexp_of]

let create ~time_source =
let ready = Mvar.create () in
Mvar.set ready ();
{ server_side_info = None; reset_event = None; time_source; ready }
let updated = Bvar.create () in
{ time_source; state = Created; updated }
;;

let is_ready { ready; _ } = not (Mvar.is_empty ready)
let wait_until_ready { ready; _ } = Mvar.value_available ready

let rec schedule_reset_at_time t time =
let schedule_fresh_event () =
t.reset_event
<- Some
(Time_source.Event.run_at
t.time_source
time
(fun () ->
Mvar.set t.ready ();
(* In case something prevents our first request from receiving
a response, we will periodically allow retries. *)
schedule_reset_at_time t (Time_ns.add time Time_ns.Span.minute))
())
in
match t.reset_event with
| None -> schedule_fresh_event ()
| Some event ->
let scheduled_time = Time_source.Event.scheduled_at event in
(match Time_ns.( < ) scheduled_time time with
| false -> ()
| true ->
(match Time_source.Event.reschedule_at event time with
| Ok -> ()
| Previously_aborted _ -> .
| Previously_happened () -> schedule_fresh_event ()))
let is_ready { state; time_source; _ } =
match state with
| Created -> true
| Waiting_on_first_request -> false
| Known { remaining_api_calls; reset_time } ->
remaining_api_calls > 0 || Time_ns.( >= ) (Time_source.now time_source) reset_time
;;

let wait_until_ready { state; time_source; updated } =
Deferred.repeat_until_finished () (fun () ->
match state with
| Created -> return (`Finished ())
| Waiting_on_first_request ->
let%bind () = Bvar.wait updated in
return (`Repeat ())
| Known { remaining_api_calls; reset_time } ->
(match
remaining_api_calls > 0
|| Time_ns.( >= ) (Time_source.now time_source) reset_time
with
| true -> return (`Finished ())
| false ->
let%bind () =
Deferred.any
[ (* TODO reset time needs to actually... reset things*)
Time_source.at time_source reset_time
; Bvar.wait updated
]
in
return (`Repeat ())))
;;

(* TODO: Allow retries once per minute? *)

let update_server_side_info t ~new_server_side_info =
t.server_side_info <- Some new_server_side_info;
schedule_reset_at_time t new_server_side_info.reset_time;
t.state <- Known new_server_side_info;
Bvar.broadcast t.updated ();
match
( new_server_side_info.remaining_api_calls > 0
, Time_ns.equal new_server_side_info.reset_time Time_ns.epoch )
with
| false, false -> ()
| false, false | true, (true | false) -> ()
| false, true ->
[%log.debug
Import.log
"Rate limit exhausted"
~reset_time:(new_server_side_info.reset_time : Time_ns_unix.t)]
| true, _ -> Mvar.set t.ready ()
;;

let permit_request t =
let%bind () = Mvar.take t.ready in
(match t.server_side_info with
| None -> ()
| Some ({ remaining_api_calls; _ } as server_side_info) ->
(match remaining_api_calls with
| 0 -> ()
| n ->
let new_server_side_info =
{ server_side_info with remaining_api_calls = n - 1 }
in
update_server_side_info t ~new_server_side_info));
return ()
let%bind () = wait_until_ready t in
match t.state with
| Waiting_on_first_request -> assert false
| Created ->
t.state <- Waiting_on_first_request;
return ()
| Known ({ remaining_api_calls; _ } as server_side_info) ->
let new_server_side_info : Server_side_info.t =
match remaining_api_calls with
| 0 -> server_side_info
| n -> { server_side_info with remaining_api_calls = n - 1 }
in
update_server_side_info t ~new_server_side_info;
return ()
;;

let notify_response t response =
Expand All @@ -249,12 +260,16 @@ module By_headers = struct
(* We assume that, in the absence of ratelimit headers, we must have hit
some authentication failure. As a heuristic to avoid getting stuck, we
immediately reset [t.ready]. *)
Mvar.set t.ready ()
(* TODO: What to do here? *)
t.state <- Created
| Some response_server_side_info ->
let new_server_side_info =
match t.server_side_info with
| None -> response_server_side_info
| Some server_side_info ->
match t.state with
| Created ->
(* TODO: this could happen if the module is misused *)
assert false
| Waiting_on_first_request -> response_server_side_info
| Known server_side_info ->
(match
Comparable.lift
[%compare: Time_ns.t]
Expand Down
128 changes: 67 additions & 61 deletions test/test_rate_limiter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ let%expect_test _ =
(is_ready true)
(rate_limiter (
By_headers (
(ready (()))
(time_source <opaque>)
(reset_event ())
(server_side_info ()))))) |}];
(state Created) (updated (has_any_waiters false)) (time_source <opaque>))))) |}];
(* Initially we can permit one request. *)
let%bind () = Rate_limiter.permit_request rate_limiter in
print ();
Expand All @@ -80,10 +77,9 @@ let%expect_test _ =
(is_ready false)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event ())
(server_side_info ()))))) |}];
(state Waiting_on_first_request)
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Receiving a response allows us to send another request. *)
Rate_limiter.notify_response
rate_limiter
Expand All @@ -96,11 +92,12 @@ let%expect_test _ =
(is_ready false)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:10:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Receiving a response for the same reset period will not increase our limit remaining. *)
Rate_limiter.notify_response
rate_limiter
Expand All @@ -112,11 +109,12 @@ let%expect_test _ =
(is_ready false)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:10:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:10:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Moving to the next period increases our remaining limit. *)
Rate_limiter.notify_response
rate_limiter
Expand All @@ -128,11 +126,12 @@ let%expect_test _ =
(is_ready true)
(rate_limiter (
By_headers (
(ready (()))
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 10) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 10)
(reset_time (1970-01-01 00:20:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Exhausting the remaining limit causes us to be not-ready. *)
let%bind () =
Deferred.repeat_until_finished 10 (function
Expand All @@ -149,11 +148,12 @@ let%expect_test _ =
(is_ready false)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:20:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Advancing the time allows us to send another request. *)
let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 20) in
print ();
Expand All @@ -163,11 +163,12 @@ let%expect_test _ =
(is_ready true)
(rate_limiter (
By_headers (
(ready (()))
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:20:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:20:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Advancing past the reset time before we receive a response does not result
in a double reset. *)
let%bind () = Rate_limiter.permit_request rate_limiter in
Expand All @@ -183,11 +184,12 @@ let%expect_test _ =
(is_ready false)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in
print ();
[%expect
Expand All @@ -196,51 +198,55 @@ let%expect_test _ =
(is_ready true)
(rate_limiter (
By_headers (
(ready (()))
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let%bind () = Rate_limiter.permit_request rate_limiter in
print ();
[%expect
{|
((time (1970-01-01 00:30:00.000000000Z))
(is_ready false)
(is_ready true)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
Rate_limiter.notify_response
rate_limiter
(build_header ~server_time:(00 ^: 29) ~limit_remaining:1);
print ();
[%expect
{|
((time (1970-01-01 00:30:00.000000000Z))
(is_ready false)
(is_ready true)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in
print ();
[%expect
{|
((time (1970-01-01 00:31:00.000000000Z))
(is_ready false)
(is_ready true)
(rate_limiter (
By_headers (
(ready ())
(time_source <opaque>)
(reset_event (<opaque>))
(server_side_info ((
(remaining_api_calls 0) (reset_time (1970-01-01 00:30:00.000000000Z))))))))) |}];
(state (
Known (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
return ()
;;

0 comments on commit e6e38c5

Please sign in to comment.