From 1df55e3943cd7025df3695e0f3c0bd70f0e424a1 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 19 Sep 2024 13:07:02 -0400 Subject: [PATCH 1/3] khepri_machine: Migrate to the new Ra aux API Ra 2.10.1 introduced a new `handle_aux/5` callback that takes the place of `handle_aux/6`. Instead of passing the log state and machine state separately, this API passes a new `ra_aux:internal_state()` opaque argument which you can read log or machine state out of with helper functions from the `ra_aux` module. This commit only updates to use the new callback: there should be no functional change from this commit. --- src/khepri_machine.erl | 31 +++++++++++++++++-------------- src/khepri_projection.erl | 2 +- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index 1e9855e8..273a7c57 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -85,7 +85,7 @@ %% ra_machine callbacks. -export([init/1, init_aux/1, - handle_aux/6, + handle_aux/5, apply/3, state_enter/2, snapshot_installed/4, @@ -1236,14 +1236,16 @@ init(Params) -> init_aux(StoreId) -> #khepri_machine_aux{store_id = StoreId}. --spec handle_aux(RaState, Type, Command, AuxState, LogState, MachineState) -> - {no_reply, AuxState, LogState} when +-spec handle_aux(RaState, Type, Command, AuxState, IntState) -> + Ret when RaState :: ra_server:ra_state(), Type :: {call, ra:from()} | cast, Command :: term(), AuxState :: aux_state(), - LogState :: ra_log:state(), - MachineState :: state(). + IntState :: ra_aux:internal_state(), + Ret :: {no_reply, AuxState, IntState} | + {no_reply, AuxState, IntState, Effects}, + Effects :: ra_machine:effects(). %% @private handle_aux( @@ -1252,12 +1254,12 @@ handle_aux( old_props = OldProps, new_props = NewProps, projection = Projection}, - AuxState, LogState, _MachineState) -> + AuxState, IntState) -> khepri_projection:trigger(Projection, Path, OldProps, NewProps), - {no_reply, AuxState, LogState}; + {no_reply, AuxState, IntState}; handle_aux( - _RaState, cast, restore_projections, AuxState, LogState, - State) -> + _RaState, cast, restore_projections, AuxState, IntState) -> + State = ra_aux:machine_state(IntState), Tree = get_tree(State), ProjectionTree = get_projections(State), khepri_pattern_tree:foreach( @@ -1266,16 +1268,17 @@ handle_aux( [restore_projection(Projection, Tree, PathPattern) || Projection <- Projections] end), - {no_reply, AuxState, LogState}; + {no_reply, AuxState, IntState}; handle_aux( _RaState, cast, #restore_projection{projection = Projection, pattern = PathPattern}, - AuxState, LogState, State) -> + AuxState, IntState) -> + State = ra_aux:machine_state(IntState), Tree = get_tree(State), ok = restore_projection(Projection, Tree, PathPattern), - {no_reply, AuxState, LogState}; -handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) -> - {no_reply, AuxState, LogState}. + {no_reply, AuxState, IntState}; +handle_aux(_RaState, _Type, _Command, AuxState, IntState) -> + {no_reply, AuxState, IntState}. restore_projection(Projection, Tree, PathPattern) -> _ = khepri_projection:init(Projection), diff --git a/src/khepri_projection.erl b/src/khepri_projection.erl index a05def99..ffeb82ce 100644 --- a/src/khepri_projection.erl +++ b/src/khepri_projection.erl @@ -308,7 +308,7 @@ trigger( undefined -> %% A table might have been deleted by an `unregister_projections' %% effect in between when a `trigger_projection' effect is created - %% and when it is handled in `khepri_machine:handle_aux/6`. In this + %% and when it is handled in `khepri_machine:handle_aux/5`. In this %% case we should no-op the trigger effect. ok; Table -> From 0d7e0d7b76f14c4db6eae90f498b23c662c0bc4e Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 1 Oct 2024 11:15:54 -0400 Subject: [PATCH 2/3] test_ra_server_helpers: Allow passing Ra server configuration This will be used in the child commit to write a test which sets a low `tick_timeout` configuration. --- test/test_ra_server_helpers.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/test_ra_server_helpers.erl b/test/test_ra_server_helpers.erl index 1e1433b2..c9d4b47c 100644 --- a/test/test_ra_server_helpers.erl +++ b/test/test_ra_server_helpers.erl @@ -11,15 +11,20 @@ -include_lib("stdlib/include/assert.hrl"). -export([setup/1, + setup/2, cleanup/1]). setup(Testcase) -> + setup(Testcase, #{}). + +setup(Testcase, CustomConfig) -> _ = logger:set_primary_config(level, warning), {ok, _} = application:ensure_all_started(khepri), khepri_utils:init_list_of_modules_to_skip(), #{ra_system := RaSystem} = Props = helpers:start_ra_system(Testcase), - {ok, StoreId} = khepri:start(RaSystem, Testcase), + RaServerConfig = maps:put(cluster_name, Testcase, CustomConfig), + {ok, StoreId} = khepri:start(RaSystem, RaServerConfig), Props#{store_id => StoreId}. cleanup(#{store_id := StoreId} = Props) -> From c522a88c3c87f997dceab81acfa13c0f769ced6b Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 1 Oct 2024 11:16:17 -0400 Subject: [PATCH 3/3] khepri_machine: Expire dedups on tick As part of the `khepri_machine:post_apply/2` helper which is run after any command is applied, we use `maps:filter/2` to eliminate any entries in the `dedups` field of the machine state which are expired according to the command's timestamp. This `drop_expired_dedups/2` step becomes a bottleneck though when a Khepri store handles many transactions at once. For example in RabbitMQ, queue deletion is done with a transaction submitted from each queue process. When many (for example five thousand) queues are deleted at once, `drop_expired_dedups/2` becomes a noticeable chunk of a flamegraph and specifically the `maps:filter/2` within. `maps:filter/2` is slow here because the BIF used to implement it collects a list of all key-value pairs for which the predicate returns true, sorts it, and then creates a new hashmap from it. We are unlikely to expire any given dedup when handling a command, especially when submitting many commands at once, so we end up essentially calling `maps:from_list(maps:to_list(Dedups))`. It is a small improvement to replace `maps:filter/2` with `maps:fold/3`, a case expression and `maps:remove/2` (reflecting that we will always fold over the map but rarely remove elements) but it is not enough to eliminate this rather large chunk of the flamegraph. Instead the solution in this commit is to move the detection of expired dedups to the "tick" aux effect. Ra emits this effect periodically: every 1s by default but configurable. By moving this detection to `handle_aux/5` we remove it from the "hot path" of `apply/3`. Even better though: we are unlikely to actually need to expire any dedups. In the case of queue deletion in RabbitMQ for example, we are likely to handle all in-flight transactions and handle the subsequent `#dedup_ack{}` commands before even evaluating a `tick` effect. If we do handle a tick effect while transactions are in-flight then we are unlikely to need to expire them anyways so we will only scan the map with the new `khepri_utils:maps_any/2` helper. If we need to expire dedups, the aux handler for the `tick` effect appends a new `#expire_dedups{}` command to the log which does the same as `drop_expired_dedups/2` prior to this commit. --- src/khepri_machine.erl | 74 ++++++++++++++- src/khepri_machine.hrl | 7 ++ test/protect_against_dups_option.erl | 129 +++++++++++++++------------ 3 files changed, 151 insertions(+), 59 deletions(-) diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index 273a7c57..7b196535 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -49,6 +49,8 @@ %%
  • Changed the data structure for the reverse index used to track %% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}). %%
  • +%%
  • Moved the expiration of dedups to the `tick' aux effect (see {@link +%% handle_aux/5}). This also introduces a new command `#drop_dedups{}'.
  • %% %% %% @@ -119,7 +121,8 @@ get_dedups/1]). -ifdef(TEST). --export([make_virgin_state/1, +-export([do_process_sync_command/3, + make_virgin_state/1, convert_state/3, set_tree/2]). -endif. @@ -1277,6 +1280,37 @@ handle_aux( Tree = get_tree(State), ok = restore_projection(Projection, Tree, PathPattern), {no_reply, AuxState, IntState}; +handle_aux(leader, cast, tick, AuxState, IntState) -> + %% Expiring dedups in the tick handler is only available on versions 2 + %% and greater. In versions 0 and 1, expiration of dedups is done in + %% `drop_expired_dedups/2'. This proved to be quite expensive when handling + %% a very large batch of transactions at once, so this expiration step was + %% moved to the `tick' handler in version 2. + case ra_aux:effective_machine_version(IntState) of + EffectiveMacVer when EffectiveMacVer >= 2 -> + State = ra_aux:machine_state(IntState), + Timestamp = erlang:system_time(millisecond), + Dedups = get_dedups(State), + RefsToDrop = maps:fold( + fun(CommandRef, {_Reply, Expiry}, Acc) -> + case Expiry =< Timestamp of + true -> + [CommandRef | Acc]; + false -> + Acc + end + end, [], Dedups), + Effects = case RefsToDrop of + [] -> + []; + _ -> + DropDedups = #drop_dedups{refs = RefsToDrop}, + [{append, DropDedups}] + end, + {no_reply, AuxState, IntState, Effects}; + _ -> + {no_reply, AuxState, IntState} + end; handle_aux(_RaState, _Type, _Command, AuxState, IntState) -> {no_reply, AuxState, IntState}. @@ -1475,6 +1509,20 @@ apply( end, Ret = {State1, ok}, post_apply(Ret, Meta); +apply( + #{machine_version := MacVer} = Meta, + #drop_dedups{refs = RefsToDrop}, + State) when MacVer >= 2 -> + %% `#drop_dedups{}' is emitted by the `handle_aux/5' clause for the `tick' + %% effect to periodically drop dedups that have expired. This expiration + %% was originally done in `post_apply/2' via `drop_expired_dedups/2' until + %% machine version 2. Note that `drop_expired_dedups/2' is used until a + %% cluster reaches an effective machine version of 2 or higher. + Dedups = get_dedups(State), + Dedups1 = maps:without(RefsToDrop, Dedups), + State1 = set_dedups(State, Dedups1), + Ret = {State1, ok}, + post_apply(Ret, Meta); apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) -> NewState = convert_state(OldState, OldMacVer, NewMacVer), Ret = {NewState, ok}, @@ -1556,18 +1604,38 @@ reset_applied_command_count(State) -> Result :: any(), Meta :: ra_machine:command_meta_data(), SideEffects :: ra_machine:effects(). +%% Removes any dedups from the `dedups' field in state that have expired +%% according to the timestamp in the handled command. +%% +%% This function is a no-op in any other version than version 1. This proved to +%% be expensive to execute as part of `apply/3' so dedup expiration moved to +%% the `handle_aux/5' for `tick' which is executed periodically. See that +%% function clause above for more information. +%% %% @private drop_expired_dedups( {State, Result, SideEffects}, - #{system_time := Timestamp}) -> + #{system_time := Timestamp, + machine_version := MacVer}) when MacVer =< 1 -> Dedups = get_dedups(State), + %% Historical note: `maps:filter/2' can be surprisingly expensive when + %% used in a tight loop like `apply/3' depending on how many elements are + %% retained. As of Erlang/OTP 27, the BIF which implements `maps:filter/2' + %% collects any key-value pairs for which the predicate returns `true' into + %% a list, sorts/dedups the list and then creates a new map. This is slow + %% if the filter function always returns `true'. In cases like this where + %% the common usage is to retain most elements, `maps:fold/3' plus a `case' + %% expression and `maps:remove/2' is likely to be less expensive. Dedups1 = maps:filter( fun(_CommandRef, {_Reply, Expiry}) -> Expiry >= Timestamp end, Dedups), State1 = set_dedups(State, Dedups1), - {State1, Result, SideEffects}. + {State1, Result, SideEffects}; +drop_expired_dedups({State, Result, SideEffects}, _Meta) -> + %% No-op on versions 2 and higher. + {State, Result, SideEffects}. %% @private diff --git a/src/khepri_machine.hrl b/src/khepri_machine.hrl index 2946bc81..306b6c88 100644 --- a/src/khepri_machine.hrl +++ b/src/khepri_machine.hrl @@ -64,6 +64,13 @@ -record(dedup_ack, {ref :: reference()}). +-record(drop_dedups, {refs :: [reference()]}). +%% A command introduced in machine version 2 which is meant to drop expired +%% dedups. +%% +%% This is emitted internally by the `handle_aux/5' callback clause which +%% handles the `tick' Ra aux effect. + %% Old commands, kept for backward-compatibility. -record(unregister_projection, {name :: khepri_projection:name()}). diff --git a/test/protect_against_dups_option.erl b/test/protect_against_dups_option.erl index 85c3daf0..196755ee 100644 --- a/test/protect_against_dups_option.erl +++ b/test/protect_against_dups_option.erl @@ -127,62 +127,79 @@ dedup_and_dedup_ack_test() -> ?assertEqual(ok, Ret2), ?assertEqual([], SE2). -dedup_expiry_test() -> - S00 = khepri_machine:init(?MACH_PARAMS()), - S0 = khepri_machine:convert_state(S00, 0, 1), - - Command = #put{path = [foo], - payload = khepri_payload:data(value), - options = #{expect_specific_node => true, - props_to_return => [payload, - payload_version]}}, - CommandRef = make_ref(), - Delay = 2000, - Expiry = erlang:system_time(millisecond) + Delay, - DedupCommand = #dedup{ref = CommandRef, - expiry = Expiry, - command = Command}, - {S1, Ret1, SE1} = khepri_machine:apply(?META, DedupCommand, S0), - ExpectedRet = {ok, #{[foo] => #{payload_version => 1}}}, - - Dedups1 = khepri_machine:get_dedups(S1), - ?assertEqual(#{CommandRef => {ExpectedRet, Expiry}}, Dedups1), - - Root1 = khepri_machine:get_root(S1), - ?assertEqual( - #node{ - props = - #{payload_version => 1, - child_list_version => 2}, - child_nodes = - #{foo => - #node{ - props = ?INIT_NODE_PROPS, - payload = khepri_payload:data(value)}}}, - Root1), - ?assertEqual(ExpectedRet, Ret1), - ?assertEqual([], SE1), - - timer:sleep(Delay + 1000), - - %% The put command is idempotent, so not really ideal to test - %% deduplication. Instead, we mess up with the state and silently restore - %% the initial empty tree. If the dedup mechanism works, the returned - %% state shouldn't have the `foo' node either because it didn't process - %% the command. - PatchedS1 = khepri_machine:set_tree(S1, khepri_machine:get_tree(S0)), - {S2, Ret2, SE2} = khepri_machine:apply(?META, DedupCommand, PatchedS1), - - %% The dedups entry was dropped at the end of apply because it expired. - Dedups2 = khepri_machine:get_dedups(S2), - ?assertEqual(#{}, Dedups2), - - Root0 = khepri_machine:get_root(S0), - Root2 = khepri_machine:get_root(S2), - ?assertEqual(Root0, Root2), - - ?assertEqual(ExpectedRet, Ret2), - ?assertEqual([], SE2). +dedup_expiry_test_() -> + TickTimeout = 200, + Config = #{tick_timeout => TickTimeout}, + StoredProcPath = [sproc], + Path = [stock, wood, <<"oak">>], + Command = #tx{'fun' = StoredProcPath, args = []}, + CommandRef = make_ref(), + Expiry = erlang:system_time(millisecond), + DedupCommand = #dedup{ref = CommandRef, + command = Command, + expiry = Expiry}, + {setup, + fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME, Config) end, + fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end, + [{inorder, + [{"Store a procedure for later use", + ?_assertEqual( + ok, + khepri:put( + ?FUNCTION_NAME, StoredProcPath, + fun() -> + {ok, N} = khepri_tx:get(Path), + khepri_tx:put(Path, N + 1) + end))}, + + {"Store initial data", + ?_assertEqual( + ok, + khepri:put(?FUNCTION_NAME, Path, 1))}, + + {"Trigger the transaction", + ?_assertEqual( + ok, + khepri_machine:do_process_sync_command( + ?FUNCTION_NAME, DedupCommand, #{}))}, + + {"The transaction was applied and the data is incremented", + ?_assertEqual( + {ok, 2}, + khepri:get(?FUNCTION_NAME, Path))}, + + {"Trigger the transaction again before the dedup can be expired", + ?_assertEqual( + ok, + khepri_machine:do_process_sync_command( + ?FUNCTION_NAME, DedupCommand, #{}))}, + + {"The transaction was deduplicated and the data is unchanged", + ?_assertEqual( + {ok, 2}, + khepri:get(?FUNCTION_NAME, Path))}, + + {"Sleep and send the same transaction command", + ?_test( + begin + %% Sleep a little extra for the sake of slow CI runners. + %% During this sleep time the machine will receive Ra's + %% periodic `tick' aux effect which will trigger expiration of + %% dedups. + SleepTime = TickTimeout + erlang:floor(TickTimeout * 3 / 4), + timer:sleep(SleepTime), + %% The dedup should be expired so this duplicate command should + %% be handled and the data should be incremented. + ?assertEqual( + ok, + khepri_machine:do_process_sync_command( + ?FUNCTION_NAME, DedupCommand, #{})) + end)}, + + {"The transaction was applied again and the data is incremented", + ?_assertEqual( + {ok, 3}, + khepri:get(?FUNCTION_NAME, Path))}]}]}. dedup_ack_after_no_dedup_test() -> S00 = khepri_machine:init(?MACH_PARAMS()),