diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index c40ceedfb423..90bdf6857bef 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -823,7 +823,7 @@ rabbitmq_integration_suite( additional_beam = [ ":test_queue_utils_beam", ], - shard_count = 19, + shard_count = 20, deps = [ "@proper//:erlang_app", ], diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 0846dd58d1e0..12c10c5e4ddc 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -1747,6 +1747,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0}, {queue_event, QRef, {stream_local_member_change, MemberPid}}, cast} | Efs]}; + (_MNode, #member{state = {running, _, MemberPid}, + role = {replica, _}, + target = deleted}, {_, Efs}) -> + {MemberPid, [{send_msg, P, + {queue_event, QRef, deleted_replica}, + cast} | Efs]}; (_N, _M, Acc) -> %% not a replica, nothing to do Acc diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 4d88951b58c2..426d217cd1db 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -626,7 +626,9 @@ handle_event(_QName, {stream_local_member_change, Pid}, end, #{}, Readers0), {ok, State#stream_client{local_pid = Pid, readers = Readers1}, []}; handle_event(_QName, eol, #stream_client{name = Name}) -> - {eol, [{unblock, Name}]}. + {eol, [{unblock, Name}]}; +handle_event(QName, deleted_replica, State) -> + {ok, State, [{queue_down, QName}]}. is_recoverable(Q) -> Node = node(), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 014c8b03875c..c8b2f8aabce9 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -34,6 +34,7 @@ all() -> {group, cluster_size_3}, {group, cluster_size_3_1}, {group, cluster_size_3_2}, + {group, cluster_size_3_3}, {group, cluster_size_3_parallel_1}, {group, cluster_size_3_parallel_2}, {group, cluster_size_3_parallel_3}, @@ -79,6 +80,7 @@ groups() -> {cluster_size_3_2, [], [recover, declare_with_node_down_1, declare_with_node_down_2]}, + {cluster_size_3_3, [], [consume_while_deleting_replica]}, {cluster_size_3_parallel_1, [parallel], [ delete_replica, delete_last_replica, @@ -207,6 +209,7 @@ init_per_group1(Group, Config) -> cluster_size_3_parallel_5 -> 3; cluster_size_3_1 -> 3; cluster_size_3_2 -> 3; + cluster_size_3_3 -> 3; unclustered_size_3_1 -> 3; unclustered_size_3_2 -> 3; unclustered_size_3_3 -> 3; @@ -1648,6 +1651,45 @@ consume_from_replica(Config) -> receive_batch(Ch2, 0, 99), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +consume_while_deleting_replica(Config) -> + [Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + rabbit_ct_helpers:await_condition( + fun () -> + Info = find_queue_info(Config, 1, [online]), + length(proplists:get_value(online, Info)) == 3 + end), + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3), + qos(Ch2, 10, false), + + CTag = atom_to_binary(?FUNCTION_NAME), + subscribe(Ch2, Q, false, 0, CTag), + + %% Delete replica in node 3 + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue, + delete_replica, [<<"/">>, Q, Server3]), + + publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]), + + %% no messages should be received + receive + #'basic.cancel'{consumer_tag = CTag} -> + ok; + {_, #amqp_msg{}} -> + exit(unexpected_message) + after 30000 -> + exit(missing_consumer_cancel) + end, + + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + consume_credit(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),