From 0861e3cfbc70dcc1086307e422bb9ceebd70ab8d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 19 Sep 2024 13:27:38 -0400 Subject: [PATCH] khepri_machine: Expire dedups on tick As part of the `khepri_machine:post_apply/2` helper which is run after any command is applied, we use `maps:filter/2` to eliminate any entries in the `dedups` field of the machine state which are expired according to the command's timestamp. This `drop_expired_dedups/2` step becomes a bottleneck, though, when a Khepri store handles many transactions at once. For example in RabbitMQ, queue deletion is done with a transaction submitted from each queue process. When many (for example five thousand) queues are deleted at once, `drop_expired_dedups/2` becomes a noticeable chunk of a flamegraph, specifically the `maps:filter/2` call within it. `maps:filter/2` is slow here because the BIF used to implement it collects a list of all key-value pairs for which the predicate returns true, sorts it, and then creates a new hashmap from it. We are unlikely to expire any given dedup when handling a command, especially when submitting many commands at once, so we end up essentially calling `maps:from_list(maps:to_list(Dedups))`. It is a small improvement to replace `maps:filter/2` with `maps:fold/3`, a case expression and `maps:remove/2` (reflecting that we will always fold over the map but rarely remove elements) but it is not enough to eliminate this rather large chunk of the flamegraph. Instead the solution in this commit is to move the detection of expired dedups to the "tick" aux effect. Ra emits this effect periodically: every 1s by default, but the interval is configurable. By moving this detection to `handle_aux/5` we remove it from the "hot path" of `apply/3`. Even better though: we are unlikely to actually need to expire any dedups. 
In the case of queue deletion in RabbitMQ for example, we are likely to handle all in-flight transactions and handle the subsequent `#dedup_ack{}` commands before even evaluating a `tick` effect. If we do handle a tick effect while transactions are in-flight then we are unlikely to need to expire them anyways so we will only scan the map with the new `khepri_utils:maps_any/2` helper. If we need to expire dedups, the aux handler for the `tick` effect appends a new `#expire_dedups{}` command to the log which does the same as `drop_expired_dedups/2` prior to this commit. --- src/khepri_machine.erl | 53 ++++++----- src/khepri_machine.hrl | 2 + src/khepri_utils.erl | 20 +++++ test/protect_against_dups_option.erl | 128 +++++++++++++++------------ test/test_ra_server_helpers.erl | 7 +- test/utils.erl | 15 ++++ 6 files changed, 144 insertions(+), 81 deletions(-) diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index 39e22eac..d18752a5 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -110,7 +110,8 @@ get_dedups/1]). -ifdef(TEST). --export([make_virgin_state/1, +-export([do_process_sync_command/3, + make_virgin_state/1, convert_state/3, set_tree/2]). -endif. @@ -1263,6 +1264,19 @@ handle_aux( Tree = get_tree(State), ok = restore_projection(Projection, Tree, PathPattern), {no_reply, AuxState, IntState}; +handle_aux(_RaState, cast, tick, AuxState, IntState) -> + State = ra_aux:machine_state(IntState), + Ts = erlang:system_time(millisecond), + Dedups = get_dedups(State), + CanExpire = fun(_CommandRef, {_Reply, Expiry}) -> Expiry =< Ts end, + Effects = case khepri_utils:maps_any(CanExpire, Dedups) of + true -> + ExpireDedups = #expire_dedups{}, + [{append, ExpireDedups}]; + false -> + [] + end, + {no_reply, AuxState, IntState, Effects}; handle_aux(_RaState, _Type, _Command, AuxState, IntState) -> {no_reply, AuxState, IntState}. 
@@ -1461,6 +1475,19 @@ apply( end, Ret = {State1, ok}, post_apply(Ret, Meta); +apply( + #{machine_version := MacVer, + system_time := Timestamp} = Meta, + #expire_dedups{}, + State) when MacVer >= 1 -> + Dedups = get_dedups(State), + Dedups1 = maps:filter( + fun(_CommandRef, {_Reply, Expiry}) -> + Expiry > Timestamp + end, Dedups), + State1 = set_dedups(State, Dedups1), + Ret = {State1, ok}, + post_apply(Ret, Meta); apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) -> NewState = convert_state(OldState, OldMacVer, NewMacVer), Ret = {NewState, ok}, @@ -1493,9 +1520,7 @@ apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) -> post_apply({State, Result}, Meta) -> post_apply({State, Result, []}, Meta); post_apply({_State, _Result, _SideEffects} = Ret, Meta) -> - Ret1 = bump_applied_command_count(Ret, Meta), - Ret2 = drop_expired_dedups(Ret1, Meta), - Ret2. + bump_applied_command_count(Ret, Meta). -spec bump_applied_command_count(ApplyRet, Meta) -> {State, Result, SideEffects} when @@ -1535,26 +1560,6 @@ reset_applied_command_count(State) -> Metrics1 = maps:remove(applied_command_count, Metrics), set_metrics(State, Metrics1). --spec drop_expired_dedups(ApplyRet, Meta) -> - {State, Result, SideEffects} when - ApplyRet :: {State, Result, SideEffects}, - State :: state(), - Result :: any(), - Meta :: ra_machine:command_meta_data(), - SideEffects :: ra_machine:effects(). -%% @private - -drop_expired_dedups( - {State, Result, SideEffects}, - #{system_time := Timestamp}) -> - Dedups = get_dedups(State), - Dedups1 = maps:filter( - fun(_CommandRef, {_Reply, Expiry}) -> - Expiry >= Timestamp - end, Dedups), - State1 = set_dedups(State, Dedups1), - {State1, Result, SideEffects}. - %% @private state_enter(leader, State) -> diff --git a/src/khepri_machine.hrl b/src/khepri_machine.hrl index 24765001..b32971f1 100644 --- a/src/khepri_machine.hrl +++ b/src/khepri_machine.hrl @@ -64,6 +64,8 @@ -record(dedup_ack, {ref :: reference()}). 
+-record(expire_dedups, {}). + %% Old commands, kept for backward-compatibility. -record(unregister_projection, {name :: khepri_projection:name()}). diff --git a/src/khepri_utils.erl b/src/khepri_utils.erl index fb0f978d..d0b4a532 100644 --- a/src/khepri_utils.erl +++ b/src/khepri_utils.erl @@ -18,6 +18,7 @@ -export([start_timeout_window/1, end_timeout_window/2, sleep/2, + maps_any/2, is_ra_server_alive/1, node_props_to_payload/2, @@ -83,6 +84,25 @@ sleep(Time, Timeout) when Time > Timeout -> timer:sleep(Timeout), 0. +-spec maps_any(Predicate, Map) -> boolean() when + Predicate :: fun((Key, Value) -> boolean()), + Map :: #{Key => Value}, + Key :: term(), + Value :: term(). + +maps_any(Fun, Map) when is_map(Map) -> + maps_any1(Fun, maps:next(maps:iterator(Map))). + +maps_any1(Fun, {Key, Value, Iterator}) -> + case Fun(Key, Value) of + true -> + true; + false -> + maps_any1(Fun, maps:next(Iterator)) + end; +maps_any1(_Fun, none) -> + false. + -spec is_ra_server_alive(RaServer) -> IsAlive when RaServer :: ra:server_id(), IsAlive :: boolean(). diff --git a/test/protect_against_dups_option.erl b/test/protect_against_dups_option.erl index 85c3daf0..221a517b 100644 --- a/test/protect_against_dups_option.erl +++ b/test/protect_against_dups_option.erl @@ -127,62 +127,78 @@ dedup_and_dedup_ack_test() -> ?assertEqual(ok, Ret2), ?assertEqual([], SE2). 
-dedup_expiry_test() -> - S00 = khepri_machine:init(?MACH_PARAMS()), - S0 = khepri_machine:convert_state(S00, 0, 1), - - Command = #put{path = [foo], - payload = khepri_payload:data(value), - options = #{expect_specific_node => true, - props_to_return => [payload, - payload_version]}}, - CommandRef = make_ref(), - Delay = 2000, - Expiry = erlang:system_time(millisecond) + Delay, - DedupCommand = #dedup{ref = CommandRef, - expiry = Expiry, - command = Command}, - {S1, Ret1, SE1} = khepri_machine:apply(?META, DedupCommand, S0), - ExpectedRet = {ok, #{[foo] => #{payload_version => 1}}}, - - Dedups1 = khepri_machine:get_dedups(S1), - ?assertEqual(#{CommandRef => {ExpectedRet, Expiry}}, Dedups1), - - Root1 = khepri_machine:get_root(S1), - ?assertEqual( - #node{ - props = - #{payload_version => 1, - child_list_version => 2}, - child_nodes = - #{foo => - #node{ - props = ?INIT_NODE_PROPS, - payload = khepri_payload:data(value)}}}, - Root1), - ?assertEqual(ExpectedRet, Ret1), - ?assertEqual([], SE1), - - timer:sleep(Delay + 1000), - - %% The put command is idempotent, so not really ideal to test - %% deduplication. Instead, we mess up with the state and silently restore - %% the initial empty tree. If the dedup mechanism works, the returned - %% state shouldn't have the `foo' node either because it didn't process - %% the command. - PatchedS1 = khepri_machine:set_tree(S1, khepri_machine:get_tree(S0)), - {S2, Ret2, SE2} = khepri_machine:apply(?META, DedupCommand, PatchedS1), - - %% The dedups entry was dropped at the end of apply because it expired. - Dedups2 = khepri_machine:get_dedups(S2), - ?assertEqual(#{}, Dedups2), - - Root0 = khepri_machine:get_root(S0), - Root2 = khepri_machine:get_root(S2), - ?assertEqual(Root0, Root2), - - ?assertEqual(ExpectedRet, Ret2), - ?assertEqual([], SE2). 
+dedup_expiry_test_() -> + TickTimeout = 200, + Config = #{tick_timeout => TickTimeout}, + StoredProcPath = [sproc], + Path = [stock, wood, <<"oak">>], + Command = #tx{'fun' = StoredProcPath, args = []}, + CommandRef = make_ref(), + Expiry = erlang:system_time(millisecond), + DedupCommand = #dedup{ref = CommandRef, + command = Command, + expiry = Expiry}, + {setup, + fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME, Config) end, + fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, + [{inorder, + [{"Store a procedure for later use", + ?_assertEqual( + ok, + khepri:put( + ?FUNCTION_NAME, StoredProcPath, + fun() -> + {ok, N} = khepri_tx:get(Path), + khepri_tx:put(Path, N + 1) + end))}, + + {"Store initial data", + ?_assertEqual( + ok, + khepri:put(?FUNCTION_NAME, Path, 1))}, + + {"Trigger the transaction", + ?_assertEqual( + ok, + khepri_machine:do_process_sync_command( + ?FUNCTION_NAME, DedupCommand, #{}))}, + + {"The transaction was applied and the data is incremented", + ?_assertEqual( + {ok, 2}, + khepri:get(?FUNCTION_NAME, Path))}, + + {"Trigger the transaction again before the dedup can be expired", + ?_assertEqual( + ok, + khepri_machine:do_process_sync_command( + ?FUNCTION_NAME, DedupCommand, #{}))}, + + {"The transaction was deduplicated and the data is unchanged", + ?_assertEqual( + {ok, 2}, + khepri:get(?FUNCTION_NAME, Path))}, + + {"Sleep and send the same transaction command", + ?_test( + begin + %% Sleep a little extra for the sake of slow CI runners. + %% During this sleep time the machine will receive Ra's + %% periodic `tick' aux effect which will trigger expiration of + %% dedups. 
+ SleepTime = TickTimeout + erlang:floor(TickTimeout * 3 / 4), + timer:sleep(SleepTime), + %% The dedup should be expired so this command should be applied again instead of being deduplicated. + ?assertEqual( + ok, + khepri_machine:do_process_sync_command( + ?FUNCTION_NAME, DedupCommand, #{})) + end)}, + + {"The transaction was applied again and the data is incremented", + ?_assertEqual( + {ok, 3}, + khepri:get(?FUNCTION_NAME, Path))}]}]}. dedup_ack_after_no_dedup_test() -> S00 = khepri_machine:init(?MACH_PARAMS()), diff --git a/test/test_ra_server_helpers.erl b/test/test_ra_server_helpers.erl index 1e1433b2..c9d4b47c 100644 --- a/test/test_ra_server_helpers.erl +++ b/test/test_ra_server_helpers.erl @@ -11,15 +11,20 @@ -include_lib("stdlib/include/assert.hrl"). -export([setup/1, + setup/2, cleanup/1]). setup(Testcase) -> + setup(Testcase, #{}). + +setup(Testcase, CustomConfig) -> _ = logger:set_primary_config(level, warning), {ok, _} = application:ensure_all_started(khepri), khepri_utils:init_list_of_modules_to_skip(), #{ra_system := RaSystem} = Props = helpers:start_ra_system(Testcase), - {ok, StoreId} = khepri:start(RaSystem, Testcase), + RaServerConfig = maps:put(cluster_name, Testcase, CustomConfig), + {ok, StoreId} = khepri:start(RaSystem, RaServerConfig), Props#{store_id => StoreId}. cleanup(#{store_id := StoreId} = Props) -> diff --git a/test/utils.erl b/test/utils.erl index 99c688f2..072bebda 100644 --- a/test/utils.erl +++ b/test/utils.erl @@ -74,3 +74,18 @@ timeout_window_with_high_timeout_test() -> timer:sleep(Sleep), NewTimeout = khepri_utils:end_timeout_window(Timeout, Window), ?assert(NewTimeout =< Timeout - Sleep). 
+ +maps_any_test() -> + IsEven = fun(N) -> (N band 1) == 0 end, + ValueIsEven = fun(_Key, Value) -> IsEven(Value) end, + + ?assertNot(khepri_utils:maps_any(ValueIsEven, #{})), + ?assertNot(khepri_utils:maps_any(ValueIsEven, #{k => 1})), + ?assert(khepri_utils:maps_any(ValueIsEven, #{k => 2})), + + Map1 = #{N => N || N <- lists:seq(1, 100)}, + ?assert(khepri_utils:maps_any(ValueIsEven, Map1)), + Map2 = #{N => N || N <- lists:seq(1, 100), not IsEven(N)}, + ?assertNot(khepri_utils:maps_any(ValueIsEven, Map2)), + + ok.