Skip to content

Commit

Permalink
khepri_machine: Drop expired dedups on tick
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis committed Sep 19, 2024
1 parent f456cae commit 655edf4
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 31 deletions.
58 changes: 27 additions & 31 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,19 @@ handle_aux(
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, IntState};
handle_aux(_RaState, cast, tick, AuxState, IntState) ->
State = ra_aux:machine_state(IntState),
Ts = erlang:system_time(millisecond),
Dedups = get_dedups(State),
CanExpire = fun(_CommandRef, {_Reply, Expiry}) -> Ts >= Expiry end,
Effs = case khepri_utils:maps_any(CanExpire, Dedups) of
true ->
ExpireDedups = #expire_dedups{},
[{append, ExpireDedups}];
false ->
[]
end,
{no_reply, AuxState, IntState, Effs};
handle_aux(_RaState, _Type, _Command, AuxState, IntState) ->
{no_reply, AuxState, IntState}.

Expand Down Expand Up @@ -1458,6 +1471,19 @@ apply(
end,
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(
#{machine_version := MacVer,
system_time := Timestamp} = Meta,
#expire_dedups{},
State) when MacVer >= 1 ->
Dedups = get_dedups(State),
Dedups1 = maps:filter(
fun(_CommandRef, {_Reply, Expiry}) ->
Expiry > Timestamp
end, Dedups),
State1 = set_dedups(State, Dedups1),
Ret = {State1, ok},
post_apply(Ret, Meta);
apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) ->
NewState = convert_state(OldState, OldMacVer, NewMacVer),
Ret = {NewState, ok},
Expand Down Expand Up @@ -1490,9 +1516,7 @@ apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) ->
post_apply({State, Result}, Meta) ->
post_apply({State, Result, []}, Meta);
post_apply({_State, _Result, _SideEffects} = Ret, Meta) ->
Ret1 = bump_applied_command_count(Ret, Meta),
Ret2 = drop_expired_dedups(Ret1, Meta),
Ret2.
bump_applied_command_count(Ret, Meta).

-spec bump_applied_command_count(ApplyRet, Meta) ->
{State, Result, SideEffects} when
Expand Down Expand Up @@ -1532,34 +1556,6 @@ reset_applied_command_count(State) ->
Metrics1 = maps:remove(applied_command_count, Metrics),
set_metrics(State, Metrics1).

-spec drop_expired_dedups(ApplyRet, Meta) ->
{State, Result, SideEffects} when
ApplyRet :: {State, Result, SideEffects},
State :: state(),
Result :: any(),
Meta :: ra_machine:command_meta_data(),
SideEffects :: ra_machine:effects().
%% @private

drop_expired_dedups(
{State, Result, SideEffects},
#{system_time := Timestamp}) ->
Dedups = get_dedups(State),
%% This would look cleaner written with `maps:filter/2' but it turns out
%% that function is very inefficient.
%% TODO: explain why.
Dedups1 = maps:fold(
fun(CommandRef, {_Reply, Expiry}, Acc) ->
case Expiry >= Timestamp of
true ->
maps:remove(CommandRef, Acc);
false ->
Acc
end
end, Dedups, Dedups),
State1 = set_dedups(State, Dedups1),
{State1, Result, SideEffects}.

%% @private

state_enter(leader, State) ->
Expand Down
2 changes: 2 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

-record(dedup_ack, {ref :: reference()}).

-record(expire_dedups, {}).

%% Old commands, kept for backward-compatibility.

-record(unregister_projection, {name :: khepri_projection:name()}).
20 changes: 20 additions & 0 deletions src/khepri_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
-export([start_timeout_window/1,
end_timeout_window/2,
sleep/2,
maps_any/2,
is_ra_server_alive/1,

node_props_to_payload/2,
Expand Down Expand Up @@ -85,6 +86,25 @@ sleep(Time, Timeout) when Time > Timeout ->
timer:sleep(Timeout),
0.

-spec maps_any(Predicate, Map) -> boolean() when
Predicate :: fun((Key, Value) -> boolean()),
Map :: #{Key => Value},
Key :: term(),
Value :: term().

maps_any(Fun, Map) when is_map(Map) ->
do_maps_any(Fun, maps:next(maps:iterator(Map))).

do_maps_any(Fun, {Key, Value, Iterator}) ->
case Fun(Key, Value) of
true ->
true;
false ->
do_maps_any(Fun, maps:next(Iterator))
end;
do_maps_any(_Fun, none) ->
false.

-spec is_ra_server_alive(RaServer) -> IsAlive when
RaServer :: ra:server_id(),
IsAlive :: boolean().
Expand Down

0 comments on commit 655edf4

Please sign in to comment.