Skip to content

Commit

Permalink
khepri_machine: Request delete_reason if effective machine version …
Browse files Browse the repository at this point in the history
…>= 2 only

[Why]
The `delete_reason` property to return was introduces with machine
version 2. Older versions have two issues:
* They don't know this property
* They crash on unknown properties

[How]
We request this new property only if the effective machine version is
greater or equal to 2.

Note that we have to find the store ID from a PID when setting default
options for calls in transactions because the Khepri state machine
doesn't have it in its state... Something to improve when we have to
update the machine state record.

There is a separate patch to ignore unknown properties in the future.
  • Loading branch information
dumbbell committed Dec 12, 2024
1 parent 6f88694 commit dafe6a2
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 20 deletions.
71 changes: 56 additions & 15 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@
%% For internal use only.
-export([clear_cache/1,
ack_triggers_execution/2,
split_query_options/1,
split_command_options/1,
split_query_options/2,
split_command_options/2,
split_put_options/1,
insert_or_update_node/6,
delete_matching_nodes/4,
Expand Down Expand Up @@ -283,7 +283,7 @@ fold(StoreId, PathPattern, Fun, Acc, Options)
is_function(Fun, 3) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
{QueryOptions, TreeOptions} = split_query_options(Options),
{QueryOptions, TreeOptions} = split_query_options(StoreId, Options),
Query = fun(State) ->
Tree = get_tree(State),
try
Expand Down Expand Up @@ -348,7 +348,8 @@ put(StoreId, PathPattern, Payload, Options)
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
Payload1 = khepri_payload:prepare(Payload),
{CommandOptions, TreeAndPutOptions} = split_command_options(Options),
{CommandOptions, TreeAndPutOptions} = split_command_options(
StoreId, Options),
Command = #put{path = PathPattern1,
payload = Payload1,
options = TreeAndPutOptions},
Expand Down Expand Up @@ -376,7 +377,7 @@ put(_StoreId, PathPattern, Payload, _Options) ->
delete(StoreId, PathPattern, Options) when ?IS_KHEPRI_STORE_ID(StoreId) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
{CommandOptions, TreeOptions} = split_command_options(Options),
{CommandOptions, TreeOptions} = split_command_options(StoreId, Options),
%% TODO: Ensure `PutOptions' are not set this map.
Command = #delete{path = PathPattern1,
options = TreeOptions},
Expand Down Expand Up @@ -722,14 +723,17 @@ get_projections_state(StoreId, Options)
end,
process_query(StoreId, Query, Options).

-spec split_query_options(Options) -> {QueryOptions, TreeOptions} when
-spec split_query_options(StoreId | Pid, Options) ->
{QueryOptions, TreeOptions} when
StoreId :: khepri:store_id(),
Pid :: pid(),
Options :: QueryOptions | TreeOptions,
QueryOptions :: khepri:query_options(),
TreeOptions :: khepri:tree_options().
%% @private

split_query_options(Options) ->
Options1 = set_default_options(Options),
split_query_options(StoreId, Options) ->
Options1 = set_default_options(StoreId, Options),
maps:fold(
fun
(Option, Value, {Q, T}) when
Expand All @@ -748,15 +752,17 @@ split_query_options(Options) ->
{Q, T1}
end, {#{}, #{}}, Options1).

-spec split_command_options(Options) ->
-spec split_command_options(StoreId | Pid, Options) ->
{CommandOptions, TreeAndPutOptions} when
StoreId :: khepri:store_id(),
Pid :: pid(),
Options :: CommandOptions | TreeAndPutOptions,
CommandOptions :: khepri:command_options(),
TreeAndPutOptions :: khepri:tree_options() | khepri:put_options().
%% @private

split_command_options(Options) ->
Options1 = set_default_options(Options),
split_command_options(StoreId, Options) ->
Options1 = set_default_options(StoreId, Options),
maps:fold(
fun
(Option, Value, {C, TP}) when
Expand Down Expand Up @@ -799,7 +805,7 @@ split_put_options(TreeAndPutOptions) ->
{T1, P}
end, {#{}, #{}}, TreeAndPutOptions).

set_default_options(Options) ->
set_default_options(StoreId, Options) ->
%% By default, return payload-related properties. The caller can set
%% `props_to_return' to an empty map to get a minimal return value.
Options1 = case Options of
Expand All @@ -808,7 +814,21 @@ set_default_options(Options) ->
_ ->
Options#{props_to_return => ?DEFAULT_PROPS_TO_RETURN}
end,
Options1.
%% We need to remove `delete_reason' from the list if the whole cluster is
%% still using a machine version that doesn't know about it. Otherwise old
%% versions of Khepri will crash when gathering the properties.
PropsToReturn0 = maps:get(props_to_return, Options1),
PropsToReturn1 = case effective_version(StoreId) of
{ok, EffectiveMacVer} when EffectiveMacVer >= 2 ->
PropsToReturn0;
_ ->
%% `delete_reason' was added in machine version
%% 2. Also, previous versions didn't ignore
%% unknown props_to_return and crashed.
PropsToReturn0 -- [delete_reason]
end,
Options2 = Options1#{props_to_return => PropsToReturn1},
Options2.

-spec process_command(StoreId, Command, Options) -> Ret when
StoreId :: khepri:store_id(),
Expand Down Expand Up @@ -1736,13 +1756,14 @@ which_module(2) -> ?MODULE;
which_module(1) -> ?MODULE;
which_module(0) -> ?MODULE.

-spec effective_version(StoreId) -> Ret when
-spec effective_version(StoreId | Pid) -> Ret when
StoreId :: khepri:store_id(),
Pid :: pid(),
Ret :: khepri:ok(EffectiveMacVer) | khepri:error(),
EffectiveMacVer :: ra_machine:version().
%% @doc Returns the effective state machine version of the local Ra server.

effective_version(StoreId) ->
effective_version(StoreId) when ?IS_KHEPRI_STORE_ID(StoreId) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
case ra_counters:counters(RaServer, [effective_machine_version]) of
Expand All @@ -1760,6 +1781,26 @@ effective_version(StoreId) ->
error => Error}),
{error, Reason}
end
end;
effective_version(Pid) when is_pid(Pid) ->
%% FIXME: When called from the machine/Ra server, we don't have the
%% `StoreId' because we didn't keep it in state even though it is available
%% in the init arguments... This will need a change to the state record
%% unfortunalety.
%%
%% Meanwhile, we can query the registered name for the calling process, it
%% matches the `StoreId'. This is an internal Ra implementation detail, so
%% this should be fixed the next time we have a reason to update the state
%% record.
case erlang:process_info(Pid, registered_name) of
{registered_name, StoreId} when is_atom(StoreId) ->
effective_version(StoreId);
[] ->
Reason = ?khepri_error(
effective_machine_version_not_defined,
#{ra_server_pid => Pid,
error => no_registered_name_for_pid}),
{error, Reason}
end.

%% -------------------------------------------------------------------
Expand Down
6 changes: 4 additions & 2 deletions src/khepri_tx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ count(PathPattern, Options) ->
{State, _SideEffects} = khepri_tx_adv:get_tx_state(),
Tree = khepri_machine:get_tree(State),
Fun = fun khepri_tree:count_node_cb/3,
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{_QueryOptions, TreeOptions} =
khepri_machine:split_query_options(self(), Options),
TreeOptions1 = TreeOptions#{expect_specific_node => false},
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, 0, TreeOptions1),
case Ret of
Expand Down Expand Up @@ -518,7 +519,8 @@ fold(PathPattern, Fun, Acc, Options) ->
PathPattern1 = khepri_tx_adv:path_from_string(PathPattern),
{State, _SideEffects} = khepri_tx_adv:get_tx_state(),
Tree = khepri_machine:get_tree(State),
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{_QueryOptions, TreeOptions} =
khepri_machine:split_query_options(self(), Options),
TreeOptions1 = TreeOptions#{expect_specific_node => false},
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, Acc, TreeOptions1),
case Ret of
Expand Down
7 changes: 4 additions & 3 deletions src/khepri_tx_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ get_many(PathPattern, Options) ->

do_get_many(PathPattern, Fun, Acc, Options) ->
PathPattern1 = path_from_string(PathPattern),
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{_QueryOptions, TreeOptions} =
khepri_machine:split_query_options(self(), Options),
{State, _SideEffects} = get_tx_state(),
Tree = khepri_machine:get_tree(State),
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, Acc, TreeOptions),
Expand Down Expand Up @@ -211,7 +212,7 @@ put_many(PathPattern, Data, Options) ->
PathPattern1 = path_from_string(PathPattern),
Payload1 = khepri_payload:wrap(Data),
{_CommandOptions, TreeAndPutOptions} =
khepri_machine:split_command_options(Options),
khepri_machine:split_command_options(self(), Options),
{TreeOptions, PutOptions} =
khepri_machine:split_put_options(TreeAndPutOptions),
%% TODO: Ensure `CommandOptions' is unset.
Expand Down Expand Up @@ -402,7 +403,7 @@ delete_many(PathPattern, Options) ->
ensure_updates_are_allowed(),
PathPattern1 = path_from_string(PathPattern),
{_CommandOptions, TreeOptions} =
khepri_machine:split_command_options(Options),
khepri_machine:split_command_options(self(), Options),
%% TODO: Ensure `CommandOptions' is empty and `TreeOptions' doesn't
%% contains put options.
Fun = fun(State, SideEffects) ->
Expand Down

0 comments on commit dafe6a2

Please sign in to comment.