Skip to content

Commit

Permalink
Replace keep-while cond reverse index with a prefix tree
Browse files Browse the repository at this point in the history
When deleting tree nodes in the `khepri_tree` we lookup in the reverse
index to find any conditions that are associated to paths which are
prefixes of the deleted path.

Prior to this commit we folded over the keep-while conditions reverse
index - a map - and used `lists:prefix/2` to find prefixing paths. In a
store with many nodes and tracking many keep-while conditions this can
become very expensive while deleting many nodes at once.

The parent commit introduced a prefix tree type which allows quick
lookup, given a path, of any tree nodes associated with a path which is
a prefix of the given path. We set a version 2 for khepri_machine which
upgrades the reverse index use to this new type.

Because the reverse index is private to the khepri_tree type I have
avoided introducing versioning for the `khepri_tree` module. Instead
the reverse index can either be a map - as in prior versions - or a
prefix tree. We act on the reverse index using the appropriate functions
for the type, which we detect at runtime.
  • Loading branch information
the-mikedavis committed Sep 30, 2024
1 parent 13d349b commit 1e8b3b9
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 13 deletions.
33 changes: 26 additions & 7 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
%% </ul>
%% </td>
%% </tr>
%% <tr>
%% <td style="text-align: right; vertical-align: top;">2</td>
%% <td>
%% <ul>
%% <li>Changed the data structure for the reverse index used to track
%% keep-while conditions to be a prefix tree (see {@link khepri_prefix_tree}).
%% </li>
%% </ul>
%% </td>
%% </tr>
%% </table>

-module(khepri_machine).
Expand Down Expand Up @@ -165,10 +175,13 @@
%% Added in machine version 1.
dedups = #{} :: khepri_machine:dedups_map()}).

-opaque state_v1() :: #khepri_machine{}.
-type state_v1() :: #khepri_machine{tree :: khepri_tree:tree_v0()}.
%% State of this Ra state machine, version 1.

-type state() :: state_v1() | khepri_machine_v0:state().
-type state_v2() :: #khepri_machine{tree :: khepri_tree:tree_v1()}.
%% State of this Ra state machine, version 2.

-type state() :: state_v2() | state_v1() | khepri_machine_v0:state().
%% State of this Ra state machine.

-type triggers_map() :: #{khepri:trigger_id() =>
Expand Down Expand Up @@ -212,6 +225,7 @@
machine_init_args/0,
state/0,
state_v1/0,
state_v2/0,
machine_config/0,
triggers_map/0,
metrics/0,
Expand Down Expand Up @@ -1635,17 +1649,18 @@ overview(State) ->
keep_while_conds => KeepWhileConds}.

-spec version() -> MacVer when
MacVer :: 1.
MacVer :: 2.
%% @doc Returns the state machine version.

version() ->
1.
2.

-spec which_module(MacVer) -> Module when
MacVer :: 1 | 0,
MacVer :: 0..2,
Module :: ?MODULE.
%% @doc Returns the state machine module corresponding to the given version.

which_module(2) -> ?MODULE;
which_module(1) -> ?MODULE;
which_module(0) -> ?MODULE.

Expand Down Expand Up @@ -2313,7 +2328,7 @@ make_virgin_state(Params) ->
-endif.

-spec convert_state(OldState, OldMacVer, NewMacVer) -> NewState when
OldState :: khepri_machine_v0:state(),
OldState :: khepri_machine:state(),
OldMacVer :: ra_machine:version(),
NewMacVer :: ra_machine:version(),
NewState :: khepri_machine:state().
Expand All @@ -2339,7 +2354,11 @@ convert_state1(State, 0, 1) ->
Fields1 = Fields0 ++ [#{}],
State1 = list_to_tuple(Fields1),
?assert(is_state(State1)),
State1.
State1;
convert_state1(State, 1, 2) ->
Tree = get_tree(State),
Tree1 = khepri_tree:convert_tree(Tree, 1, 2),
set_tree(State, Tree1).

-spec update_projections(OldState, NewState) -> ok when
OldState :: khepri_machine:state(),
Expand Down
119 changes: 115 additions & 4 deletions src/khepri_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,32 @@
delete_matching_nodes/4,
insert_or_update_node/5,
does_path_match/3,
walk_down_the_tree/5]).
walk_down_the_tree/5,

convert_tree/3]).

-type tree_node() :: #node{}.
%% A node in the tree structure.

-type tree() :: #tree{}.
-type tree_v0() :: #tree{keep_while_conds_revidx ::
khepri_tree:keep_while_conds_revidx_v0()}.
-type tree_v1() :: #tree{keep_while_conds_revidx ::
khepri_tree:keep_while_conds_revidx_v1()}.

-type tree() :: tree_v0() | tree_v1().

-type keep_while_conds_map() :: #{khepri_path:native_path() =>
khepri_condition:native_keep_while()}.
%% Per-node `keep_while' conditions.

-type keep_while_conds_revidx() :: #{khepri_path:native_path() =>
#{khepri_path:native_path() => ok}}.
-type keep_while_conds_revidx_v0() :: #{khepri_path:native_path() =>
#{khepri_path:native_path() => ok}}.

-type keep_while_conds_revidx_v1() :: khepri_prefix_tree:tree(
#{khepri_path:native_path() => ok}).

-opaque keep_while_conds_revidx() :: keep_while_conds_revidx_v0() |
keep_while_conds_revidx_v1().
%% Internal reverse index of the keep_while conditions. If node A depends on a
%% condition on node B, then this reverse index will have a "node B => node A"
%% entry.
Expand All @@ -67,8 +80,12 @@
-type ok(Type1, Type2, Type3) :: {ok, Type1, Type2, Type3}.

-export_type([tree_node/0,
tree_v0/0,
tree_v1/0,
tree/0,
keep_while_conds_map/0,
keep_while_conds_revidx_v0/0,
keep_while_conds_revidx_v1/0,
keep_while_conds_revidx/0,
applied_changes/0]).

Expand Down Expand Up @@ -314,6 +331,19 @@ update_keep_while_conds(Tree, Watcher, KeepWhile) ->
KeepWhile :: khepri_condition:native_keep_while().

update_keep_while_conds_revidx(
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
Watcher, KeepWhile) ->
case is_v1_keep_while_conds_revidx(KeepWhileCondsRevIdx) of
true ->
update_keep_while_conds_revidx_v1(Tree, Watcher, KeepWhile);
false ->
update_keep_while_conds_revidx_v0(Tree, Watcher, KeepWhile)
end.

is_v1_keep_while_conds_revidx(KeepWhileCondsRevIdx) ->
khepri_prefix_tree:is_prefix_tree(KeepWhileCondsRevIdx).

update_keep_while_conds_revidx_v0(
#tree{keep_while_conds = KeepWhileConds,
keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
Watcher, KeepWhile) ->
Expand All @@ -338,6 +368,37 @@ update_keep_while_conds_revidx(
end, KeepWhileCondsRevIdx1, KeepWhile),
Tree#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx2}.

update_keep_while_conds_revidx_v1(
#tree{keep_while_conds = KeepWhileConds,
keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
Watcher, KeepWhile) ->
%% First, clean up reversed index where a watched path isn't watched
%% anymore in the new keep_while.
OldWatcheds = maps:get(Watcher, KeepWhileConds, #{}),
KeepWhileCondsRevIdx1 = maps:fold(
fun(Watched, _, KWRevIdx) ->
khepri_prefix_tree:update(
fun(Watchers) ->
Watchers1 = maps:remove(
Watcher, Watchers),
case maps:size(Watchers1) of
0 -> ?NO_PAYLOAD;
_ -> Watchers1
end
end, Watched, KWRevIdx)
end, KeepWhileCondsRevIdx, OldWatcheds),
%% Then, record the watched paths.
KeepWhileCondsRevIdx2 = maps:fold(
fun(Watched, _, KWRevIdx) ->
khepri_prefix_tree:update(
fun (?NO_PAYLOAD) ->
#{Watcher => ok};
(Watchers) ->
Watchers#{Watcher => ok}
end, Watched, KWRevIdx)
end, KeepWhileCondsRevIdx1, KeepWhile),
Tree#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx2}.

%% -------------------------------------------------------------------
%% Find matching nodes.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1294,6 +1355,16 @@ eval_keep_while_conditions(
%%
%% Those modified in AppliedChanges must be evaluated again to decide
%% if they should be removed.
case is_v1_keep_while_conds_revidx(KeepWhileCondsRevIdx) of
true ->
eval_keep_while_conditions_v1(Tree, AppliedChanges);
false ->
eval_keep_while_conditions_v0(Tree, AppliedChanges)
end.

eval_keep_while_conditions_v0(
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
AppliedChanges) ->
maps:fold(
fun
(RemovedPath, delete, ToDelete) ->
Expand All @@ -1317,6 +1388,29 @@ eval_keep_while_conditions(
end
end, #{}, AppliedChanges).

eval_keep_while_conditions_v1(
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdx} = Tree,
AppliedChanges) ->
maps:fold(
fun
(RemovedPath, delete, ToDelete) ->
khepri_prefix_tree:fold_prefixes_of(
fun(Watchers, ToDelete1) ->
eval_keep_while_conditions_after_removal(
Tree, Watchers, ToDelete1)
end, ToDelete, RemovedPath, KeepWhileCondsRevIdx);
(UpdatedPath, NodeProps, ToDelete) ->
Result = khepri_prefix_tree:find_path(
UpdatedPath, KeepWhileCondsRevIdx),
case Result of
{ok, Watchers} ->
eval_keep_while_conditions_after_update(
Tree, UpdatedPath, NodeProps, Watchers, ToDelete);
error ->
ToDelete
end
end, #{}, AppliedChanges).

eval_keep_while_conditions_after_update(
#tree{keep_while_conds = KeepWhileConds} = Tree,
UpdatedPath, NodeProps, Watchers, ToDelete) ->
Expand Down Expand Up @@ -1399,3 +1493,20 @@ remove_expired_nodes(
applied_changes = AppliedChanges2},
remove_expired_nodes(Rest, Walk1)
end.

%% -------------------------------------------------------------------
%% Conversion between tree versions.
%% -------------------------------------------------------------------

convert_tree(Tree, MacVer, MacVer) ->
Tree;
convert_tree(Tree, 0, 1) ->
Tree;
convert_tree(Tree, 1, 2) ->
%% In version 2 the reverse index for keep while conditions was converted
%% into a prefix tree. See the `keep_while_conds_revidx_v0()' and
%% `keep_while_conds_revidx_v1()` types.
#tree{keep_while_conds_revidx = KeepWhileCondsRevIdxV0} = Tree,
KeepWhileCondsRevIdxV1 = khepri_prefix_tree:from_map(
KeepWhileCondsRevIdxV0),
Tree#tree{keep_while_conds_revidx = KeepWhileCondsRevIdxV1}.
3 changes: 1 addition & 2 deletions src/khepri_tree.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@

-record(tree, {root = #node{} :: khepri_tree:tree_node(),
keep_while_conds = #{} :: khepri_tree:keep_while_conds_map(),
keep_while_conds_revidx = #{} ::
khepri_tree:keep_while_conds_revidx()}).
keep_while_conds_revidx = #{}}).

0 comments on commit 1e8b3b9

Please sign in to comment.