From 435ac2342560a9f2e9260526739cf2d9ee05bf5e Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 19 Sep 2024 13:27:38 -0400 Subject: [PATCH] khepri_machine: Expire dedups on tick As part of the `khepri_machine:post_apply/2` helper, which is run after any command is applied, we use `maps:filter/2` to eliminate any entries in the `dedups` field of the machine state which are expired according to the command's timestamp. This `drop_expired_dedups/2` step becomes a bottleneck, though, when a Khepri store handles many transactions at once. For example, in RabbitMQ, queue deletion is done with a transaction submitted from each queue process. When many (for example, five thousand) queues are deleted at once, `drop_expired_dedups/2` becomes a noticeable chunk of a flamegraph, specifically the `maps:filter/2` call within it. `maps:filter/2` is slow here because the BIF used to implement it collects a list of all key-value pairs for which the predicate returns true, sorts it, and then creates a new hashmap from it. We are unlikely to expire any given dedup when handling a command, especially when submitting many commands at once, so we end up essentially calling `maps:from_list(maps:to_list(Dedups))`. It is a small improvement to replace `maps:filter/2` with `maps:fold/3`, a case expression and `maps:remove/2` (reflecting that we will always fold over the map but rarely remove elements), but it is not enough to eliminate this rather large chunk of the flamegraph. Instead, the solution in this commit is to move the detection of expired dedups to the "tick" aux effect. Ra emits this effect periodically (say, every few seconds). By moving this detection to `handle_aux/5` we remove it from the "hot path" of `apply/3`. Even better, though: we are unlikely to actually need to expire any dedups. In the case of queue deletion in RabbitMQ, for example, we are likely to handle all in-flight transactions and handle the subsequent `#dedup_ack{}` commands before even evaluating a `tick` effect. 
If we do handle a tick effect while transactions are in-flight then we are unlikely to need to expire them anyways so we will only scan the map with the new `khepri_utils:maps_any/2` helper. If we need to expire dedups, the aux handler for the `tick` effect appends a new `#expire_dedups{}` command to the log which does the same as `drop_expired_dedups/2` prior to this commit. --- src/khepri_machine.erl | 50 +++++++++++++++++++++++------------------- src/khepri_machine.hrl | 2 ++ src/khepri_utils.erl | 20 +++++++++++++++++ test/utils.erl | 15 +++++++++++++ 4 files changed, 64 insertions(+), 23 deletions(-) diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index 403aaf09..ea09a5d0 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -1260,6 +1260,19 @@ handle_aux( Tree = get_tree(State), ok = restore_projection(Projection, Tree, PathPattern), {no_reply, AuxState, IntState}; +handle_aux(_RaState, cast, tick, AuxState, IntState) -> + State = ra_aux:machine_state(IntState), + Ts = erlang:system_time(millisecond), + Dedups = get_dedups(State), + CanExpire = fun(_CommandRef, {_Reply, Expiry}) -> Expiry =< Ts end, + Effs = case khepri_utils:maps_any(CanExpire, Dedups) of + true -> + ExpireDedups = #expire_dedups{}, + [{append, ExpireDedups}]; + false -> + [] + end, + {no_reply, AuxState, IntState, Effs}; handle_aux(_RaState, _Type, _Command, AuxState, IntState) -> {no_reply, AuxState, IntState}. 
@@ -1458,6 +1471,19 @@ apply( end, Ret = {State1, ok}, post_apply(Ret, Meta); +apply( + #{machine_version := MacVer, + system_time := Timestamp} = Meta, + #expire_dedups{}, + State) when MacVer >= 1 -> + Dedups = get_dedups(State), + Dedups1 = maps:filter( + fun(_CommandRef, {_Reply, Expiry}) -> + Expiry > Timestamp + end, Dedups), + State1 = set_dedups(State, Dedups1), + Ret = {State1, ok}, + post_apply(Ret, Meta); apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) -> NewState = convert_state(OldState, OldMacVer, NewMacVer), Ret = {NewState, ok}, @@ -1490,9 +1516,7 @@ apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) -> post_apply({State, Result}, Meta) -> post_apply({State, Result, []}, Meta); post_apply({_State, _Result, _SideEffects} = Ret, Meta) -> - Ret1 = bump_applied_command_count(Ret, Meta), - Ret2 = drop_expired_dedups(Ret1, Meta), - Ret2. + bump_applied_command_count(Ret, Meta). -spec bump_applied_command_count(ApplyRet, Meta) -> {State, Result, SideEffects} when @@ -1532,26 +1556,6 @@ reset_applied_command_count(State) -> Metrics1 = maps:remove(applied_command_count, Metrics), set_metrics(State, Metrics1). --spec drop_expired_dedups(ApplyRet, Meta) -> - {State, Result, SideEffects} when - ApplyRet :: {State, Result, SideEffects}, - State :: state(), - Result :: any(), - Meta :: ra_machine:command_meta_data(), - SideEffects :: ra_machine:effects(). -%% @private - -drop_expired_dedups( - {State, Result, SideEffects}, - #{system_time := Timestamp}) -> - Dedups = get_dedups(State), - Dedups1 = maps:filter( - fun(_CommandRef, {_Reply, Expiry}) -> - Expiry >= Timestamp - end, Dedups), - State1 = set_dedups(State, Dedups1), - {State1, Result, SideEffects}. - %% @private state_enter(leader, State) -> diff --git a/src/khepri_machine.hrl b/src/khepri_machine.hrl index 24765001..b32971f1 100644 --- a/src/khepri_machine.hrl +++ b/src/khepri_machine.hrl @@ -64,6 +64,8 @@ -record(dedup_ack, {ref :: reference()}). 
+-record(expire_dedups, {}). + %% Old commands, kept for backward-compatibility. -record(unregister_projection, {name :: khepri_projection:name()}). diff --git a/src/khepri_utils.erl b/src/khepri_utils.erl index fb0f978d..d0b4a532 100644 --- a/src/khepri_utils.erl +++ b/src/khepri_utils.erl @@ -18,6 +18,7 @@ -export([start_timeout_window/1, end_timeout_window/2, sleep/2, + maps_any/2, is_ra_server_alive/1, node_props_to_payload/2, @@ -83,6 +84,25 @@ sleep(Time, Timeout) when Time > Timeout -> timer:sleep(Timeout), 0. +-spec maps_any(Predicate, Map) -> boolean() when + Predicate :: fun((Key, Value) -> boolean()), + Map :: #{Key => Value}, + Key :: term(), + Value :: term(). + +maps_any(Fun, Map) when is_map(Map) -> + maps_any1(Fun, maps:next(maps:iterator(Map))). + +maps_any1(Fun, {Key, Value, Iterator}) -> + case Fun(Key, Value) of + true -> + true; + false -> + maps_any1(Fun, maps:next(Iterator)) + end; +maps_any1(_Fun, none) -> + false. + -spec is_ra_server_alive(RaServer) -> IsAlive when RaServer :: ra:server_id(), IsAlive :: boolean(). diff --git a/test/utils.erl b/test/utils.erl index 99c688f2..072bebda 100644 --- a/test/utils.erl +++ b/test/utils.erl @@ -74,3 +74,18 @@ timeout_window_with_high_timeout_test() -> timer:sleep(Sleep), NewTimeout = khepri_utils:end_timeout_window(Timeout, Window), ?assert(NewTimeout =< Timeout - Sleep). + +maps_any_test() -> + IsEven = fun(N) -> (N band 1) == 0 end, + ValueIsEven = fun(_Key, Value) -> IsEven(Value) end, + + ?assertNot(khepri_utils:maps_any(ValueIsEven, #{})), + ?assertNot(khepri_utils:maps_any(ValueIsEven, #{k => 1})), + ?assert(khepri_utils:maps_any(ValueIsEven, #{k => 2})), + + Map1 = #{N => N || N <- lists:seq(1, 100)}, + ?assert(khepri_utils:maps_any(ValueIsEven, Map1)), + Map2 = #{N => N || N <- lists:seq(1, 100), not IsEven(N)}, + ?assertNot(khepri_utils:maps_any(ValueIsEven, Map2)), + + ok.