Skip to content

Commit

Permalink
Rate limiter: track received_response directly on state
Browse files Browse the repository at this point in the history
  • Loading branch information
leviroth committed Jan 13, 2024
1 parent 8d3cb5f commit 4615b69
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 32 deletions.
27 changes: 13 additions & 14 deletions reddit_api_async/rate_limiter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,23 @@ module By_headers = struct
module State = struct
type t =
| Created
| Waiting_on_first_request
| Waiting_on_first_request of { received_response : unit Ivar.t }
| Consuming_rate_limit of Server_side_info.t
[@@deriving sexp_of]
end

type t =
{ mutable state : State.t
; updated : (unit, read_write) Bvar.t
; time_source : (Time_source.t[@sexp.opaque])
}
[@@deriving sexp_of]

let create ~time_source =
let updated = Bvar.create () in
{ time_source; state = Created; updated }
;;
let create ~time_source = { time_source; state = Created }

let is_ready { state; time_source; _ } =
match state with
| Created -> true
| Waiting_on_first_request -> false
| Waiting_on_first_request _ -> false
| Consuming_rate_limit { remaining_api_calls; reset_time } ->
remaining_api_calls > 0 || Time_ns.( >= ) (Time_source.now time_source) reset_time
;;
Expand All @@ -215,12 +211,11 @@ module By_headers = struct
Deferred.repeat_until_finished () (fun () ->
match t.state with
| Created -> return (`Finished ())
| Waiting_on_first_request ->
let%bind () = Bvar.wait t.updated in
| Waiting_on_first_request { received_response } ->
let%bind () = Ivar.read received_response in
return (`Repeat ())
| Consuming_rate_limit { remaining_api_calls; reset_time } ->
let now = Time_source.now t.time_source in
(* TODO: Move above? *)
(match Time_ns.( >= ) now reset_time with
| true ->
t.state
Expand All @@ -238,8 +233,11 @@ module By_headers = struct
(* TODO: Allow retries once per minute? *)

let update_server_side_info t ~new_server_side_info =
(match t.state with
| State.Created | State.Consuming_rate_limit _ -> ()
| State.Waiting_on_first_request { received_response } ->
Ivar.fill_if_empty received_response ());
t.state <- Consuming_rate_limit new_server_side_info;
Bvar.broadcast t.updated ();
match
( new_server_side_info.remaining_api_calls > 0
, Time_ns.equal new_server_side_info.reset_time Time_ns.epoch )
Expand All @@ -255,9 +253,10 @@ module By_headers = struct
let permit_request t =
let%bind () = wait_until_ready t in
match t.state with
| Waiting_on_first_request -> (* TODO is this impossible? *) assert false
| Waiting_on_first_request _ -> (* TODO is this impossible? *) assert false
| Created ->
t.state <- Waiting_on_first_request;
let received_response = Ivar.create () in
t.state <- Waiting_on_first_request { received_response };
return ()
| Consuming_rate_limit ({ remaining_api_calls; _ } as server_side_info) ->
let new_server_side_info : Server_side_info.t =
Expand All @@ -282,7 +281,7 @@ module By_headers = struct
let new_server_side_info =
match t.state with
| Created -> raise_s [%message "[notify_response] called before [permit_request]"]
| Waiting_on_first_request -> response_server_side_info
| Waiting_on_first_request _ -> response_server_side_info
| Consuming_rate_limit server_side_info ->
(match
Comparable.lift
Expand Down
3 changes: 2 additions & 1 deletion test/test_comment_fields.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ let%expect_test "comment_fields" =
Set.symmetric_diff keys_from_comment_page keys_from_info_page |> Sequence.to_list
in
print_s [%sexp (diff : (string, string) Either.t list)];
[%expect {|
[%expect
{|
("Rate limit is resetting"(old_remaining_api_calls 995))
((First depth)) |}];
print_s [%sexp (Thing.Comment.depth first_comment : int option)];
Expand Down
7 changes: 4 additions & 3 deletions test/test_links_and_comments.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ let%expect_test "send_replies" =
let%bind () =
Connection.call_exn connection (Endpoint.send_replies ~id ~enabled:false)
in
[%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
[%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
return ())
;;

Expand All @@ -54,7 +54,7 @@ let%expect_test "set_contest_mode" =
let%bind () =
Connection.call_exn connection (Endpoint.set_contest_mode ~link ~enabled:false)
in
[%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
[%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
return ())
;;

Expand Down Expand Up @@ -86,7 +86,8 @@ let%expect_test "vote" =
let%bind () =
Connection.call_exn connection (Endpoint.vote () ~target ~direction:Up)
in
[%expect{|
[%expect
{|
("Rate limit is resetting"(old_remaining_api_calls 995))
("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
return ())
Expand Down
16 changes: 3 additions & 13 deletions test/test_rate_limiter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ let%expect_test _ =
(is_ready true)
(rate_limiter (
By_headers (
(state Created) (updated (has_any_waiters false)) (time_source <opaque>))))) |}];
(state Created)
(time_source <opaque>))))) |}];
(* Initially we can permit one request. *)
let%bind () = Rate_limiter.permit_request rate_limiter in
print ();
Expand All @@ -77,8 +78,7 @@ let%expect_test _ =
(is_ready false)
(rate_limiter (
By_headers (
(state Waiting_on_first_request)
(updated (has_any_waiters false))
(state (Waiting_on_first_request (received_response Empty)))
(time_source <opaque>))))) |}];
(* Receiving a response allows us to send another request. *)
Rate_limiter.notify_response
Expand All @@ -96,7 +96,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:10:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Receiving a response for the same reset period will not increase our limit remaining. *)
Rate_limiter.notify_response
Expand All @@ -113,7 +112,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:10:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Moving to the next period increases our remaining limit. *)
Rate_limiter.notify_response
Expand All @@ -130,7 +128,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 10)
(reset_time (1970-01-01 00:20:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Exhausting the remaining limit causes us to be not-ready. *)
let%bind () =
Expand All @@ -153,7 +150,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:20:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let ready_deferred = Rate_limiter.permit_request rate_limiter in
[%expect {| |}];
Expand All @@ -171,7 +167,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 995)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
(* Advancing past the reset time before we receive a response does not result
in a double reset. *)
Expand All @@ -191,7 +186,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 30) in
print ();
Expand All @@ -205,7 +199,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 0)
(reset_time (1970-01-01 00:30:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let%bind () = Rate_limiter.permit_request rate_limiter in
print ();
Expand All @@ -219,7 +212,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 995)
(reset_time (1970-01-01 00:40:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
Rate_limiter.notify_response
rate_limiter
Expand All @@ -235,7 +227,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 995)
(reset_time (1970-01-01 00:40:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
let%bind () = Time_source.advance_by_alarms time_source ~to_:(00 ^: 31) in
print ();
Expand All @@ -249,7 +240,6 @@ let%expect_test _ =
Consuming_rate_limit (
(remaining_api_calls 995)
(reset_time (1970-01-01 00:40:00.000000000Z)))))
(updated (has_any_waiters false))
(time_source <opaque>))))) |}];
return ()
;;
2 changes: 1 addition & 1 deletion test/test_set_subreddit_sticky.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ let%expect_test "set_subreddit_sticky" =
connection
(Endpoint.set_subreddit_sticky () ~link ~sticky_state:Unsticky)
in
[%expect{| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
[%expect {| ("Rate limit is resetting"(old_remaining_api_calls 995)) |}];
return ())
;;

0 comments on commit 4615b69

Please sign in to comment.