Skip to content

Commit

Permalink
khepri_machine: Expire dedups on tick
Browse files Browse the repository at this point in the history
As part of the `khepri_machine:post_apply/2` helper which is run after
any command is applied, we use `maps:filter/2` to eliminate any entries
in the `dedups` field of the machine state which are expired according
to the command's timestamp. This `drop_expired_dedups/2` step becomes a
bottleneck though when a Khepri store handles many transactions at once.

For example in RabbitMQ, queue deletion is done with a transaction
submitted from each queue process. When many (for example five thousand)
queues are deleted at once, `drop_expired_dedups/2` becomes a noticeable
chunk of a flamegraph and specifically the `maps:filter/2` within.

`maps:filter/2` is slow here because the BIF used to implement it
collects a list of all key-value pairs for which the predicate returns
true, sorts it, and then creates a new hashmap from it. We are unlikely
to expire any given dedup when handling a command, especially when
submitting many commands at once, so we end up essentially calling
`maps:from_list(maps:to_list(Dedups))`.

It is a small improvement to replace `maps:filter/2` with `maps:fold/3`,
a case expression and `maps:remove/2` (reflecting that we will always
fold over the map but rarely remove elements) but it is not enough to
eliminate this rather large chunk of the flamegraph.

Instead the solution in this commit is to move the detection of expired
dedups to the "tick" aux effect. Ra emits this effect periodically (say
every few seconds). By moving this detection to `handle_aux/5` we remove
it from the "hot path" of `apply/3`. Even better though: we are unlikely
to actually need to expire any dedups. In the case of queue deletion in
RabbitMQ for example, we are likely to handle all in-flight transactions
and handle the subsequent `#dedup_ack{}` commands before even evaluating
a `tick` effect. If we do handle a tick effect while transactions are
in-flight then we are unlikely to need to expire them anyways so we will
only scan the map with the new `khepri_utils:maps_any/2` helper.

If we need to expire dedups, the aux handler for the `tick` effect
appends a new `#expire_dedups{}` command to the log which does the same
as `drop_expired_dedups/2` prior to this commit.
  • Loading branch information
the-mikedavis committed Sep 23, 2024
1 parent d97c695 commit 435ac23
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 23 deletions.
50 changes: 27 additions & 23 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,19 @@ handle_aux(
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, IntState};
handle_aux(_RaState, cast, tick, AuxState, IntState) ->
State = ra_aux:machine_state(IntState),
Ts = erlang:system_time(millisecond),
Dedups = get_dedups(State),
CanExpire = fun(_CommandRef, {_Reply, Expiry}) -> Expiry =< Ts end,
Effs = case khepri_utils:maps_any(CanExpire, Dedups) of
true ->
ExpireDedups = #expire_dedups{},
[{append, ExpireDedups}];
false ->
[]
end,
{no_reply, AuxState, IntState, Effs};
handle_aux(_RaState, _Type, _Command, AuxState, IntState) ->
{no_reply, AuxState, IntState}.

Expand Down Expand Up @@ -1458,6 +1471,19 @@ apply(
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
#{machine_version := MacVer,
system_time := Timestamp} = Meta,
#expire_dedups{},
State) when MacVer >= 1 ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry > Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
Expand Down Expand Up @@ -1490,9 +1516,7 @@ apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) ->
post_apply({State, Result}, Meta) ->
post_apply({State, Result, []}, Meta);
post_apply({_State, _Result, _SideEffects} = Ret, Meta) ->
Ret1 = bump_applied_command_count(Ret, Meta),
Ret2 = drop_expired_dedups(Ret1, Meta),
Ret2.
bump_applied_command_count(Ret, Meta).

-spec bump_applied_command_count(ApplyRet, Meta) ->
{State, Result, SideEffects} when
Expand Down Expand Up @@ -1532,26 +1556,6 @@ reset_applied_command_count(State) ->
Metrics1 = maps:remove(applied_command_count, Metrics),
set_metrics(State, Metrics1).

-spec drop_expired_dedups(ApplyRet, Meta) ->
{State, Result, SideEffects} when
ApplyRet :: {State, Result, SideEffects},
State :: state(),
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% @private

drop_expired_dedups(
{State, Result, SideEffects},
#{system_time := Timestamp}) ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry >= Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
{State1, Result, SideEffects}.

%% @private

state_enter(leader, State) ->
Expand Down
2 changes: 2 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

-record(dedup_ack, {ref :: reference()}).

-record(expire_dedups, {}).

%% Old commands, kept for backward-compatibility.

-record(unregister_projection, {name :: khepri_projection:name()}).
20 changes: 20 additions & 0 deletions src/khepri_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-export([start_timeout_window/1,
end_timeout_window/2,
sleep/2,
maps_any/2,
is_ra_server_alive/1,

node_props_to_payload/2,
Expand Down Expand Up @@ -83,6 +84,25 @@ sleep(Time, Timeout) when Time > Timeout ->
timer:sleep(Timeout),
0.

-spec maps_any(Predicate, Map) -> boolean() when
Predicate :: fun((Key, Value) -> boolean()),
Map :: #{Key => Value},
Key :: term(),
Value :: term().

maps_any(Fun, Map) when is_map(Map) ->
maps_any1(Fun, maps:next(maps:iterator(Map))).

maps_any1(Fun, {Key, Value, Iterator}) ->
case Fun(Key, Value) of
true ->
true;
false ->
maps_any1(Fun, maps:next(Iterator))
end;
maps_any1(_Fun, none) ->
false.

-spec is_ra_server_alive(RaServer) -> IsAlive when
RaServer :: ra:server_id(),
IsAlive :: boolean().
Expand Down
15 changes: 15 additions & 0 deletions test/utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,18 @@ timeout_window_with_high_timeout_test() ->
timer:sleep(Sleep),
NewTimeout = khepri_utils:end_timeout_window(Timeout, Window),
?assert(NewTimeout =< Timeout - Sleep).

maps_any_test() ->
IsEven = fun(N) -> (N band 1) == 0 end,
ValueIsEven = fun(_Key, Value) -> IsEven(Value) end,

?assertNot(khepri_utils:maps_any(ValueIsEven, #{})),
?assertNot(khepri_utils:maps_any(ValueIsEven, #{k => 1})),
?assert(khepri_utils:maps_any(ValueIsEven, #{k => 2})),

Map1 = #{N => N || N <- lists:seq(1, 100)},
?assert(khepri_utils:maps_any(ValueIsEven, Map1)),
Map2 = #{N => N || N <- lists:seq(1, 100), not IsEven(N)},
?assertNot(khepri_utils:maps_any(ValueIsEven, Map2)),

ok.

0 comments on commit 435ac23

Please sign in to comment.