From 0b48282652842246f5b1f682dfa6c60d4b774e59 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 21 Jun 2024 19:16:14 +0300 Subject: [PATCH] feat(mria_lb): add incompatibility reasons to custom compatibility check --- src/mria_lb.erl | 107 ++++++++++++++++++++++++----------------- test/mria_lb_SUITE.erl | 13 ++++- 2 files changed, 73 insertions(+), 47 deletions(-) diff --git a/src/mria_lb.erl b/src/mria_lb.erl index 8ac1aca..eaff5e9 100644 --- a/src/mria_lb.erl +++ b/src/mria_lb.erl @@ -171,7 +171,8 @@ do_update(State = #s{core_nodes = OldCoreNodes, node_info = OldNodeInfo}) -> maybe_report_netsplit(OldCoreNodes, Clusters), {IsChanged, NewCoreNodes} = find_best_cluster(OldCoreNodes, Clusters), %% Update shards: - ShardBadness = shard_badness(maps:with(NewCoreNodes, NodeInfo)), + {ShardBadness, IncompatNodesInfo} = shard_badness(maps:with(NewCoreNodes, NodeInfo)), + _ = maybe_report_incompatibility(ShardBadness, IncompatNodesInfo), maps:map(fun(Shard, {Node, _Badness}) -> mria_status:notify_core_node_up(Shard, Node) end, @@ -190,6 +191,13 @@ do_update(State = #s{core_nodes = OldCoreNodes, node_info = OldNodeInfo}) -> ping_new_nodes(NewCoreNodes, DiscoveredReplicants), State#s{core_nodes = NewCoreNodes, node_info = NodeInfo}. +maybe_report_incompatibility(ShardBadness, IncompatInfo) when ShardBadness =:= #{}, + IncompatInfo =/= #{} -> + ?tp(warning, "No core node in the cluster is compatible with this replicant node", + #{nodes_info => IncompatInfo}); +maybe_report_incompatibility(_ShardBadness, _Errs) -> + ok. + %% Find fully connected clusters (i.e. cliques of nodes) -spec find_clusters(#{node() => node_info()}) -> [[node()]]. find_clusters(NodeInfo) -> @@ -199,57 +207,66 @@ find_clusters(NodeInfo) -> NodeInfo)). %% Find the preferred core node for each shard: --spec shard_badness(#{node() => node_info()}) -> #{mria_rlog:shard() => {node(), Badness}} - when Badness :: float(). +-spec shard_badness(#{node() => node_info()}) -> {#{mria_rlog:shard() => {node(), Badness}}, #{node() => Reason}} + when Badness :: float(), Reason :: term(). shard_badness(NodeInfo) -> maps:fold( - fun(Node, LbInfo = #{shard_badness := Shards}, Acc) -> - case verify_node_compatibility(LbInfo) of - true -> - lists:foldl( - fun({Shard, Badness}, Acc1) -> - maps:update_with(Shard, - fun({_OldNode, OldBadness}) when OldBadness > Badness -> - {Node, Badness}; - (Old) -> - Old - end, - {Node, Badness}, - Acc1) - end, - Acc, - Shards); - false -> - Acc - end + fun(Node, LbInfo = #{shard_badness := Shards}, {Acc, IncompatAcc}) -> + case verify_node_compatibility(LbInfo) of + true -> + Acc1 = lists:foldl( + fun({Shard, Badness}, Acc1) -> + maps:update_with(Shard, + fun({_OldNode, OldBadness}) when OldBadness > Badness -> + {Node, Badness}; + (Old) -> + Old + end, + {Node, Badness}, + Acc1) + end, + Acc, + Shards), + {Acc1, IncompatAcc}; + {false, Reason} -> + {Acc, IncompatAcc#{Node => Reason}} + end end, - #{}, + {#{}, #{}}, NodeInfo). verify_node_compatibility(LbInfo = #{protocol_version := ProtoVsn}) -> - case mria_config:callback(lb_custom_info_check) of - {ok, CustomCheckFun} -> - ok; - undefined -> - CustomCheckFun = fun(_) -> true end - end, - CustomInfo = maps:get(custom_info, LbInfo, undefined), MyProtoVersion = mria_rlog:get_protocol_version(), - %% Actual check: - IsCustomCompat = try - Result = CustomCheckFun(CustomInfo), - is_boolean(Result) orelse - error({non_boolean_result, Result}), - Result - catch - %% TODO: this can get spammy: - EC:Err:Stack -> - ?tp(error, mria_failed_to_check_upstream_compatibility, - #{lb_info => LbInfo, EC => Err, stacktrace => Stack}), - false + case ProtoVsn =:= MyProtoVersion of + true -> + verify_custom_compatibility(LbInfo); + false -> + {false, "Mria protocol version doesn't match"} + end. + +verify_custom_compatibility(LbInfo) -> + CustomCheckFun = case mria_config:callback(lb_custom_info_check) of + {ok, Fun} -> Fun; + undefined -> fun(_) -> true end end, - ProtoVsn =:= MyProtoVersion andalso - IsCustomCompat. + CustomInfo = maps:get(custom_info, LbInfo, undefined), + try + case CustomCheckFun(CustomInfo) of + true -> true; + %% backward-compatibility for CustomCheckFun that doesn't return + %% {false, Reason} + false -> {false, undefined}; + {false, Reason} -> {false, Reason}; + Other -> + error({non_boolean_result, Other}) + end + catch + %% TODO: this can get spammy: + EC:Err:Stack -> + ?tp(error, mria_failed_to_check_upstream_compatibility, + #{lb_info => LbInfo, EC => Err, stacktrace => Stack}), + {false, undefined} + end. start_timer(LastUpdateTime) -> %% Leave at least 100 ms between updates to leave some time to @@ -482,7 +499,7 @@ find_clusters_test_() -> shard_badness_test_() -> Vsn = mria_rlog:get_protocol_version(), - [ ?_assertMatch( #{foo := {n1, 1}, bar := {n2, 2}} + [ ?_assertMatch( {#{foo := {n1, 1}, bar := {n2, 2}}, _} , shard_badness(#{ n1 => #{shard_badness => [{foo, 1}], protocol_version => Vsn} , n2 => #{shard_badness => [{foo, 2}, {bar, 2}], protocol_version => Vsn} }) diff --git a/test/mria_lb_SUITE.erl b/test/mria_lb_SUITE.erl index fff337d..cee2ee7 100644 --- a/test/mria_lb_SUITE.erl +++ b/test/mria_lb_SUITE.erl @@ -269,12 +269,21 @@ t_node_leave_disable_discovery(_Config) -> end, []). t_custom_compat_check(_Config) -> - Env = [ {mria, {callback, lb_custom_info_check}, fun(Val) -> Val =:= chosen_one end} + custom_compat_check_test(fun(Val) -> Val =:= chosen_one end, chosen_one). + +t_custom_compat_check_with_reason(_Config) -> + Fun = fun(Val) -> + Val =:= chosen_one orelse {false, "not a chosen one"} + end, + custom_compat_check_test(Fun, chosen_one). + +custom_compat_check_test(CheckFun, CheckFunArg) -> + Env = [ {mria, {callback, lb_custom_info_check}, CheckFun} | mria_mnesia_test_util:common_env()], Cluster = mria_ct:cluster([ core , core , {core, [{mria, {callback, lb_custom_info}, - fun() -> chosen_one end}]} + fun() -> CheckFunArg end}]} , replicant ], Env), ?check_trace(