Skip to content

Commit

Permalink
Merge pull request #12074 from rabbitmq/issue-11915
Browse files Browse the repository at this point in the history
Cancel AMQP stream consumer when local stream member is deleted
  • Loading branch information
acogoluegnes authored Sep 2, 2024
2 parents f0932e3 + 0061944 commit 56964a8
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 2 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ rabbitmq_integration_suite(
additional_beam = [
":test_queue_utils_beam",
],
shard_count = 19,
shard_count = 20,
deps = [
"@proper//:erlang_app",
],
Expand Down
6 changes: 6 additions & 0 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0},
{queue_event, QRef,
{stream_local_member_change, MemberPid}},
cast} | Efs]};
(_MNode, #member{state = {running, _, MemberPid},
role = {replica, _},
target = deleted}, {_, Efs}) ->
{MemberPid, [{send_msg, P,
{queue_event, QRef, deleted_replica},
cast} | Efs]};
(_N, _M, Acc) ->
%% not a replica, nothing to do
Acc
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,9 @@ handle_event(_QName, {stream_local_member_change, Pid},
end, #{}, Readers0),
{ok, State#stream_client{local_pid = Pid, readers = Readers1}, []};
handle_event(_QName, eol, #stream_client{name = Name}) ->
{eol, [{unblock, Name}]}.
{eol, [{unblock, Name}]};
handle_event(QName, deleted_replica, State) ->
{ok, State, [{queue_down, QName}]}.

is_recoverable(Q) ->
Node = node(),
Expand Down
42 changes: 42 additions & 0 deletions deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ all() ->
{group, cluster_size_3},
{group, cluster_size_3_1},
{group, cluster_size_3_2},
{group, cluster_size_3_3},
{group, cluster_size_3_parallel_1},
{group, cluster_size_3_parallel_2},
{group, cluster_size_3_parallel_3},
Expand Down Expand Up @@ -79,6 +80,7 @@ groups() ->
{cluster_size_3_2, [], [recover,
declare_with_node_down_1,
declare_with_node_down_2]},
{cluster_size_3_3, [], [consume_while_deleting_replica]},
{cluster_size_3_parallel_1, [parallel], [
delete_replica,
delete_last_replica,
Expand Down Expand Up @@ -207,6 +209,7 @@ init_per_group1(Group, Config) ->
cluster_size_3_parallel_5 -> 3;
cluster_size_3_1 -> 3;
cluster_size_3_2 -> 3;
cluster_size_3_3 -> 3;
unclustered_size_3_1 -> 3;
unclustered_size_3_2 -> 3;
unclustered_size_3_3 -> 3;
Expand Down Expand Up @@ -1648,6 +1651,45 @@ consume_from_replica(Config) ->
receive_batch(Ch2, 0, 99),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

consume_while_deleting_replica(Config) ->
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
Q = ?config(queue_name, Config),

?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),

rabbit_ct_helpers:await_condition(
fun () ->
Info = find_queue_info(Config, 1, [online]),
length(proplists:get_value(online, Info)) == 3
end),

Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
qos(Ch2, 10, false),

CTag = atom_to_binary(?FUNCTION_NAME),
subscribe(Ch2, Q, false, 0, CTag),

%% Delete replica in node 3
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
delete_replica, [<<"/">>, Q, Server3]),

publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),

%% no messages should be received
receive
#'basic.cancel'{consumer_tag = CTag} ->
ok;
{_, #amqp_msg{}} ->
exit(unexpected_message)
after 30000 ->
exit(missing_consumer_cancel)
end,

rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

consume_credit(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Expand Down

0 comments on commit 56964a8

Please sign in to comment.