Skip to content

Commit

Permalink
khepri_machine: Expire dedups on tick
Browse files Browse the repository at this point in the history
As part of the `khepri_machine:post_apply/2` helper which is run after
any command is applied, we use `maps:filter/2` to eliminate any entries
in the `dedups` field of the machine state which are expired according
to the command's timestamp. This `drop_expired_dedups/2` step becomes a
bottleneck though when a Khepri store handles many transactions at once.

For example in RabbitMQ, queue deletion is done with a transaction
submitted from each queue process. When many (for example five thousand)
queues are deleted at once, `drop_expired_dedups/2` becomes a noticeable
chunk of a flamegraph and specifically the `maps:filter/2` within.

`maps:filter/2` is slow here because the BIF used to implement it
collects a list of all key-value pairs for which the predicate returns
true, sorts it, and then creates a new hashmap from it. We are unlikely
to expire any given dedup when handling a command, especially when
submitting many commands at once, so we end up essentially calling
`maps:from_list(maps:to_list(Dedups))`.

It is a small improvement to replace `maps:filter/2` with `maps:fold/3`,
a case expression and `maps:remove/2` (reflecting that we will always
fold over the map but rarely remove elements) but it is not enough to
eliminate this rather large chunk of the flamegraph.

Instead the solution in this commit is to move the detection of expired
dedups to the "tick" aux effect. Ra emits this effect periodically:
every 1s by default but configurable. By moving this detection to
`handle_aux/5` we remove it from the "hot path" of `apply/3`. Even
better though: we are unlikely to actually need to expire any dedups.
In the case of queue deletion in RabbitMQ for example, we are likely
to handle all in-flight transactions and handle the subsequent
`#dedup_ack{}` commands before even evaluating a `tick` effect. If we
do handle a tick effect while transactions are in-flight then we are
unlikely to need to expire them anyways so we will only scan the map
with the new `khepri_utils:maps_any/2` helper.

If we need to expire dedups, the aux handler for the `tick` effect
appends a new `#expire_dedups{}` command to the log which does the same
as `drop_expired_dedups/2` prior to this commit.
  • Loading branch information
the-mikedavis committed Sep 24, 2024
1 parent 7f8e8d0 commit f17e12f
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 60 deletions.
63 changes: 60 additions & 3 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
%% <li>Changed the data structure for the reverse index used to track
%% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}).
%% </li>
%% <li>Moved the expiration of dedups to the `tick' aux effect (see {@link
%% handle_aux/5}). This also introduces a new command `#drop_dedups{}'.</li>
%% </ul>
%% </td>
%% </tr>
Expand Down Expand Up @@ -119,7 +121,8 @@
get_dedups/1]).

-ifdef(TEST).
-export([make_virgin_state/1,
-export([do_process_sync_command/3,
make_virgin_state/1,
convert_state/3,
set_tree/2]).
-endif.
Expand Down Expand Up @@ -1278,6 +1281,40 @@ handle_aux(
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, IntState};
handle_aux(leader, cast, tick, AuxState, IntState) ->
%% Expiring dedups in the tick handler is only available on versions 2
%% and greater. In versions 0 and 1, expiration of dedups is done in
%% `drop_expired_dedups/2'. This proved to be quite expensive when handling
%% a very large batch of transactions at once, so this expiration step was
%% moved to the `tick' handler in version 2.
%%
%% The version 2 increment is required for this change because we introduce
%% a new command: `#drop_dedups{}`.
case ra_aux:effective_machine_version(IntState) of
N when N >= 2 ->
State = ra_aux:machine_state(IntState),
Timestamp = erlang:system_time(millisecond),
Dedups = get_dedups(State),
RefsToDrop = maps:fold(
fun(CommandRef, {_Reply, Expiry}, Acc) ->
case Expiry =< Timestamp of
true ->
[CommandRef | Acc];
false ->
Acc
end
end, [], Dedups),
Effects = case RefsToDrop of
[] ->
[];
_ ->
DropDedups = #drop_dedups{refs = RefsToDrop},
[{append, DropDedups}]
end,
{no_reply, AuxState, IntState, Effects};
_ ->
{no_reply, AuxState, IntState}
end;
handle_aux(_RaState, _Type, _Command, AuxState, IntState) ->
{no_reply, AuxState, IntState}.

Expand Down Expand Up @@ -1476,6 +1513,15 @@ apply(
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
#{machine_version := MacVer} = Meta,
#drop_dedups{refs = RefsToDrop},
State) when MacVer >= 1 ->
Dedups = get_dedups(State),
Dedups1 = maps:without(RefsToDrop, Dedups),
State1 = set_dedups(State, Dedups1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
Expand Down Expand Up @@ -1557,18 +1603,29 @@ reset_applied_command_count(State) ->
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% Removes any dedups from the `dedups' field in state that have expired
%% according to the timestamp in the handled command.
%%
%% This function is a no-op in any other version than version 1. This proved to
%% be expensive to execute as part of `apply/3' so dedup expiration moved to
%% the `handle_aux/5' for `tick' which is executed periodically. See that
%% function clause above for more information.
%%
%% @private

drop_expired_dedups(
{State, Result, SideEffects},
#{system_time := Timestamp}) ->
#{system_time := Timestamp,
machine_version := MacVer}) when MacVer =< 1 ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry >= Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
{State1, Result, SideEffects}.
{State1, Result, SideEffects};
drop_expired_dedups({State, Result, SideEffects}, _Meta) ->
{State, Result, SideEffects}.

%% @private

Expand Down
2 changes: 2 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

-record(dedup_ack, {ref :: reference()}).

-record(drop_dedups, {refs :: [reference()]}).

%% Old commands, kept for backward-compatibility.

-record(unregister_projection, {name :: khepri_projection:name()}).
129 changes: 73 additions & 56 deletions test/protect_against_dups_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,62 +127,79 @@ dedup_and_dedup_ack_test() ->
?assertEqual(ok, Ret2),
?assertEqual([], SE2).

dedup_expiry_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),
S0 = khepri_machine:convert_state(S00, 0, 1),

Command = #put{path = [foo],
payload = khepri_payload:data(value),
options = #{expect_specific_node => true,
props_to_return => [payload,
payload_version]}},
CommandRef = make_ref(),
Delay = 2000,
Expiry = erlang:system_time(millisecond) + Delay,
DedupCommand = #dedup{ref = CommandRef,
expiry = Expiry,
command = Command},
{S1, Ret1, SE1} = khepri_machine:apply(?META, DedupCommand, S0),
ExpectedRet = {ok, #{[foo] => #{payload_version => 1}}},

Dedups1 = khepri_machine:get_dedups(S1),
?assertEqual(#{CommandRef => {ExpectedRet, Expiry}}, Dedups1),

Root1 = khepri_machine:get_root(S1),
?assertEqual(
#node{
props =
#{payload_version => 1,
child_list_version => 2},
child_nodes =
#{foo =>
#node{
props = ?INIT_NODE_PROPS,
payload = khepri_payload:data(value)}}},
Root1),
?assertEqual(ExpectedRet, Ret1),
?assertEqual([], SE1),

timer:sleep(Delay + 1000),

%% The put command is idempotent, so not really ideal to test
%% deduplication. Instead, we mess up with the state and silently restore
%% the initial empty tree. If the dedup mechanism works, the returned
%% state shouldn't have the `foo' node either because it didn't process
%% the command.
PatchedS1 = khepri_machine:set_tree(S1, khepri_machine:get_tree(S0)),
{S2, Ret2, SE2} = khepri_machine:apply(?META, DedupCommand, PatchedS1),

%% The dedups entry was dropped at the end of apply because it expired.
Dedups2 = khepri_machine:get_dedups(S2),
?assertEqual(#{}, Dedups2),

Root0 = khepri_machine:get_root(S0),
Root2 = khepri_machine:get_root(S2),
?assertEqual(Root0, Root2),

?assertEqual(ExpectedRet, Ret2),
?assertEqual([], SE2).
dedup_expiry_test_() ->
TickTimeout = 200,
Config = #{tick_timeout => TickTimeout},
StoredProcPath = [sproc],
Path = [stock, wood, <<"oak">>],
Command = #tx{'fun' = StoredProcPath, args = []},
CommandRef = make_ref(),
Expiry = erlang:system_time(millisecond),
DedupCommand = #dedup{ref = CommandRef,
command = Command,
expiry = Expiry},
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME, Config) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[{inorder,
[{"Store a procedure for later use",
?_assertEqual(
ok,
khepri:put(
?FUNCTION_NAME, StoredProcPath,
fun() ->
{ok, N} = khepri_tx:get(Path),
khepri_tx:put(Path, N + 1)
end))},

{"Store initial data",
?_assertEqual(
ok,
khepri:put(?FUNCTION_NAME, Path, 1))},

{"Trigger the transaction",
?_assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))},

{"The transaction was applied and the data is incremented",
?_assertEqual(
{ok, 2},
khepri:get(?FUNCTION_NAME, Path))},

{"Trigger the transaction again before the dedup can be expired",
?_assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))},

{"The transaction was deduplicated and the data is unchanged",
?_assertEqual(
{ok, 2},
khepri:get(?FUNCTION_NAME, Path))},

{"Sleep and send the same transaction command",
?_test(
begin
%% Sleep a little extra for the sake of slow CI runners.
%% During this sleep time the machine will receive Ra's
%% periodic `tick' aux effect which will trigger expiration of
%% dedups.
SleepTime = TickTimeout + erlang:floor(TickTimeout * 3 / 4),
timer:sleep(SleepTime),
%% The dedup should be expired so this duplicate command should
%% be handled and the data should be incremented.
?assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))
end)},

{"The transaction was applied again and the data is incremented",
?_assertEqual(
{ok, 3},
khepri:get(?FUNCTION_NAME, Path))}]}]}.

dedup_ack_after_no_dedup_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),
Expand Down
7 changes: 6 additions & 1 deletion test/test_ra_server_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
-include_lib("stdlib/include/assert.hrl").

-export([setup/1,
setup/2,
cleanup/1]).

setup(Testcase) ->
setup(Testcase, #{}).

setup(Testcase, CustomConfig) ->
_ = logger:set_primary_config(level, warning),
{ok, _} = application:ensure_all_started(khepri),
khepri_utils:init_list_of_modules_to_skip(),

#{ra_system := RaSystem} = Props = helpers:start_ra_system(Testcase),
{ok, StoreId} = khepri:start(RaSystem, Testcase),
RaServerConfig = maps:put(cluster_name, Testcase, CustomConfig),
{ok, StoreId} = khepri:start(RaSystem, RaServerConfig),
Props#{store_id => StoreId}.

cleanup(#{store_id := StoreId} = Props) ->
Expand Down

0 comments on commit f17e12f

Please sign in to comment.