From 4cfc7dc995018c91e12a8c8f4e978da502c5951a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 9 Jan 2025 20:55:48 +0100 Subject: [PATCH 1/3] fix: make global lock guarding join operations stricter Otherwise, concurrent joins can ruin each other's lives and make any further cluster operations impossible. This can happen, for example, when a concurrent join stops the entire `mnesia` system while another join is running schema transactions. --- include/mria.hrl | 2 +- src/mria.erl | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/include/mria.hrl b/include/mria.hrl index 32ed304..2fd86df 100644 --- a/include/mria.hrl +++ b/include/mria.hrl @@ -17,4 +17,4 @@ -type(member() :: #member{}). --define(JOIN_LOCK_ID, {mria_sync_join, node()}). +-define(JOIN_LOCK_ID(REQUESTER), {mria_sync_join, REQUESTER}). diff --git a/src/mria.erl b/src/mria.erl index b757d2c..eef6c7c 100644 --- a/src/mria.erl +++ b/src/mria.erl @@ -235,11 +235,22 @@ join(Node) -> join(Node, _) when Node =:= node() -> ignore; join(Node, Reason) when is_atom(Node) -> + %% NOTE + %% %% If two nodes are trying to join each other simultaneously, %% one of them must be blocked waiting for a lock. %% Once lock is released, it is expected to be already in the %% cluster (if the other node joined it successfully). - global:trans(?JOIN_LOCK_ID, fun() -> join1(Node, Reason) end, [node(), Node]). + %% + %% Additionally, avoid conducting concurrent join operations + %% by specifying current process PID as the lock requester. + %% Otherwise, concurrent joins can ruin each other's lives and + %% make any further cluster operations impossible. + %% This can happen, for example, when a concurrent join stops the + %% entire `mnesia` system while another join is running schema + %% transactions. + LockId = ?JOIN_LOCK_ID(self()), + global:trans(LockId, fun() -> join1(Node, Reason) end, [node(), Node]). %% @doc Leave the cluster -spec leave() -> ok | {error, term()}. From ad1383a82aa43528cbf78f09a28789480f2d2684 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 9 Jan 2025 20:56:54 +0100 Subject: [PATCH 2/3] chore: bump select dependencies to recent versions --- rebar.config | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 0d2089a..d570dc2 100644 --- a/rebar.config +++ b/rebar.config @@ -2,8 +2,8 @@ {minimum_otp_vsn, "21.0"}. {deps, - [{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "1.0.7"}}}, - {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}, + [{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "1.0.10"}}}, + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.4.1"}}}, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.6"}}}, {mnesia_rocksdb, {git, "https://github.com/emqx/mnesia_rocksdb", {tag, "0.1.16"}}}, {optvar, {git, "https://github.com/emqx/optvar", {tag, "1.0.5"}}} From fee7a61d91a621aebdd3872410013c3ccf737c3f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 Jan 2025 13:31:21 +0100 Subject: [PATCH 3/3] test: add testcase for concurrent joins --- src/mria_mnesia.erl | 2 ++ test/mria_SUITE.erl | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/mria_mnesia.erl b/src/mria_mnesia.erl index 5662004..23f8048 100644 --- a/src/mria_mnesia.erl +++ b/src/mria_mnesia.erl @@ -130,6 +130,7 @@ ensure_stopped() -> %% @doc Cluster with node. -spec(connect(node()) -> ok | {error, any()}). connect(Node) -> + ?tp(mria_mnesia_connect, #{to => Node}), case mnesia:change_config(extra_db_nodes, [Node]) of {ok, [Node]} -> ok; {ok, []} -> {error, {failed_to_connect_node, Node}}; @@ -237,6 +238,7 @@ is_node_in_cluster(Node) -> %% @doc Copy schema. copy_schema(Node) -> + ?tp(mria_mnesia_copy_schema, #{}), case mnesia:change_table_copy_type(schema, Node, disc_copies) of {atomic, ok} -> ok; {aborted, {already_exists, schema, Node, disc_copies}} -> diff --git a/test/mria_SUITE.erl b/test/mria_SUITE.erl index b6f5b28..671fcff 100644 --- a/test/mria_SUITE.erl +++ b/test/mria_SUITE.erl @@ -1317,6 +1317,43 @@ t_join_another_node_simultaneously(_) -> end, []). +t_join_many_nodes_simultaneously(_) -> + % Self = self(), + CommonEnv = mria_mnesia_test_util:common_env(), + Cluster = [maps:remove(join_to, Spec) + || Spec <- mria_ct:cluster([core, core, core, core], CommonEnv)], + ?check_trace( + #{timetrap => 15_000}, + try + %% Spin the cluster up. + [N1, N2, N3, N4] = Nodes = mria_ct:start_cluster(mria, Cluster), + %% Connect only N2, N3, N4 together. + ok = rpc:call(N2, mria, join, [N4]), + ok = rpc:call(N3, mria, join, [N4]), + %% Subscribe to an event emitted right before schema transactions take place. + {ok, SRef} = snabbkaffe:subscribe(?match_event(#{?snk_kind := mria_mnesia_connect})), + %% Ask N1 to join the cluster (using N2 as a seed). + K1 = rpc:async_call(N1, mria, join, [N2]), + %% Wait for the event, and ask (concurrently) N1 to join the cluster (using + %% other 2 nodes as seeds). + {ok, _} = snabbkaffe:receive_events(SRef), + K2 = rpc:async_call(N1, mria, join, [N3]), + K3 = rpc:async_call(N1, mria, join, [N4]), + ?assertMatch([ok, + {error, {already_in_cluster, _}}, + {error, {already_in_cluster, _}}], + lists:sort([rpc:yield(K) || K <- [K1, K2, K3]])), + timer:sleep(3000), + ?assertEqual({[true, true, true, true], []}, + rpc:multicall(Nodes, mria_sup, is_running, [])), + {Results, []} = rpc:multicall(Nodes, mria_mnesia, running_nodes, []), + ?assertEqual([Nodes, Nodes, Nodes, Nodes], + lists:map(fun lists:sort/1, Results)) + after + ok = mria_ct:teardown_cluster(Cluster) + end, + []). + cluster_benchmark(_) -> NReplicas = 6, Config = #{ trans_size => 10