From 655edf44f5a73428857bf704b0ee0127510e7e1d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 19 Sep 2024 13:27:38 -0400 Subject: [PATCH] khepri_machine: Drop expired dedups on tick --- src/khepri_machine.erl | 58 ++++++++++++++++++++---------------------- src/khepri_machine.hrl | 2 ++ src/khepri_utils.erl | 20 +++++++++++++++ 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index a61392dd..d6743ec2 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -1260,6 +1260,19 @@ handle_aux( Tree = get_tree(State), ok = restore_projection(Projection, Tree, PathPattern), {no_reply, AuxState, IntState}; +handle_aux(_RaState, cast, tick, AuxState, IntState) -> + State = ra_aux:machine_state(IntState), + Ts = erlang:system_time(millisecond), + Dedups = get_dedups(State), + CanExpire = fun(_CommandRef, {_Reply, Expiry}) -> Ts >= Expiry end, + Effs = case khepri_utils:maps_any(CanExpire, Dedups) of + true -> + ExpireDedups = #expire_dedups{}, + [{append, ExpireDedups}]; + false -> + [] + end, + {no_reply, AuxState, IntState, Effs}; handle_aux(_RaState, _Type, _Command, AuxState, IntState) -> {no_reply, AuxState, IntState}. @@ -1458,6 +1471,19 @@ apply( end, Ret = {State1, ok}, post_apply(Ret, Meta); +apply( + #{machine_version := MacVer, + system_time := Timestamp} = Meta, + #expire_dedups{}, + State) when MacVer >= 1 -> + Dedups = get_dedups(State), + Dedups1 = maps:filter( + fun(_CommandRef, {_Reply, Expiry}) -> + Expiry > Timestamp + end, Dedups), + State1 = set_dedups(State, Dedups1), + Ret = {State1, ok}, + post_apply(Ret, Meta); apply(Meta, {machine_version, OldMacVer, NewMacVer}, OldState) -> NewState = convert_state(OldState, OldMacVer, NewMacVer), Ret = {NewState, ok}, @@ -1490,9 +1516,7 @@ apply(#{machine_version := MacVer} = Meta, UnknownCommand, State) -> post_apply({State, Result}, Meta) -> post_apply({State, Result, []}, Meta); post_apply({_State, _Result, _SideEffects} = Ret, Meta) -> - Ret1 = bump_applied_command_count(Ret, Meta), - Ret2 = drop_expired_dedups(Ret1, Meta), - Ret2. + bump_applied_command_count(Ret, Meta). -spec bump_applied_command_count(ApplyRet, Meta) -> {State, Result, SideEffects} when @@ -1532,34 +1556,6 @@ reset_applied_command_count(State) -> Metrics1 = maps:remove(applied_command_count, Metrics), set_metrics(State, Metrics1). --spec drop_expired_dedups(ApplyRet, Meta) -> - {State, Result, SideEffects} when - ApplyRet :: {State, Result, SideEffects}, - State :: state(), - Result :: any(), - Meta :: ra_machine:command_meta_data(), - SideEffects :: ra_machine:effects(). -%% @private - -drop_expired_dedups( - {State, Result, SideEffects}, - #{system_time := Timestamp}) -> - Dedups = get_dedups(State), - %% This would look cleaner written with `maps:filter/2' but it turns out - %% that function is very inefficient. - %% TODO: explain why. - Dedups1 = maps:fold( - fun(CommandRef, {_Reply, Expiry}, Acc) -> - case Expiry >= Timestamp of - true -> - maps:remove(CommandRef, Acc); - false -> - Acc - end - end, Dedups, Dedups), - State1 = set_dedups(State, Dedups1), - {State1, Result, SideEffects}. - %% @private state_enter(leader, State) -> diff --git a/src/khepri_machine.hrl b/src/khepri_machine.hrl index 24765001..b32971f1 100644 --- a/src/khepri_machine.hrl +++ b/src/khepri_machine.hrl @@ -64,6 +64,8 @@ -record(dedup_ack, {ref :: reference()}). +-record(expire_dedups, {}). + %% Old commands, kept for backward-compatibility. -record(unregister_projection, {name :: khepri_projection:name()}). diff --git a/src/khepri_utils.erl b/src/khepri_utils.erl index 659f19d2..969d3932 100644 --- a/src/khepri_utils.erl +++ b/src/khepri_utils.erl @@ -20,6 +20,7 @@ -export([start_timeout_window/1, end_timeout_window/2, sleep/2, + maps_any/2, is_ra_server_alive/1, node_props_to_payload/2, @@ -85,6 +86,25 @@ sleep(Time, Timeout) when Time > Timeout -> timer:sleep(Timeout), 0. +-spec maps_any(Predicate, Map) -> boolean() when + Predicate :: fun((Key, Value) -> boolean()), + Map :: #{Key => Value}, + Key :: term(), + Value :: term(). + +maps_any(Fun, Map) when is_map(Map) -> + do_maps_any(Fun, maps:next(maps:iterator(Map))). + +do_maps_any(Fun, {Key, Value, Iterator}) -> + case Fun(Key, Value) of + true -> + true; + false -> + do_maps_any(Fun, maps:next(Iterator)) + end; +do_maps_any(_Fun, none) -> + false. + -spec is_ra_server_alive(RaServer) -> IsAlive when RaServer :: ra:server_id(), IsAlive :: boolean().