Skip to content

Commit

Permalink
Merge pull request #297 from rabbitmq/md/expire-dedups-on-tick
Browse files Browse the repository at this point in the history
Expire dedups on `tick`
  • Loading branch information
dumbbell authored Nov 4, 2024
2 parents 5db448b + c522a88 commit 3aacfb5
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 75 deletions.
105 changes: 88 additions & 17 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
%% <li>Changed the data structure for the reverse index used to track
%% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}).
%% </li>
%% <li>Moved the expiration of dedups to the `tick' aux effect (see {@link
%% handle_aux/5}). This also introduces a new command `#drop_dedups{}'.</li>
%% </ul>
%% </td>
%% </tr>
Expand Down Expand Up @@ -85,7 +87,7 @@
%% ra_machine callbacks.
-export([init/1,
init_aux/1,
handle_aux/6,
handle_aux/5,
apply/3,
state_enter/2,
snapshot_installed/4,
Expand Down Expand Up @@ -119,7 +121,8 @@
get_dedups/1]).

-ifdef(TEST).
-export([make_virgin_state/1,
-export([do_process_sync_command/3,
make_virgin_state/1,
convert_state/3,
set_tree/2]).
-endif.
Expand Down Expand Up @@ -1236,14 +1239,16 @@ init(Params) ->
init_aux(StoreId) ->
#khepri_machine_aux{store_id = StoreId}.

-spec handle_aux(RaState, Type, Command, AuxState, LogState, MachineState) ->
{no_reply, AuxState, LogState} when
-spec handle_aux(RaState, Type, Command, AuxState, IntState) ->
Ret when
RaState :: ra_server:ra_state(),
Type :: {call, ra:from()} | cast,
Command :: term(),
AuxState :: aux_state(),
LogState :: ra_log:state(),
MachineState :: state().
IntState :: ra_aux:internal_state(),
Ret :: {no_reply, AuxState, IntState} |
{no_reply, AuxState, IntState, Effects},
Effects :: ra_machine:effects().
%% @private

handle_aux(
Expand All @@ -1252,12 +1257,12 @@ handle_aux(
old_props = OldProps,
new_props = NewProps,
projection = Projection},
AuxState, LogState, _MachineState) ->
AuxState, IntState) ->
khepri_projection:trigger(Projection, Path, OldProps, NewProps),
{no_reply, AuxState, LogState};
{no_reply, AuxState, IntState};
handle_aux(
_RaState, cast, restore_projections, AuxState, LogState,
State) ->
_RaState, cast, restore_projections, AuxState, IntState) ->
State = ra_aux:machine_state(IntState),
Tree = get_tree(State),
ProjectionTree = get_projections(State),
khepri_pattern_tree:foreach(
Expand All @@ -1266,16 +1271,48 @@ handle_aux(
[restore_projection(Projection, Tree, PathPattern) ||
Projection <- Projections]
end),
{no_reply, AuxState, LogState};
{no_reply, AuxState, IntState};
handle_aux(
_RaState, cast,
#restore_projection{projection = Projection, pattern = PathPattern},
AuxState, LogState, State) ->
AuxState, IntState) ->
State = ra_aux:machine_state(IntState),
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, LogState};
handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) ->
{no_reply, AuxState, LogState}.
{no_reply, AuxState, IntState};
handle_aux(leader, cast, tick, AuxState, IntState) ->
%% Expiring dedups in the tick handler is only available on versions 2
%% and greater. In versions 0 and 1, expiration of dedups is done in
%% `drop_expired_dedups/2'. This proved to be quite expensive when handling
%% a very large batch of transactions at once, so this expiration step was
%% moved to the `tick' handler in version 2.
case ra_aux:effective_machine_version(IntState) of
EffectiveMacVer when EffectiveMacVer >= 2 ->
State = ra_aux:machine_state(IntState),
Timestamp = erlang:system_time(millisecond),
Dedups = get_dedups(State),
RefsToDrop = maps:fold(
fun(CommandRef, {_Reply, Expiry}, Acc) ->
case Expiry =< Timestamp of
true ->
[CommandRef | Acc];
false ->
Acc
end
end, [], Dedups),
Effects = case RefsToDrop of
[] ->
[];
_ ->
DropDedups = #drop_dedups{refs = RefsToDrop},
[{append, DropDedups}]
end,
{no_reply, AuxState, IntState, Effects};
_ ->
{no_reply, AuxState, IntState}
end;
handle_aux(_RaState, _Type, _Command, AuxState, IntState) ->
{no_reply, AuxState, IntState}.

restore_projection(Projection, Tree, PathPattern) ->
_ = khepri_projection:init(Projection),
Expand Down Expand Up @@ -1472,6 +1509,20 @@ apply(
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
#{machine_version := MacVer} = Meta,
#drop_dedups{refs = RefsToDrop},
State) when MacVer >= 2 ->
%% `#drop_dedups{}' is emitted by the `handle_aux/5' clause for the `tick'
%% effect to periodically drop dedups that have expired. This expiration
%% was originally done in `post_apply/2' via `drop_expired_dedups/2' until
%% machine version 2. Note that `drop_expired_dedups/2' is used until a
%% cluster reaches an effective machine version of 2 or higher.
Dedups = get_dedups(State),
Dedups1 = maps:without(RefsToDrop, Dedups),
State1 = set_dedups(State, Dedups1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
Expand Down Expand Up @@ -1553,18 +1604,38 @@ reset_applied_command_count(State) ->
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% Removes any dedups from the `dedups' field in state that have expired
%% according to the timestamp in the handled command.
%%
%% This function is a no-op in any other version than version 1. This proved to
%% be expensive to execute as part of `apply/3' so dedup expiration moved to
%% the `handle_aux/5' for `tick' which is executed periodically. See that
%% function clause above for more information.
%%
%% @private

drop_expired_dedups(
{State, Result, SideEffects},
#{system_time := Timestamp}) ->
#{system_time := Timestamp,
machine_version := MacVer}) when MacVer =< 1 ->
Dedups = get_dedups(State),
%% Historical note: `maps:filter/2' can be surprisingly expensive when
%% used in a tight loop like `apply/3' depending on how many elements are
%% retained. As of Erlang/OTP 27, the BIF which implements `maps:filter/2'
%% collects any key-value pairs for which the predicate returns `true' into
%% a list, sorts/dedups the list and then creates a new map. This is slow
%% if the filter function always returns `true'. In cases like this where
%% the common usage is to retain most elements, `maps:fold/3' plus a `case'
%% expression and `maps:remove/2' is likely to be less expensive.
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry >= Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
{State1, Result, SideEffects}.
{State1, Result, SideEffects};
drop_expired_dedups({State, Result, SideEffects}, _Meta) ->
%% No-op on versions 2 and higher.
{State, Result, SideEffects}.

%% @private

Expand Down
7 changes: 7 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@

-record(dedup_ack, {ref :: reference()}).

-record(drop_dedups, {refs :: [reference()]}).
%% A command introduced in machine version 2 which is meant to drop expired
%% dedups.
%%
%% This is emitted internally by the `handle_aux/5' callback clause which
%% handles the `tick' Ra aux effect.

%% Old commands, kept for backward-compatibility.

-record(unregister_projection, {name :: khepri_projection:name()}).
2 changes: 1 addition & 1 deletion src/khepri_projection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ trigger(
undefined ->
%% A table might have been deleted by an `unregister_projections'
%% effect in between when a `trigger_projection' effect is created
%% and when it is handled in `khepri_machine:handle_aux/6`. In this
%% and when it is handled in `khepri_machine:handle_aux/5`. In this
%% case we should no-op the trigger effect.
ok;
Table ->
Expand Down
129 changes: 73 additions & 56 deletions test/protect_against_dups_option.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,62 +127,79 @@ dedup_and_dedup_ack_test() ->
?assertEqual(ok, Ret2),
?assertEqual([], SE2).

dedup_expiry_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),
S0 = khepri_machine:convert_state(S00, 0, 1),

Command = #put{path = [foo],
payload = khepri_payload:data(value),
options = #{expect_specific_node => true,
props_to_return => [payload,
payload_version]}},
CommandRef = make_ref(),
Delay = 2000,
Expiry = erlang:system_time(millisecond) + Delay,
DedupCommand = #dedup{ref = CommandRef,
expiry = Expiry,
command = Command},
{S1, Ret1, SE1} = khepri_machine:apply(?META, DedupCommand, S0),
ExpectedRet = {ok, #{[foo] => #{payload_version => 1}}},

Dedups1 = khepri_machine:get_dedups(S1),
?assertEqual(#{CommandRef => {ExpectedRet, Expiry}}, Dedups1),

Root1 = khepri_machine:get_root(S1),
?assertEqual(
#node{
props =
#{payload_version => 1,
child_list_version => 2},
child_nodes =
#{foo =>
#node{
props = ?INIT_NODE_PROPS,
payload = khepri_payload:data(value)}}},
Root1),
?assertEqual(ExpectedRet, Ret1),
?assertEqual([], SE1),

timer:sleep(Delay + 1000),

%% The put command is idempotent, so not really ideal to test
%% deduplication. Instead, we mess up with the state and silently restore
%% the initial empty tree. If the dedup mechanism works, the returned
%% state shouldn't have the `foo' node either because it didn't process
%% the command.
PatchedS1 = khepri_machine:set_tree(S1, khepri_machine:get_tree(S0)),
{S2, Ret2, SE2} = khepri_machine:apply(?META, DedupCommand, PatchedS1),

%% The dedups entry was dropped at the end of apply because it expired.
Dedups2 = khepri_machine:get_dedups(S2),
?assertEqual(#{}, Dedups2),

Root0 = khepri_machine:get_root(S0),
Root2 = khepri_machine:get_root(S2),
?assertEqual(Root0, Root2),

?assertEqual(ExpectedRet, Ret2),
?assertEqual([], SE2).
dedup_expiry_test_() ->
TickTimeout = 200,
Config = #{tick_timeout => TickTimeout},
StoredProcPath = [sproc],
Path = [stock, wood, <<"oak">>],
Command = #tx{'fun' = StoredProcPath, args = []},
CommandRef = make_ref(),
Expiry = erlang:system_time(millisecond),
DedupCommand = #dedup{ref = CommandRef,
command = Command,
expiry = Expiry},
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME, Config) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[{inorder,
[{"Store a procedure for later use",
?_assertEqual(
ok,
khepri:put(
?FUNCTION_NAME, StoredProcPath,
fun() ->
{ok, N} = khepri_tx:get(Path),
khepri_tx:put(Path, N + 1)
end))},

{"Store initial data",
?_assertEqual(
ok,
khepri:put(?FUNCTION_NAME, Path, 1))},

{"Trigger the transaction",
?_assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))},

{"The transaction was applied and the data is incremented",
?_assertEqual(
{ok, 2},
khepri:get(?FUNCTION_NAME, Path))},

{"Trigger the transaction again before the dedup can be expired",
?_assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))},

{"The transaction was deduplicated and the data is unchanged",
?_assertEqual(
{ok, 2},
khepri:get(?FUNCTION_NAME, Path))},

{"Sleep and send the same transaction command",
?_test(
begin
%% Sleep a little extra for the sake of slow CI runners.
%% During this sleep time the machine will receive Ra's
%% periodic `tick' aux effect which will trigger expiration of
%% dedups.
SleepTime = TickTimeout + erlang:floor(TickTimeout * 3 / 4),
timer:sleep(SleepTime),
%% The dedup should be expired so this duplicate command should
%% be handled and the data should be incremented.
?assertEqual(
ok,
khepri_machine:do_process_sync_command(
?FUNCTION_NAME, DedupCommand, #{}))
end)},

{"The transaction was applied again and the data is incremented",
?_assertEqual(
{ok, 3},
khepri:get(?FUNCTION_NAME, Path))}]}]}.

dedup_ack_after_no_dedup_test() ->
S00 = khepri_machine:init(?MACH_PARAMS()),
Expand Down
7 changes: 6 additions & 1 deletion test/test_ra_server_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
-include_lib("stdlib/include/assert.hrl").

-export([setup/1,
setup/2,
cleanup/1]).

setup(Testcase) ->
setup(Testcase, #{}).

setup(Testcase, CustomConfig) ->
_ = logger:set_primary_config(level, warning),
{ok, _} = application:ensure_all_started(khepri),
khepri_utils:init_list_of_modules_to_skip(),

#{ra_system := RaSystem} = Props = helpers:start_ra_system(Testcase),
{ok, StoreId} = khepri:start(RaSystem, Testcase),
RaServerConfig = maps:put(cluster_name, Testcase, CustomConfig),
{ok, StoreId} = khepri:start(RaSystem, RaServerConfig),
Props#{store_id => StoreId}.

cleanup(#{store_id := StoreId} = Props) ->
Expand Down

0 comments on commit 3aacfb5

Please sign in to comment.