From 0199d11c381b0d6ad59026409a80337b060081a2 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 26 Oct 2020 18:50:07 +0300 Subject: [PATCH] Merge pull request #150 from rabbitmq/binding-recovery-fix Explicitly recover binding on plugin init (cherry picked from commit 4d838f3159b6dffa4ccf447b1f5054258039f7b2) --- src/rabbit_delayed_message.erl | 45 +++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/src/rabbit_delayed_message.erl b/src/rabbit_delayed_message.erl index c3943ef..69e2624 100644 --- a/src/rabbit_delayed_message.erl +++ b/src/rabbit_delayed_message.erl @@ -13,7 +13,7 @@ %% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/time_correction.html -module(rabbit_delayed_message). - +-include_lib("rabbit_common/include/rabbit.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange delayed message mnesia setup"}, {mfa, {?MODULE, setup_mnesia, []}}, @@ -106,6 +106,7 @@ messages_delayed(Exchange) -> %%-------------------------------------------------------------------- init([]) -> + recover(), {ok, #state{timer = not_set}}. handle_call({delay_message, Exchange, Delivery, Delay}, @@ -216,3 +217,45 @@ append_to_atom(Atom, Append) when is_atom(Append) -> append_to_atom(Atom, atom_to_list(Append)); append_to_atom(Atom, Append) when is_list(Append) -> list_to_atom(atom_to_list(Atom) ++ Append). + +recover() -> + %% topology recovery has already happened, we have to recover state for any durable + %% consistent hash exchanges since plugin activation was moved later in boot process + %% starting with RabbitMQ 3.8.4 + case list_exchanges() of + {ok, Xs} -> + rabbit_log:debug("Delayed message exchange: " + "have ~b durable exchanges to recover", + [length(Xs)]), + [recover_exchange_and_bindings(X) || X <- lists:usort(Xs)]; + {aborted, Reason} -> + rabbit_log:error( + "Delayed message exchange: " + "failed to recover durable bindings of one of the exchanges, reason: ~p", + [Reason]) + end. + +list_exchanges() -> + case mnesia:transaction( + fun () -> + mnesia:match_object( + rabbit_exchange, #exchange{durable = true, + type = 'x-delayed-message', + _ = '_'}, write) + end) of + {atomic, Xs} -> + {ok, Xs}; + {aborted, Reason} -> + {aborted, Reason} + end. + +recover_exchange_and_bindings(#exchange{name = XName} = X) -> + mnesia:transaction( + fun () -> + Bindings = rabbit_binding:list_for_source(XName), + [rabbit_exchange_type_delayed_message:add_binding(transaction, X, B) + || B <- lists:usort(Bindings)], + rabbit_log:debug("Delayed message exchange: " + "recovered bindings for ~s", + [rabbit_misc:rs(XName)]) + end).