Skip to content

Commit

Permalink
khepri_tree: Add an option to accumulate keep-while expirations
Browse files Browse the repository at this point in the history
This change adds a type `khepri:delete_options()` similar to
`khepri:put_options()` which includes a new option
`return_keep_while_expirations`. Setting this option in a call to
`khepri_adv:delete_many/3` or `khepri_tx_adv:delete_many/2` will include
any tree nodes deleted because of expired keep-while conditions in the
`khepri_adv:many_results()` return value.

In order to accumulate these deletions we need to modify
`khepri_machine:remove_expired_nodes/2` slightly to pass the fold
function and accumulator for the current `#walk{..}`.

This option is useful for callers if they need to do something with any
tree nodes which were deleted by keep-while conditions. For example in
RabbitMQ we currently delete all bindings which are associated with a
queue in a transaction. Bindings should always be removed when their
associated queue is removed so they seem like an ideal case to use a
keep-while condition. We send notifications and invoke callbacks with
the set of bindings which were deleted though, so we need to return
these deleted records. With this change we can avoid performing binding
deletion in a transaction and instead only delete the queue record while
providing the new option. This is significantly more efficient because
of the way the Khepri store is organized in RabbitMQ.
  • Loading branch information
the-mikedavis committed Oct 7, 2024
1 parent a69d642 commit 0d3b3f5
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 22 deletions.
19 changes: 17 additions & 2 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,16 @@
%% created/updated tree node.</li>
%% </ul>

-type delete_options() :: #{return_keep_while_expirations => boolean()}.
%% Options specific to deletions.
%%
%% <ul>
%% <li>`return_keep_while_expirations' allows accumulating all Khepri tree
%% nodes deleted because of expired keep-while conditions in the return of
%% {@link khepri_adv:delete_many/3} and {@link khepri_tx_adv:delete_many/2}.
%% </li>
%% </ul>

-type fold_fun() :: fun((khepri_path:native_path(),
khepri:node_props(),
khepri:fold_acc()) -> khepri:fold_acc()).
Expand Down Expand Up @@ -472,6 +482,7 @@
query_options/0,
tree_options/0,
put_options/0,
delete_options/0,

fold_fun/0,
fold_acc/0,
Expand Down Expand Up @@ -2575,7 +2586,9 @@ delete_many(PathPattern) ->
Ret :: khepri:minimal_ret();
(PathPattern, Options) -> Ret when
PathPattern :: khepri_path:pattern(),
Options :: khepri:command_options() | khepri:tree_options(),
Options :: khepri:command_options() |
khepri:tree_options() |
khepri:delete_options(),
Ret :: khepri:minimal_ret().

%% @doc Deletes all tree nodes matching the given path pattern.
Expand All @@ -2600,7 +2613,9 @@ delete_many(PathPattern, Options) when is_map(Options) ->
-spec delete_many(StoreId, PathPattern, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Options :: khepri:command_options() | khepri:tree_options(),
Options :: khepri:command_options() |
khepri:tree_options() |
khepri:delete_options(),
Ret :: khepri:minimal_ret() | khepri_machine:async_ret().
%% @doc Deletes all tree nodes matching the given path pattern.
%%
Expand Down
11 changes: 9 additions & 2 deletions src/khepri_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,9 @@ delete_many(PathPattern) ->
Ret :: khepri_adv:many_results();
(PathPattern, Options) -> Ret when
PathPattern :: khepri_path:pattern(),
Options :: khepri:command_options() | khepri:tree_options(),
Options :: khepri:command_options() |
khepri:tree_options() |
khepri:delete_options(),
Ret :: khepri_adv:many_results().

%% @doc Deletes all tree nodes matching the given path pattern.
Expand All @@ -942,7 +944,9 @@ delete_many(PathPattern, Options) when is_map(Options) ->
-spec delete_many(StoreId, PathPattern, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Options :: khepri:command_options() | khepri:tree_options(),
Options :: khepri:command_options() |
khepri:tree_options() |
khepri:delete_options(),
Ret :: khepri_adv:many_results() | khepri_machine:async_ret().
%% @doc Deletes all tree nodes matching the given path pattern.
%%
Expand All @@ -958,6 +962,9 @@ delete_many(PathPattern, Options) when is_map(Options) ->
%% When doing an asynchronous update, the {@link handle_async_ret/1}
%% function should be used to handle the message received from Ra.
%%
%% The `return_keep_while_expirations' option may be used with this command to
%% return all tree nodes which were removed by expired keep-while conditions.
%%
%% Example:
%% ```
%% %% Delete the tree node at `/:foo/:bar'.
Expand Down
33 changes: 19 additions & 14 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ put(_StoreId, PathPattern, Payload, _Options) ->
-spec delete(StoreId, PathPattern, Options) -> Ret when
StoreId :: khepri:store_id(),
PathPattern :: khepri_path:pattern(),
Options :: khepri:command_options() | khepri:tree_options(),
Options :: khepri:command_options() |
khepri:tree_options() |
khepri:delete_options(),
Ret :: khepri_machine:common_ret() | khepri_machine:async_ret().
%% @doc Deletes all tree nodes matching the path pattern.
%%
Expand Down Expand Up @@ -746,37 +748,40 @@ split_query_options(Options) ->
end, {#{}, #{}}, Options1).

-spec split_command_options(Options) ->
{CommandOptions, TreeAndPutOptions} when
Options :: CommandOptions | TreeAndPutOptions,
{CommandOptions, OtherOptions} when
Options :: CommandOptions | OtherOptions,
CommandOptions :: khepri:command_options(),
TreeAndPutOptions :: khepri:tree_options() | khepri:put_options().
OtherOptions :: khepri:tree_options() |
khepri:put_options() |
khepri:delete_options().
%% @private

split_command_options(Options) ->
Options1 = set_default_options(Options),
maps:fold(
fun
(Option, Value, {C, TP}) when
(Option, Value, {C, O}) when
Option =:= reply_from orelse
Option =:= timeout orelse
Option =:= async ->
C1 = C#{Option => Value},
{C1, TP};
{C1, O};
(props_to_return, [], Acc) ->
Acc;
(Option, Value, {C, TP}) when
(Option, Value, {C, O}) when
Option =:= expect_specific_node orelse
Option =:= props_to_return orelse
Option =:= include_root_props ->
TP1 = TP#{Option => Value},
{C, TP1};
(keep_while, KeepWhile, {C, TP}) ->
%% `keep_while' is kept in `TreeAndPutOptions' here. The state
Option =:= include_root_props orelse
Option =:= return_keep_while_expirations ->
O1 = O#{Option => Value},
{C, O1};
(keep_while, KeepWhile, {C, O}) ->
%% `keep_while' is kept in `OtherOptions' here. The state
%% machine will extract it in `apply()'.
KeepWhile1 = khepri_condition:ensure_native_keep_while(
KeepWhile),
TP1 = TP#{keep_while => KeepWhile1},
{C, TP1}
O1 = O#{keep_while => KeepWhile1},
{C, O1}
end, {#{}, #{}}, Options1).

-spec split_put_options(TreeAndPutOptions) -> {TreeOptions, PutOptions} when
Expand Down
27 changes: 25 additions & 2 deletions src/khepri_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ does_path_match(
{tree :: #tree{},
node :: #node{} | delete,
path_pattern :: khepri_path:native_pattern(),
tree_options :: khepri:tree_options(),
tree_options :: khepri:tree_options() | khepri:delete_options(),
%% Used to remember the path of the node the walk is currently on.
reversed_path = [] :: khepri_path:native_path(),
%% Used to update parents up in the tree in a tail-recursive function.
Expand Down Expand Up @@ -768,7 +768,7 @@ walk_down_the_tree(Tree, PathPattern, TreeOptions, Fun, FunAcc) ->
AppliedChanges, Fun, FunAcc) -> Ret when
Tree :: tree(),
PathPattern :: khepri_path:native_pattern(),
TreeOptions :: khepri:tree_options(),
TreeOptions :: khepri:tree_options() | khepri:delete_options(),
AppliedChanges :: applied_changes(),
Fun :: walk_down_the_tree_fun(),
FunAcc :: any(),
Expand Down Expand Up @@ -1517,6 +1517,29 @@ is_parent_being_removed1([], _) ->

remove_expired_nodes([], Walk) ->
{ok, Walk};
remove_expired_nodes(
[PathToDelete | Rest],
#walk{tree = Tree,
applied_changes = AppliedChanges,
'fun' = Fun,
fun_acc = Acc,
tree_options =
#{return_keep_while_expirations := true} = TreeOptions} = Walk) ->
%% Note that this is essentially the same as the function clause below
%% where we call `delete_matching_nodes/4'. The only difference is that
%% we pass down the same tree options, walk function and accumulator.
Result = walk_down_the_tree(
Tree, PathToDelete, TreeOptions, AppliedChanges, Fun, Acc),
case Result of
{ok, Tree1, AppliedChanges1, Acc1} ->
AppliedChanges2 = merge_applied_changes(
AppliedChanges, AppliedChanges1),
Walk1 = Walk#walk{tree = Tree1,
node = Tree1#tree.root,
applied_changes = AppliedChanges2,
fun_acc = Acc1},
remove_expired_nodes(Rest, Walk1)
end;
remove_expired_nodes(
[PathToDelete | Rest],
#walk{tree = Tree, applied_changes = AppliedChanges} = Walk) ->
Expand Down
2 changes: 1 addition & 1 deletion src/khepri_tx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ delete_many(PathPattern) ->

-spec delete_many(PathPattern, Options) -> Ret when
PathPattern :: khepri_path:pattern(),
Options :: khepri:tree_options(),
Options :: khepri:tree_options() | khepri:delete_options(),
Ret :: khepri:minimal_ret().
%% @doc Deletes all tree nodes matching the given path pattern.
%%
Expand Down
2 changes: 1 addition & 1 deletion src/khepri_tx_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ delete_many(PathPattern) ->

-spec delete_many(PathPattern, Options) -> Ret when
PathPattern :: khepri_path:pattern(),
Options :: khepri:tree_options(),
Options :: khepri:tree_options() | khepri:delete_options(),
Ret :: khepri_adv:many_results().
%% @doc Deletes all tree nodes matching the given path pattern.
%%
Expand Down
40 changes: 40 additions & 0 deletions test/advanced_delete.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,46 @@ delete_many_on_existing_node_with_condition_false_test_() ->
payload_version => 1}},
khepri_adv:get(?FUNCTION_NAME, [foo]))]}.

delete_many_return_keep_while_expirations_test_() ->
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
[?_assertEqual(
ok,
khepri:create(?FUNCTION_NAME, [a, b, c], val1)),
?_assertEqual(
ok,
khepri:create(
?FUNCTION_NAME, [d, e], val2,
#{keep_while => #{[a, b, c] => #if_node_exists{exists = true}}})),
?_assertEqual(
ok,
khepri:create(
?FUNCTION_NAME, [f, g], val3,
#{keep_while => #{[d, e] => #if_node_exists{exists = true}}})),
?_assertMatch(
{ok, #{[a, b, c] := #{data := val1},
[d, e] := #{data := val2},
[f, g] := #{data := val3}}},
khepri_adv:delete_many(
?FUNCTION_NAME, [a, b, c],
#{return_keep_while_expirations => true})),
?_assertEqual(
{error, ?khepri_error(node_not_found, #{node_name => a,
node_path => [a],
node_is_target => false})},
khepri_adv:get(?FUNCTION_NAME, [a, b, c])),
?_assertEqual(
{error, ?khepri_error(node_not_found, #{node_name => d,
node_path => [d],
node_is_target => false})},
khepri_adv:get(?FUNCTION_NAME, [d, e])),
?_assertEqual(
{error, ?khepri_error(node_not_found, #{node_name => f,
node_path => [f],
node_is_target => false})},
khepri_adv:get(?FUNCTION_NAME, [f, g]))]}.

clear_payload_from_non_existing_node_test_() ->
{setup,
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
Expand Down

0 comments on commit 0d3b3f5

Please sign in to comment.