Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mria_lb): add incompatibility reasons to custom compatibility check #181

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 62 additions & 45 deletions src/mria_lb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ do_update(State = #s{core_nodes = OldCoreNodes, node_info = OldNodeInfo}) ->
maybe_report_netsplit(OldCoreNodes, Clusters),
{IsChanged, NewCoreNodes} = find_best_cluster(OldCoreNodes, Clusters),
%% Update shards:
ShardBadness = shard_badness(maps:with(NewCoreNodes, NodeInfo)),
{ShardBadness, IncompatNodesInfo} = shard_badness(maps:with(NewCoreNodes, NodeInfo)),
_ = maybe_report_incompatibility(ShardBadness, IncompatNodesInfo),
maps:map(fun(Shard, {Node, _Badness}) ->
mria_status:notify_core_node_up(Shard, Node)
end,
Expand All @@ -190,6 +191,13 @@ do_update(State = #s{core_nodes = OldCoreNodes, node_info = OldNodeInfo}) ->
ping_new_nodes(NewCoreNodes, DiscoveredReplicants),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes a replicant a running member of the cluster even though it has no compatible core node to connect to and is therefore not fully initialized.
The node is returned from mria:running_nodes/0 and can be used in various RPCs at the EMQX layer, resulting in failures at numerous levels.
Perhaps we should add an mria_membership status such as `connecting` and exclude disconnected replicants from the running nodes list.

State#s{core_nodes = NewCoreNodes, node_info = NodeInfo}.

%% Emit a warning trace point when `ShardBadness' is an empty map while
%% `IncompatInfo' is not; the latter is attached to the event under the
%% `nodes_info' key. In every other case just return `ok'.
maybe_report_incompatibility(ShardBadness, IncompatInfo) ->
    NoUsableShard = ShardBadness =:= #{},
    HasIncompatNodes = IncompatInfo =/= #{},
    case NoUsableShard andalso HasIncompatNodes of
        true ->
            ?tp(warning, "No core node in the cluster is compatible with this replicant node",
                #{nodes_info => IncompatInfo});
        false ->
            ok
    end.

%% Find fully connected clusters (i.e. cliques of nodes)
-spec find_clusters(#{node() => node_info()}) -> [[node()]].
find_clusters(NodeInfo) ->
Expand All @@ -199,57 +207,66 @@ find_clusters(NodeInfo) ->
NodeInfo)).

%% Find the preferred core node for each shard:
-spec shard_badness(#{node() => node_info()}) -> #{mria_rlog:shard() => {node(), Badness}}
when Badness :: float().
-spec shard_badness(#{node() => node_info()}) -> {#{mria_rlog:shard() => {node(), Badness}}, #{node() => Reason}}
when Badness :: float(), Reason :: term().
shard_badness(NodeInfo) ->
maps:fold(
fun(Node, LbInfo = #{shard_badness := Shards}, Acc) ->
case verify_node_compatibility(LbInfo) of
true ->
lists:foldl(
fun({Shard, Badness}, Acc1) ->
maps:update_with(Shard,
fun({_OldNode, OldBadness}) when OldBadness > Badness ->
{Node, Badness};
(Old) ->
Old
end,
{Node, Badness},
Acc1)
end,
Acc,
Shards);
false ->
Acc
end
fun(Node, LbInfo = #{shard_badness := Shards}, {Acc, IncompatAcc}) ->
case verify_node_compatibility(LbInfo) of
true ->
Acc1 = lists:foldl(
fun({Shard, Badness}, Acc1) ->
maps:update_with(Shard,
fun({_OldNode, OldBadness}) when OldBadness > Badness ->
{Node, Badness};
(Old) ->
Old
end,
{Node, Badness},
Acc1)
end,
Acc,
Shards),
{Acc1, IncompatAcc};
{false, Reason} ->
{Acc, IncompatAcc#{Node => Reason}}
end
end,
#{},
{#{}, #{}},
NodeInfo).

verify_node_compatibility(LbInfo = #{protocol_version := ProtoVsn}) ->
case mria_config:callback(lb_custom_info_check) of
{ok, CustomCheckFun} ->
ok;
undefined ->
CustomCheckFun = fun(_) -> true end
end,
CustomInfo = maps:get(custom_info, LbInfo, undefined),
MyProtoVersion = mria_rlog:get_protocol_version(),
%% Actual check:
IsCustomCompat = try
Result = CustomCheckFun(CustomInfo),
is_boolean(Result) orelse
error({non_boolean_result, Result}),
Result
catch
%% TODO: this can get spammy:
EC:Err:Stack ->
?tp(error, mria_failed_to_check_upstream_compatibility,
#{lb_info => LbInfo, EC => Err, stacktrace => Stack}),
false
case ProtoVsn =:= MyProtoVersion of
true ->
verify_custom_compatibility(LbInfo);
false ->
{false, "Mria protocol version doesn't match"}
end.

verify_custom_compatibility(LbInfo) ->
CustomCheckFun = case mria_config:callback(lb_custom_info_check) of
{ok, Fun} -> Fun;
undefined -> fun(_) -> true end
end,
ProtoVsn =:= MyProtoVersion andalso
IsCustomCompat.
CustomInfo = maps:get(custom_info, LbInfo, undefined),
try
case CustomCheckFun(CustomInfo) of
true -> true;
%% backward-compatibility for CustomCheckFun that doesn't return
%% {false, Reason}
false -> {false, undefined};
{false, Reason} -> {false, Reason};
Other ->
error({non_boolean_result, Other})
end
catch
%% TODO: this can get spammy:
EC:Err:Stack ->
?tp(error, mria_failed_to_check_upstream_compatibility,
#{lb_info => LbInfo, EC => Err, stacktrace => Stack}),
{false, undefined}
end.

start_timer(LastUpdateTime) ->
%% Leave at least 100 ms between updates to leave some time to
Expand Down Expand Up @@ -482,7 +499,7 @@ find_clusters_test_() ->

shard_badness_test_() ->
Vsn = mria_rlog:get_protocol_version(),
[ ?_assertMatch( #{foo := {n1, 1}, bar := {n2, 2}}
[ ?_assertMatch( {#{foo := {n1, 1}, bar := {n2, 2}}, _}
, shard_badness(#{ n1 => #{shard_badness => [{foo, 1}], protocol_version => Vsn}
, n2 => #{shard_badness => [{foo, 2}, {bar, 2}], protocol_version => Vsn}
})
Expand Down
13 changes: 11 additions & 2 deletions test/mria_lb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,21 @@ t_node_leave_disable_discovery(_Config) ->
end, []).

t_custom_compat_check(_Config) ->
Env = [ {mria, {callback, lb_custom_info_check}, fun(Val) -> Val =:= chosen_one end}
custom_compat_check_test(fun(Val) -> Val =:= chosen_one end, chosen_one).

%% Same scenario as the boolean-only compatibility check, but the custom
%% callback answers `{false, Reason}' for anything other than `chosen_one'.
t_custom_compat_check_with_reason(_Config) ->
    CheckFun = fun(chosen_one) ->
                       true;
                  (_NotChosen) ->
                       {false, "not a chosen one"}
               end,
    custom_compat_check_test(CheckFun, chosen_one).

custom_compat_check_test(CheckFun, CheckFunArg) ->
Env = [ {mria, {callback, lb_custom_info_check}, CheckFun}
| mria_mnesia_test_util:common_env()],
Cluster = mria_ct:cluster([ core
, core
, {core, [{mria, {callback, lb_custom_info},
fun() -> chosen_one end}]}
fun() -> CheckFunArg end}]}
, replicant
], Env),
?check_trace(
Expand Down