Skip to content

Commit

Permalink
khepri_machine: Expire dedups on tick
Browse files Browse the repository at this point in the history
As part of the `khepri_machine:post_apply/2` helper which is run after
any command is applied, we use `maps:filter/2` to eliminate any entries
in the `dedups` field of the machine state which are expired according
to the command's timestamp. This `drop_expired_dedups/2` step becomes a
bottleneck though when a Khepri store handles many transactions at once.

For example in RabbitMQ, queue deletion is done with a transaction
submitted from each queue process. When many (for example five thousand)
queues are deleted at once, `drop_expired_dedups/2` becomes a noticeable
chunk of a flamegraph and specifically the `maps:filter/2` within.

`maps:filter/2` is slow here because the BIF used to implement it
collects a list of all key-value pairs for which the predicate returns
true, sorts it, and then creates a new hashmap from it. We are unlikely
to expire any given dedup when handling a command, especially when
submitting many commands at once, so we end up essentially calling
`maps:from_list(maps:to_list(Dedups))`.

It is a small improvement to replace `maps:filter/2` with `maps:fold/3`,
a case expression and `maps:remove/2` (reflecting that we will always
fold over the map but rarely remove elements) but it is not enough to
eliminate this rather large chunk of the flamegraph.

Instead the solution in this commit is to move the detection of expired
dedups to the "tick" aux effect. Ra emits this effect periodically:
every 1s by default but configurable. By moving this detection to
`handle_aux/5` we remove it from the "hot path" of `apply/3`. Even
better though: we are unlikely to actually need to expire any dedups.
In the case of queue deletion in RabbitMQ for example, we are likely
to handle all in-flight transactions and handle the subsequent
`#dedup_ack{}` commands before even evaluating a `tick` effect. If we
do handle a tick effect while transactions are in-flight then we are
unlikely to need to expire them anyways so we will only scan the map
with the new `khepri_utils:maps_any/2` helper.

If we need to expire dedups, the aux handler for the `tick` effect
appends a new `#expire_dedups{}` command to the log which does the same
as `drop_expired_dedups/2` prior to this commit.
  • Loading branch information
the-mikedavis committed Sep 23, 2024
1 parent f746ba7 commit 0861e3c
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 81 deletions.
53 changes: 29 additions & 24 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@
get_dedups/1]).

-ifdef(TEST).
-export([make_virgin_state/1,
-export([do_process_sync_command/3,
make_virgin_state/1,
convert_state/3,
set_tree/2]).
-endif.
Expand Down Expand Up @@ -1263,6 +1264,19 @@ handle_aux(
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, IntState};
handle_aux(_RaState, cast, tick, AuxState, IntState) ->
State = ra_aux:machine_state(IntState),
Ts = erlang:system_time(millisecond),
Dedups = get_dedups(State),
CanExpire = fun(_CommandRef, {_Reply, Expiry}) -> Expiry =< Ts end,
Effects = case khepri_utils:maps_any(CanExpire, Dedups) of
true ->
ExpireDedups = #expire_dedups{},
[{append, ExpireDedups}];
false ->
[]
end,
{no_reply, AuxState, IntState, Effects};
handle_aux(_RaState, _Type, _Command, AuxState, IntState) ->
{no_reply, AuxState, IntState}.

Expand Down Expand Up @@ -1461,6 +1475,19 @@ apply(
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
#{machine_version := MacVer,
system_time := Timestamp} = Meta,
#expire_dedups{},
State) when MacVer >= 1 ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry > Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
Expand Down Expand Up @@ -1493,9 +1520,7 @@ apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) ->
post_apply({State, Result}, Meta) ->
post_apply({State, Result, []}, Meta);
post_apply({_State, _Result, _SideEffects} = Ret, Meta) ->
Ret1 = bump_applied_command_count(Ret, Meta),
Ret2 = drop_expired_dedups(Ret1, Meta),
Ret2.
bump_applied_command_count(Ret, Meta).

-spec bump_applied_command_count(ApplyRet, Meta) ->
{State, Result, SideEffects} when
Expand Down Expand Up @@ -1535,26 +1560,6 @@ reset_applied_command_count(State) ->
Metrics1 = maps:remove(applied_command_count, Metrics),
set_metrics(State, Metrics1).

-spec drop_expired_dedups(ApplyRet, Meta) ->
{State, Result, SideEffects} when
ApplyRet :: {State, Result, SideEffects},
State :: state(),
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% @private

drop_expired_dedups(
{State, Result, SideEffects},
#{system_time := Timestamp}) ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry >= Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
{State1, Result, SideEffects}.

%% @private

state_enter(leader, State) ->
Expand Down
2 changes: 2 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

-record(dedup_ack, {ref :: reference()}).

-record(expire_dedups, {}).

%% Old commands, kept for backward-compatibility.

-record(unregister_projection, {name :: khepri_projection:name()}).
20 changes: 20 additions & 0 deletions src/khepri_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-export([start_timeout_window/1,
end_timeout_window/2,
sleep/2,
maps_any/2,
is_ra_server_alive/1,

node_props_to_payload/2,
Expand Down Expand Up @@ -83,6 +84,25 @@ sleep(Time, Timeout) when Time > Timeout ->
timer:sleep(Timeout),
0.

-spec maps_any(Predicate, Map) -> boolean() when
Predicate :: fun((Key, Value) -> boolean()),
Map :: #{Key => Value},
Key :: term(),
Value :: term().

maps_any(Fun, Map) when is_map(Map) ->
maps_any1(Fun, maps:next(maps:iterator(Map))).

maps_any1(Fun, {Key, Value, Iterator}) ->
case Fun(Key, Value) of
true ->
true;
false ->
maps_any1(Fun, maps:next(Iterator))
end;
maps_any1(_Fun, none) ->
false.

-spec is_ra_server_alive(RaServer) -> IsAlive when
RaServer :: ra:server_id(),
IsAlive :: boolean().
Expand Down
128 changes: 72 additions & 56 deletions test/protect_against_dups_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,62 +127,78 @@ dedup_and_dedup_ack_test() ->
?assertEqual(ok, Ret2),
?assertEqual([], SE2).

dedup_expiry_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),
S0 = khepri_machine:convert_state(S00, 0, 1),

Command = #put{path = [foo],
payload = khepri_payload:data(value),
options = #{expect_specific_node => true,
props_to_return => [payload,
payload_version]}},
CommandRef = make_ref(),
Delay = 2000,
Expiry = erlang:system_time(millisecond) + Delay,
DedupCommand = #dedup{ref = CommandRef,
expiry = Expiry,
command = Command},
{S1, Ret1, SE1} = khepri_machine:apply(?META, DedupCommand, S0),
ExpectedRet = {ok, #{[foo] => #{payload_version => 1}}},

Dedups1 = khepri_machine:get_dedups(S1),
?assertEqual(#{CommandRef => {ExpectedRet, Expiry}}, Dedups1),

Root1 = khepri_machine:get_root(S1),
?assertEqual(
#node{
props =
#{payload_version => 1,
child_list_version => 2},
child_nodes =
#{foo =>
#node{
props = ?INIT_NODE_PROPS,
payload = khepri_payload:data(value)}}},
Root1),
?assertEqual(ExpectedRet, Ret1),
?assertEqual([], SE1),

timer:sleep(Delay + 1000),

%% The put command is idempotent, so not really ideal to test
%% deduplication. Instead, we mess up with the state and silently restore
%% the initial empty tree. If the dedup mechanism works, the returned
%% state shouldn't have the `foo' node either because it didn't process
%% the command.
PatchedS1 = khepri_machine:set_tree(S1, khepri_machine:get_tree(S0)),
{S2, Ret2, SE2} = khepri_machine:apply(?META, DedupCommand, PatchedS1),

%% The dedups entry was dropped at the end of apply because it expired.
Dedups2 = khepri_machine:get_dedups(S2),
?assertEqual(#{}, Dedups2),

Root0 = khepri_machine:get_root(S0),
Root2 = khepri_machine:get_root(S2),
?assertEqual(Root0, Root2),

?assertEqual(ExpectedRet, Ret2),
?assertEqual([], SE2).
dedup_expiry_test_() ->
TickTimeout = 200,
Config = #{tick_timeout => TickTimeout},
StoredProcPath = [sproc],
Path = [stock, wood, <<"oak">>],
Command = #tx{'fun' = StoredProcPath, args = []},
CommandRef = make_ref(),
Expiry = erlang:system_time(millisecond),
DedupCommand = #dedup{ref = CommandRef,
command = Command,
expiry = Expiry},
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME, Config) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[{inorder,
[{"Store a procedure for later use",
?_assertEqual(
ok,
khepri:put(
?FUNCTION_NAME, StoredProcPath,
fun() ->
{ok, N} = khepri_tx:get(Path),
khepri_tx:put(Path, N + 1)
end))},

{"Store initial data",
?_assertEqual(
ok,
khepri:put(?FUNCTION_NAME, Path, 1))},

{"Trigger the transaction",
?_assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))},

{"The transaction was applied and the data is incremented",
?_assertEqual(
{ok, 2},
khepri:get(?FUNCTION_NAME, Path))},

{"Trigger the transaction again before the dedup can be expired",
?_assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))},

{"The transaction was deduplicated and the data is unchanged",
?_assertEqual(
{ok, 2},
khepri:get(?FUNCTION_NAME, Path))},

{"Sleep and send the same transaction command",
?_test(
begin
%% Sleep a little extra for the sake of slow CI runners.
%% During this sleep time the machine will receive Ra's
%% periodic `tick' aux effect which will trigger expiration of
%% dedups.
SleepTime = TickTimeout + erlang:floor(TickTimeout * 3 / 4),
timer:sleep(SleepTime),
%% The dedup should be expired so this command should
?assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))
end)},

{"The transaction was applied again and the data is incremented",
?_assertEqual(
{ok, 3},
khepri:get(?FUNCTION_NAME, Path))}]}]}.

dedup_ack_after_no_dedup_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),
Expand Down
7 changes: 6 additions & 1 deletion test/test_ra_server_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
-include_lib("stdlib/include/assert.hrl").

-export([setup/1,
setup/2,
cleanup/1]).

setup(Testcase) ->
setup(Testcase, #{}).

setup(Testcase, CustomConfig) ->
_ = logger:set_primary_config(level, warning),
{ok, _} = application:ensure_all_started(khepri),
khepri_utils:init_list_of_modules_to_skip(),

#{ra_system := RaSystem} = Props = helpers:start_ra_system(Testcase),
{ok, StoreId} = khepri:start(RaSystem, Testcase),
RaServerConfig = maps:put(cluster_name, Testcase, CustomConfig),
{ok, StoreId} = khepri:start(RaSystem, RaServerConfig),
Props#{store_id => StoreId}.

cleanup(#{store_id := StoreId} = Props) ->
Expand Down
15 changes: 15 additions & 0 deletions test/utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,18 @@ timeout_window_with_high_timeout_test() ->
timer:sleep(Sleep),
NewTimeout = khepri_utils:end_timeout_window(Timeout, Window),
?assert(NewTimeout =< Timeout - Sleep).

maps_any_test() ->
IsEven = fun(N) -> (N band 1) == 0 end,
ValueIsEven = fun(_Key, Value) -> IsEven(Value) end,

?assertNot(khepri_utils:maps_any(ValueIsEven, #{})),
?assertNot(khepri_utils:maps_any(ValueIsEven, #{k => 1})),
?assert(khepri_utils:maps_any(ValueIsEven, #{k => 2})),

Map1 = #{N => N || N <- lists:seq(1, 100)},
?assert(khepri_utils:maps_any(ValueIsEven, Map1)),
Map2 = #{N => N || N <- lists:seq(1, 100), not IsEven(N)},
?assertNot(khepri_utils:maps_any(ValueIsEven, Map2)),

ok.

0 comments on commit 0861e3c

Please sign in to comment.