Skip to content

rabbitmq-queues: a way to grow a quorum queue member set to a target size #13873

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 85 additions & 16 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1543,29 +1543,20 @@ shrink_all(Node) ->
amqqueue:get_type(Q) == ?MODULE,
lists:member(Node, get_nodes(Q))].


%% Grows the matching quorum queues, adding new members with the default
%% `promotable' membership.
%%
%% The first argument is either a node name (one member is added on that
%% node for every matching queue) or an integer target member count.
%% A non-positive integer is rejected with
%% `{error, bad_quorum_cluster_size}', which is why the spec includes that
%% return shape in addition to the per-queue result list.
-spec grow(node() | integer(), binary(), binary(), all | even) ->
    [{rabbit_amqqueue:name(),
      {ok, pos_integer()} | {error, pos_integer(), term()}}] |
    {error, bad_quorum_cluster_size}.
grow(NodeOrSize, VhostSpec, QueueSpec, Strategy) ->
    grow(NodeOrSize, VhostSpec, QueueSpec, Strategy, promotable).

-spec grow(node(), binary(), binary(), all | even, membership()) ->
-spec grow(node() | integer(), binary(), binary(), all | even, membership()) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) ->
Running = rabbit_nodes:list_running(),
[begin
Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
?LOG_INFO("~ts: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
case add_member(Q, Node, Membership) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->
?LOG_WARNING(
"~ts: failed to add member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end
maybe_grow(Q, Node, Membership, Size)
end
|| Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
Expand All @@ -1575,7 +1566,85 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) ->
lists:member(Node, Running),
matches_strategy(Strategy, get_nodes(Q)),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ];

%% Integer form: grow every matching quorum queue towards a target number
%% of members instead of adding a member on one specific node.
%% Returns one `{QName, Result}' entry per attempted member addition.
grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership)
  when is_integer(QuorumClusterSize), QuorumClusterSize > 0 ->
    Running = rabbit_nodes:list_running(),
    %% We can't grow beyond the number of running nodes, so the requested
    %% size is capped at that count.
    TargetSize = min(QuorumClusterSize, length(Running)),
    %% Each round attempts to add at most one member per queue. The queue
    %% list is fetched again on every round (the generator is nested under
    %% the round generator); presumably this is what lets later rounds see
    %% the members added by earlier ones.
    [begin
         QNodes = get_nodes(Q),
         %% Non-empty: length(QNodes) < TargetSize =< length(Running).
         Candidates = Running -- QNodes,
         Node = lists:nth(rand:uniform(length(Candidates)), Candidates),
         maybe_grow(Q, Node, Membership, length(QNodes))
     end
     || _Round <- lists:seq(1, TargetSize),
        Q <- rabbit_amqqueue:list(),
        amqqueue:get_type(Q) == ?MODULE,
        matches_strategy(Strategy, get_nodes(Q)),
        is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
            is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec),
        %% Queues already at (or above) the target size are left alone.
        length(get_nodes(Q)) < TargetSize];

%% Zero and negative target sizes are rejected without touching any queue.
grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership)
  when is_integer(QuorumClusterSize) ->
    ?LOG_WARNING(
       "cannot grow queues to a quorum cluster size that is not greater than zero (~tp)",
       [QuorumClusterSize]),
    {error, bad_quorum_cluster_size}.

%% Convenience arity: reads the queue's current member nodes via
%% get_nodes/1 and delegates to maybe_grow/5.
maybe_grow(Q, Node, Membership, Size) ->
    maybe_grow(Q, Node, Membership, Size, get_nodes(Q)).

%% Adds a member for queue Q on Node, but only when
%% check_all_memberships/3 reports that the members on QNodes are voters.
%%
%% Returns `{QName, {ok, Size + 1}}' on success and
%% `{QName, {error, Size, Reason}}' otherwise. When the voter check fails,
%% Reason is the tuple `{error, non_voters_found}'.
maybe_grow(Q, Node, Membership, Size, QNodes) ->
    QName = amqqueue:get_name(Q),
    {ok, RaName} = qname_to_internal_name(QName),
    Outcome =
        case check_all_memberships(RaName, QNodes, voter) of
            false ->
                %% No add is attempted; the reason keeps its own
                %% `error' tag.
                {error, {error, non_voters_found}};
            true ->
                ?LOG_INFO("~ts: adding a new member (replica) on node ~w",
                          [rabbit_misc:rs(QName), Node]),
                add_member(Q, Node, Membership)
        end,
    case Outcome of
        ok ->
            {QName, {ok, Size + 1}};
        {error, Reason} ->
            ?LOG_WARNING(
               "~ts: failed to add member (replica) on node ~w, error: ~w",
               [rabbit_misc:rs(QName), Node, Reason]),
            {QName, {error, Size, Reason}}
    end.

%% Looks up RaName in the `ra_state' ETS table on every node in QNodes and
%% returns true when every single-row reply of the form
%% `[{_, _, Membership}]' has a membership equal to CompareMembership.
%%
%% Replies of any other shape are not counted against the result.
%% Returns false when rpc:multicall/4 reports any bad node.
check_all_memberships(RaName, QNodes, CompareMembership) ->
    case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of
        {Replies, []} ->
            lists:all(
              fun([{_Name, _State, Membership}]) ->
                      Membership == CompareMembership;
                 (_OtherReply) ->
                      true
              end,
              Replies);
        _ ->
            false
    end.

-spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}.
transfer_leadership(Q, Destination) ->
Expand Down
113 changes: 112 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ groups() ->
node_removal_is_not_quorum_critical,
select_nodes_with_least_replicas,
select_nodes_with_least_replicas_node_down,
subscribe_from_each
subscribe_from_each,
grow_queue


]},
Expand Down Expand Up @@ -1790,6 +1791,116 @@ dont_leak_file_handles(Config) ->
rabbit_ct_client_helpers:close_channel(C),
ok.

%% Exercises rabbit_quorum_queue:grow/4,5 against a five-node cluster:
%% growing onto a named node, growing to a target member count (including
%% a no-op target, a target above the node count and a negative target),
%% and the refusal to grow while a non-voter member is present.
%% The steps depend on the cluster state left by the previous step, so
%% their order matters.
grow_queue(Config) ->
[Server0, Server1, Server2, _Server3, _Server4] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
AQ = ?config(alt_queue_name, Config),
%% both queues start with an initial group size of 5
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 5}])),
?assertEqual({'queue.declare_ok', AQ, 0, 0},
declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 5}])),

QQs = [QQ, AQ],
MsgCount = 3,

%% publish MsgCount messages to each queue and check it has 5 members
[begin
RaName = ra_name(Q),
rabbit_ct_client_helpers:publish(Ch, Q, MsgCount),
wait_for_messages_ready([Server0], RaName, MsgCount),
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(5, length(Nodes0))
end || Q <- QQs],

%% shrink all queues down to 1 member before the grow scenarios
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),

TargetClusterSize_1 = 1,
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to node 'Server1'
TargetClusterSize_2 = 2,
Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]),
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}},
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...]
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to quorum cluster size '2' has no effect
%% (an empty result list is expected because no addition is attempted)
Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]),
?assertEqual([], Result2),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to quorum cluster size '3'
TargetClusterSize_3 = 3,
Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]),
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)),
assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount),

%% grow queues to quorum cluster size '5'
TargetClusterSize_5 = 5,
Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]),
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)),
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),

%% shrink all queues again down to 1 member
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to quorum cluster size > '5' (limit = 5).
%% the queues are expected to end up with 5 members, not 10
TargetClusterSize_10 = 10,
Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]),
?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)),
assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount),

%% shrink all queues again down to 1 member
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% attempt to grow queues to a negative quorum cluster size:
%% an error tuple is returned instead of a result list
BadTargetClusterSize = -5,
?assertEqual({error, bad_quorum_cluster_size},
rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])),

%% shrink all queues again down to 1 member
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_all_queues_shrink_member_to_current_member, []),
assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount),

%% grow queues to node 'Server1': non_voter
%% (the return value is not checked here, only the resulting member count)
rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to node 'Server2': fail, non_voters found
Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]),
%% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}},
%% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...]
?assert(lists:all(
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount),

%% grow queues to target quorum cluster size '5': fail, non_voters found
Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]),
?assert(lists:all(
fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)),
assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount).

%% For every queue name in Qs: waits until MsgCount messages are ready on
%% Node, looks the queue up in vhost <<"/">> on Node and asserts that its
%% type state lists exactly TargetClusterSize member nodes.
assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) ->
    lists:map(
      fun(QueueName) ->
              wait_for_messages_ready([Node], ra_name(QueueName), MsgCount),
              {ok, QRecord} = rpc:call(Node, rabbit_amqqueue, lookup,
                                       [QueueName, <<"/">>]),
              #{nodes := Members} = amqqueue:get_type_state(QRecord),
              ?assertEqual(TargetClusterSize, length(Members))
      end,
      Qs).

gh_12635(Config) ->
check_quorum_queues_v4_compat(Config),

Expand Down
Loading
Loading