Skip to content

Commit

Permalink
Replace keep-while cond reverse index with a prefix tree
Browse files Browse the repository at this point in the history
When deleting tree nodes in the `khepri_tree` we lookup in the reverse
index to find any conditions that are associated to paths which are
prefixes of the deleted path.

Prior to this commit we folded over the keep-while conditions reverse
index - a map - and used `lists:prefix/2` to find prefixing paths. In a
store with many nodes and tracking many keep-while conditions this can
become very expensive while deleting many nodes at once.

The parent commit introduced a prefix tree type which allows quick
lookup, given a path, of any tree nodes associated with a path which is
a prefix of the given path. We set a version 2 for khepri_machine which
upgrades the reverse index use to this new type.

Because the reverse index is private to the khepri_tree type I have
avoided introducing versioning for the `khepri_tree` module. Instead
the reverse index can either be a map - as in prior versions - or a
prefix tree. We act on the reverse index using the appropriate functions
for the type, which we detect at runtime.
  • Loading branch information
the-mikedavis committed Sep 24, 2024
1 parent 49237f5 commit 19e5afe
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 10 deletions.
35 changes: 28 additions & 7 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
%% </ul>
%% </td>
%% </tr>
%% <tr>
%% <td style="text-align: right; vertical-align: top;">2</td>
%% <td>
%% <ul>
%% <li>Changed the data structure for the reverse index used to track
%% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}).
%% </li>
%% </ul>
%% </td>
%% </tr>
%% </table>

-module(khepri_machine).
Expand Down Expand Up @@ -168,7 +178,12 @@
-opaque state_v1() :: #khepri_machine{}.
%% State of this Ra state machine, version 1.

-type state() :: state_v1() | khepri_machine_v0:state().
-opaque state_v2() :: #khepri_machine{}.
%% State of this Ra state machine, version 2.

-type state() :: state_v2() |
state_v1() |
khepri_machine_v0:state().
%% State of this Ra state machine.

-type triggers_map() :: #{khepri:trigger_id() =>
Expand Down Expand Up @@ -212,6 +227,7 @@
machine_init_args/0,
state/0,
state_v1/0,
state_v2/0,
machine_config/0,
triggers_map/0,
metrics/0,
Expand Down Expand Up @@ -1635,17 +1651,18 @@ overview(State) ->
keep_while_conds => KeepWhileConds}.

-spec version() -> MacVer when
MacVer :: 1.
MacVer :: 2.
%% @doc Returns the state machine version.

version() ->
1.
2.

-spec which_module(MacVer) -> Module when
MacVer :: 1 | 0,
MacVer :: 2 | 1 | 0,
Module :: ?MODULE.
%% @doc Returns the state machine module corresponding to the given version.

which_module(2) -> ?MODULE;
which_module(1) -> ?MODULE;
which_module(0) -> ?MODULE.

Expand Down Expand Up @@ -2313,7 +2330,7 @@ make_virgin_state(Params) ->
-endif.

-spec convert_state(OldState, OldMacVer, NewMacVer) -> NewState when
OldState :: khepri_machine_v0:state(),
OldState :: khepri_machine:state(),
OldMacVer :: ra_machine:version(),
NewMacVer :: ra_machine:version(),
NewState :: khepri_machine:state().
Expand All @@ -2327,7 +2344,7 @@ convert_state(State, OldMacVer, NewMacVer) ->
OldMacVer1 = N,
NewMacVer1 = erlang:min(N + 1, NewMacVer),
convert_state1(State1, OldMacVer1, NewMacVer1)
end, State, lists:seq(OldMacVer, NewMacVer).
end, State, lists:seq(OldMacVer, NewMacVer)).

convert_state1(State, MacVer, MacVer) ->
State;
Expand All @@ -2339,7 +2356,11 @@ convert_state1(State, 0, 1) ->
Fields1 = Fields0 ++ [#{}],
State1 = list_to_tuple(Fields1),
?assert(is_state(State1)),
State1.
State1;
convert_state1(State, 1, 2) ->
Tree = get_tree(State),
Tree1 = khepri_tree:convert_tree(Tree, 1, 2),
set_tree(State, Tree1).

-spec update_projections(OldState, NewState) -> ok when
OldState :: khepri_machine:state(),
Expand Down
106 changes: 103 additions & 3 deletions src/khepri_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
delete_matching_nodes/4,
insert_or_update_node/5,
does_path_match/3,
walk_down_the_tree/5]).
walk_down_the_tree/5,

convert_tree/3]).

-type tree_node() :: #node{}.
%% A node in the tree structure.
Expand All @@ -40,8 +42,14 @@
khepri_condition:native_keep_while()}.
%% Per-node `keep_while' conditions.

-type keep_while_conds_revidx() :: #{khepri_path:native_path() =>
#{khepri_path:native_path() => ok}}.
-type keep_while_conds_revidx_v0() :: #{khepri_path:native_path() =>
#{khepri_path:native_path() => ok}}.

-type keep_while_conds_revidx_v1() :: khepri_prefix_tree:tree(
#{khepri_path:native_path() => ok}).

-type keep_while_conds_revidx() :: keep_while_conds_revidx_v0() |
keep_while_conds_revidx_v1().
%% Internal reverse index of the keep_while conditions. If node A depends on a
%% condition on node B, then this reverse index will have a "node B => node A"
%% entry.
Expand Down Expand Up @@ -306,6 +314,19 @@ update_keep_while_conds(Tree, Watcher, KeepWhile) ->
KeepWhile :: khepri_condition:native_keep_while().

update_keep_while_conds_revidx(
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
Watcher, KeepWhile) ->
case is_v1_keep_while_conds_revidx(KeepWhileCondsRevIdx) of
true ->
update_keep_while_conds_revidx_v1(Tree, Watcher, KeepWhile);
false ->
update_keep_while_conds_revidx_v0(Tree, Watcher, KeepWhile)
end.

is_v1_keep_while_conds_revidx(KeepWhileCondsRevIdx) ->
khepri_prefix_tree:is_prefix_tree(KeepWhileCondsRevIdx).

update_keep_while_conds_revidx_v0(
#tree{keep_while_conds = KeepWhileConds,
keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
Watcher, KeepWhile) ->
Expand All @@ -330,6 +351,37 @@ update_keep_while_conds_revidx(
end, KeepWhileCondsRevIdx1, KeepWhile),
Tree#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx2}.

update_keep_while_conds_revidx_v1(
#tree{keep_while_conds = KeepWhileConds,
keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
Watcher, KeepWhile) ->
%% First, clean up reversed index where a watched path isn't watched
%% anymore in the new keep_while.
OldWatcheds = maps:get(Watcher, KeepWhileConds, #{}),
KeepWhileCondsRevIdx1 = maps:fold(
fun(Watched, _, KWRevIdx) ->
khepri_prefix_tree:update(
fun(Watchers) ->
Watchers1 = maps:remove(
Watcher, Watchers),
case maps:size(Watchers1) of
0 -> ?NO_PAYLOAD;
_ -> Watchers1
end
end, Watched, KWRevIdx)
end, KeepWhileCondsRevIdx, OldWatcheds),
%% Then, record the watched paths.
KeepWhileCondsRevIdx2 = maps:fold(
fun(Watched, _, KWRevIdx) ->
khepri_prefix_tree:update(
fun (?NO_PAYLOAD) ->
#{Watcher => ok};
(Watchers) ->
Watchers#{Watcher => ok}
end, Watched, KWRevIdx)
end, KeepWhileCondsRevIdx1, KeepWhile),
Tree#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx2}.

%% -------------------------------------------------------------------
%% Find matching nodes.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1286,6 +1338,16 @@ eval_keep_while_conditions(
%%
%% Those modified in AppliedChanges must be evaluated again to decide
%% if they should be removed.
case is_v1_keep_while_conds_revidx(KeepWhileCondsRevIdx) of
true ->
eval_keep_while_conditions_v1(Tree, AppliedChanges);
false ->
eval_keep_while_conditions_v0(Tree, AppliedChanges)
end.

eval_keep_while_conditions_v0(
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
AppliedChanges) ->
maps:fold(
fun
(RemovedPath, delete, ToDelete) ->
Expand All @@ -1309,6 +1371,27 @@ eval_keep_while_conditions(
end
end, #{}, AppliedChanges).

eval_keep_while_conditions_v1(
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
AppliedChanges) ->
maps:fold(
fun
(RemovedPath, delete, ToDelete) ->
khepri_prefix_tree:fold_prefixes_of(
fun(Watchers, ToDelete1) ->
eval_keep_while_conditions_after_removal(
Tree, Watchers, ToDelete1)
end, ToDelete, RemovedPath, KeepWhileCondsRevIdx);
(UpdatedPath, NodeProps, ToDelete) ->
case khepri_prefix_tree:find_path(UpdatedPath, KeepWhileCondsRevIdx) of
{ok, Watchers} ->
eval_keep_while_conditions_after_update(
Tree, UpdatedPath, NodeProps, Watchers, ToDelete);
error ->
ToDelete
end
end, #{}, AppliedChanges).

eval_keep_while_conditions_after_update(
#tree{keep_while_conds = KeepWhileConds} = Tree,
UpdatedPath, NodeProps, Watchers, ToDelete) ->
Expand Down Expand Up @@ -1391,3 +1474,20 @@ remove_expired_nodes(
applied_changes = AppliedChanges2},
remove_expired_nodes(Rest, Walk1)
end.

%% -------------------------------------------------------------------
%% Conversion between tree versions.
%% -------------------------------------------------------------------

convert_tree(Tree, MacVer, MacVer) ->
Tree;
convert_tree(Tree, 0, 1) ->
Tree;
convert_tree(Tree, 1, 2) ->
%% In version 2 the reverse index for keep while conditions was converted
%% into a prefix tree. See the `keep_while_conds_revidx_v0()' and
%% `keep_while_conds_revidx_v1()` types.
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdxV0} = Tree,
KeepWhileCondsRevIdxV1 = khepri_prefix_tree:from_map(
KeepWhileCondsRevIdxV0),
Tree#tree{keep_while_conds_revidx = KeepWhileCondsRevIdxV1}.

0 comments on commit 19e5afe

Please sign in to comment.