From 0061944e9cc5d2590f9291ce968f8cf8692c4a78 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Wed, 21 Aug 2024 10:38:38 +0200 Subject: [PATCH] Cancel AMQP stream consumer when local stream member is deleted The consumer reader process is gone and there is no way to recover it as the node does not have a member of the stream anymore, so it should be cancelled/detached. --- deps/rabbit/BUILD.bazel | 2 +- deps/rabbit/src/rabbit_stream_coordinator.erl | 6 +++ deps/rabbit/src/rabbit_stream_queue.erl | 4 +- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 42 +++++++++++++++++++ 4 files changed, 52 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index c91cd890ff2c..5922b3d03617 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -832,7 +832,7 @@ rabbitmq_integration_suite( additional_beam = [ ":test_queue_utils_beam", ], - shard_count = 19, + shard_count = 20, deps = [ "@proper//:erlang_app", ], diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 0846dd58d1e0..12c10c5e4ddc 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -1747,6 +1747,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0}, {queue_event, QRef, {stream_local_member_change, MemberPid}}, cast} | Efs]}; + (_MNode, #member{state = {running, _, MemberPid}, + role = {replica, _}, + target = deleted}, {_, Efs}) -> + {MemberPid, [{send_msg, P, + {queue_event, QRef, deleted_replica}, + cast} | Efs]}; (_N, _M, Acc) -> %% not a replica, nothing to do Acc diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index e36ad708eb9a..6ee619e0e0ff 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -626,7 +626,9 @@ handle_event(_QName, {stream_local_member_change, Pid}, end, #{}, Readers0), {ok, State#stream_client{local_pid = 
Pid, readers = Readers1}, []}; handle_event(_QName, eol, #stream_client{name = Name}) -> - {eol, [{unblock, Name}]}. + {eol, [{unblock, Name}]}; +handle_event(QName, deleted_replica, State) -> + {ok, State, [{queue_down, QName}]}. is_recoverable(Q) -> Node = node(), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 3d09d901caf9..3a74b4753bd0 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -34,6 +34,7 @@ all() -> {group, cluster_size_3}, {group, cluster_size_3_1}, {group, cluster_size_3_2}, + {group, cluster_size_3_3}, {group, cluster_size_3_parallel_1}, {group, cluster_size_3_parallel_2}, {group, cluster_size_3_parallel_3}, @@ -79,6 +80,7 @@ groups() -> {cluster_size_3_2, [], [recover, declare_with_node_down_1, declare_with_node_down_2]}, + {cluster_size_3_3, [], [consume_while_deleting_replica]}, {cluster_size_3_parallel_1, [parallel], [ delete_replica, delete_last_replica, @@ -207,6 +209,7 @@ init_per_group1(Group, Config) -> cluster_size_3_parallel_5 -> 3; cluster_size_3_1 -> 3; cluster_size_3_2 -> 3; + cluster_size_3_3 -> 3; unclustered_size_3_1 -> 3; unclustered_size_3_2 -> 3; unclustered_size_3_3 -> 3; @@ -1649,6 +1652,45 @@ consume_from_replica(Config) -> receive_batch(Ch2, 0, 99), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). 
+consume_while_deleting_replica(Config) ->
+    [Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+    Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+    Q = ?config(queue_name, Config),
+
+    ?assertEqual({'queue.declare_ok', Q, 0, 0},
+                 declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+    rabbit_ct_helpers:await_condition(
+      fun () ->
+              Info = find_queue_info(Config, 1, [online]),
+              length(proplists:get_value(online, Info)) == 3
+      end),
+
+    Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
+    qos(Ch2, 10, false),
+
+    CTag = atom_to_binary(?FUNCTION_NAME),
+    subscribe(Ch2, Q, false, 0, CTag),
+
+    %% Delete the replica on Server3, the node on which Ch2 (the consumer's channel) was opened
+    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
+                                 delete_replica, [<<"/">>, Q, Server3]),
+
+    publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
+
+    %% Expect a basic.cancel for CTag; a delivered message or a 30s wait without it fails the test
+    receive
+        #'basic.cancel'{consumer_tag = CTag} ->
+            ok;
+        {_, #amqp_msg{}} ->
+            exit(unexpected_message)
+    after 30000 ->
+            exit(missing_consumer_cancel)
+    end,
+
+    rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+
 consume_credit(Config) ->
     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),