diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 5ae9a8a73973..c1b5fdb09839 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1543,29 +1543,20 @@ shrink_all(Node) -> amqqueue:get_type(Q) == ?MODULE, lists:member(Node, get_nodes(Q))]. - +-spec grow(node() | integer(), binary(), binary(), all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. grow(Node, VhostSpec, QueueSpec, Strategy) -> grow(Node, VhostSpec, QueueSpec, Strategy, promotable). --spec grow(node(), binary(), binary(), all | even, membership()) -> +-spec grow(node() | integer(), binary(), binary(), all | even, membership()) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. -grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> +grow(Node, VhostSpec, QueueSpec, Strategy, Membership) when is_atom(Node) -> Running = rabbit_nodes:list_running(), [begin Size = length(get_nodes(Q)), - QName = amqqueue:get_name(Q), - ?LOG_INFO("~ts: adding a new member (replica) on node ~w", - [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node, Membership) of - ok -> - {QName, {ok, Size + 1}}; - {error, Err} -> - ?LOG_WARNING( - "~ts: failed to add member (replica) on node ~w, error: ~w", - [rabbit_misc:rs(QName), Node, Err]), - {QName, {error, Size, Err}} - end + maybe_grow(Q, Node, Membership, Size) end || Q <- rabbit_amqqueue:list(), amqqueue:get_type(Q) == ?MODULE, @@ -1575,7 +1566,85 @@ grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> lists:member(Node, Running), matches_strategy(Strategy, get_nodes(Q)), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso - is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]; + +grow(QuorumClusterSize, VhostSpec, QueueSpec, Strategy, Membership) + when is_integer(QuorumClusterSize), QuorumClusterSize > 0 -> + Running = rabbit_nodes:list_running(), + TotalRunning = length(Running), + + TargetQuorumClusterSize = + if QuorumClusterSize > TotalRunning -> + %% we cant grow beyond total running nodes + TotalRunning; + true -> + QuorumClusterSize + end, + + lists:flatten( + [begin + QNodes = get_nodes(Q), + case length(QNodes) of + Size when Size < TargetQuorumClusterSize -> + TargetAvailableNodes = Running -- QNodes, + N = length(TargetAvailableNodes), + Node = lists:nth(rand:uniform(N), TargetAvailableNodes), + maybe_grow(Q, Node, Membership, Size); + _ -> + [] + end + end + || _ <- lists:seq(1, TargetQuorumClusterSize), + Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == ?MODULE, + matches_strategy(Strategy, get_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]); + +grow(QuorumClusterSize, _VhostSpec, _QueueSpec, _Strategy, _Membership) + when is_integer(QuorumClusterSize) -> + rabbit_log:warning( + "cannot grow queues to a quorum cluster size less than zero (~tp)", + [QuorumClusterSize]), + {error, bad_quorum_cluster_size}. + +maybe_grow(Q, Node, Membership, Size) -> + QNodes = get_nodes(Q), + maybe_grow(Q, Node, Membership, Size, QNodes). + +maybe_grow(Q, Node, Membership, Size, QNodes) -> + QName = amqqueue:get_name(Q), + {ok, RaName} = qname_to_internal_name(QName), + case check_all_memberships(RaName, QNodes, voter) of + true -> + ?LOG_INFO("~ts: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node, Membership) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + ?LOG_WARNING( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end; + false -> + Err = {error, non_voters_found}, + ?LOG_WARNING( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end. + +check_all_memberships(RaName, QNodes, CompareMembership) -> + case rpc:multicall(QNodes, ets, lookup, [ra_state, RaName]) of + {Result, []} -> + lists:all( + fun(M) -> M == CompareMembership end, + [Membership || [{_RaName, _RaState, Membership}] <- Result]); + _ -> + false + end. -spec transfer_leadership(amqqueue:amqqueue(), node()) -> {migrated, node()} | {not_migrated, atom()}. transfer_leadership(Q, Destination) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d4060..dbdcb5511ddc 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -115,7 +115,8 @@ groups() -> node_removal_is_not_quorum_critical, select_nodes_with_least_replicas, select_nodes_with_least_replicas_node_down, - subscribe_from_each + subscribe_from_each, + grow_queue ]}, @@ -1790,6 +1791,116 @@ dont_leak_file_handles(Config) -> rabbit_ct_client_helpers:close_channel(C), ok. +grow_queue(Config) -> + [Server0, Server1, Server2, _Server3, _Server4] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + AQ = ?config(alt_queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + ?assertEqual({'queue.declare_ok', AQ, 0, 0}, + declare(Ch, AQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 5}])), + + QQs = [QQ, AQ], + MsgCount = 3, + + [begin + RaName = ra_name(Q), + rabbit_ct_client_helpers:publish(Ch, Q, MsgCount), + wait_for_messages_ready([Server0], RaName, MsgCount), + {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(5, length(Nodes0)) + end || Q <- QQs], + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + + TargetClusterSize_1 = 1, + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1' + TargetClusterSize_2 = 2, + Result1 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all]), + %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{ok,2}}, + %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{ok,2}},...] + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result1)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '2' has no effect + Result2 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_2, <<"/">>, <<".*">>, all]), + ?assertEqual([], Result2), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to quorum cluster size '3' + TargetClusterSize_3 = 3, + Result3 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_3, <<"/">>, <<".*">>, all, voter]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result3)), + assert_grown_queues(QQs, Server0, TargetClusterSize_3, MsgCount), + + %% grow queues to quorum cluster size '5' + TargetClusterSize_5 = 5, + Result4 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all, voter]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result4)), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to quorum cluster size > '5' (limit = 5). + TargetClusterSize_10 = 10, + Result5 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_10, <<"/">>, <<".*">>, all]), + ?assert(lists:all(fun({_, {R, _}}) -> R =:= ok end, Result5)), + assert_grown_queues(QQs, Server0, TargetClusterSize_5, MsgCount), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% attempt to grow queues to quorum cluster size < '0'. + BadTargetClusterSize = -5, + ?assertEqual({error, bad_quorum_cluster_size}, + rpc:call(Server0, rabbit_quorum_queue, grow, [BadTargetClusterSize, <<"/">>, <<".*">>, all])), + + %% shrink all queues again down to 1 member + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_all_queues_shrink_member_to_current_member, []), + assert_grown_queues(QQs, Server0, TargetClusterSize_1, MsgCount), + + %% grow queues to node 'Server1': non_voter + rpc:call(Server0, rabbit_quorum_queue, grow, [Server1, <<"/">>, <<".*">>, all, non_voter]), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to node 'Server2': fail, non_voters found + Result6 = rpc:call(Server0, rabbit_quorum_queue, grow, [Server2, <<"/">>, <<".*">>, all, voter]), + %% [{{resource,<<"/">>,queue,<<"grow_queue">>},{error, 2, {error, non_voters_found}}, + %% {{resource,<<"/">>,queue,<<"grow_queue_alt">>},{error, 2, {error, non_voters_found}},...] + ?assert(lists:all( + fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result6)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount), + + %% grow queues to target quorum cluster size '5': fail, non_voters found + Result7 = rpc:call(Server0, rabbit_quorum_queue, grow, [TargetClusterSize_5, <<"/">>, <<".*">>, all]), + ?assert(lists:all( + fun({_, Err}) -> Err =:= {error, TargetClusterSize_2, {error, non_voters_found}} end, Result7)), + assert_grown_queues(QQs, Server0, TargetClusterSize_2, MsgCount). + +assert_grown_queues(Qs, Node, TargetClusterSize, MsgCount) -> + [begin + RaName = ra_name(Q), + wait_for_messages_ready([Node], RaName, MsgCount), + {ok, Q0} = rpc:call(Node, rabbit_amqqueue, lookup, [Q, <<"/">>]), + #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + ?assertEqual(TargetClusterSize, length(Nodes0)) + end || Q <- Qs]. + gh_12635(Config) -> check_quorum_queues_v4_compat(Config), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex new file mode 100644 index 000000000000..be78119e5e6f --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_to_count_command.ex @@ -0,0 +1,171 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommand do + alias RabbitMQ.CLI.Core.{DocGuide, Validators} + import RabbitMQ.CLI.Core.DataCoercion + + @behaviour RabbitMQ.CLI.CommandBehaviour + + defp default_opts, + do: %{vhost_pattern: ".*", queue_pattern: ".*", membership: "promotable", errors_only: false} + + def switches(), + do: [ + vhost_pattern: :string, + queue_pattern: :string, + membership: :string, + errors_only: :boolean + ] + + def merge_defaults(args, opts) do + {args, Map.merge(default_opts(), opts)} + end + + def validate(args, _) when length(args) < 2 do + {:validation_failure, :not_enough_args} + end + + def validate(args, _) when length(args) > 2 do + {:validation_failure, :too_many_args} + end + + def validate([_, s], _) + when not (s == "all" or + s == "even") do + {:validation_failure, "strategy '#{s}' is not recognised."} + end + + def validate([n, _], _) + when (is_integer(n) and n <= 0) do + {:validation_failure, "node count '#{n}' must be greater than 0."} + end + + def validate(_, %{membership: m}) + when not (m == "promotable" or + m == "non_voter" or + m == "voter") do + {:validation_failure, "voter status '#{m}' is not recognised."} + end + + def validate(_, _) do + :ok + end + + def validate_execution_environment(args, opts) do + Validators.rabbit_is_running(args, opts) + end + + def run([node_count, strategy], %{ + node: node_name, + vhost_pattern: vhost_pat, + queue_pattern: queue_pat, + membership: membership, + errors_only: errors_only + }) when is_integer(node_count) do + + args = [node_count, vhost_pat, queue_pat, to_atom(strategy)] + + args = + case to_atom(membership) do + :promotable -> args + other -> args ++ [other] + end + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do + {:error, _} = error -> + error + + {:badrpc, _} = error -> + error + + results when errors_only -> + for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:size, format_size(res)}, + {:result, format_result(res)} + ] + + results -> + for {{:resource, vhost, _kind, name}, res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:size, format_size(res)}, + {:result, format_result(res)} + ] + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.Table + + def usage, + do: + "grow_to_count [--vhost-pattern ] [--queue-pattern ] [--membership ]" + + def usage_additional do + [ + ["", "number of nodes to place replicas on"], + [ + "", + "add a member for all matching queues or just those whose membership count is an even number" + ], + ["--queue-pattern ", "regular expression to match queue names"], + ["--vhost-pattern ", "regular expression to match virtual host names"], + ["--membership ", "add a promotable non-voter (default) or full voter"], + ["--errors-only", "only list queues which reported an error"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.quorum_queues() + ] + end + + def help_section, do: :cluster_management + + def description, + do: + "Grows quorum queue clusters by adding member replicas on the specified number of nodes for all matching queues" + + def banner([node_count, strategy], _) do + "Growing #{strategy} quorum queues on #{node_count} nodes..." + end + + # + # Implementation + # + + defp format_size({:ok, size}) do + size + end + + defp format_size({:error, _size, :timeout}) do + # the actual size is uncertain here + "?" + end + + defp format_size({:error, size, _}) do + size + end + + defp format_result({:ok, _size}) do + "ok" + end + + defp format_result({:error, _size, :timeout}) do + "error: the operation timed out and may not have been completed" + end + + defp format_result({:error, _size, err}) do + to_string(:io_lib.format("error: ~W", [err, 10])) + end +end diff --git a/deps/rabbitmq_cli/test/queues/grow_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_command_test.exs index 2b1aab070317..b4b8ada8acb7 100644 --- a/deps/rabbitmq_cli/test/queues/grow_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/grow_command_test.exs @@ -44,51 +44,51 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do end test "validate: when one argument is provided, returns a failure" do - assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :not_enough_args} + assert @command.validate(["target@node"], %{}) == {:validation_failure, :not_enough_args} end test "validate: when a node and even are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "even"], %{}) == :ok + assert @command.validate(["target@node", "even"], %{}) == :ok end test "validate: when a node and all are provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{}) == :ok + assert @command.validate(["target@node", "all"], %{}) == :ok end test "validate: when a node and something else is provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "banana"], %{}) == + assert @command.validate(["target@node", "banana"], %{}) == {:validation_failure, "strategy 'banana' is not recognised."} end test "validate: when three arguments are provided, returns a failure" do - assert @command.validate(["quorum-queue-a", "extra-arg", "another-extra-arg"], %{}) == + assert @command.validate(["target@node", "extra-arg", "another-extra-arg"], %{}) == {:validation_failure, :too_many_args} end test "validate: when membership promotable is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "promotable", queue_pattern: "qq.*"}) == :ok end test "validate: when membership voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "voter", queue_pattern: "qq.*"}) == :ok end test "validate: when membership non_voter is provided, returns a success" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok + assert @command.validate(["target@node", "all"], %{membership: "non_voter", queue_pattern: "qq.*"}) == :ok end test "validate: when wrong membership is provided, returns failure" do - assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == + assert @command.validate(["target@node", "all"], %{membership: "banana", queue_pattern: "qq.*"}) == {:validation_failure, "voter status 'banana' is not recognised."} end @tag test_timeout: 3000 - test "run: targeting an unreachable node throws a badrpc", context do + test "run: targeting an unreachable node throws a badrpc when growing to a target node", context do assert match?( {:badrpc, _}, @command.run( - ["quorum-queue-a", "all"], - Map.merge(context[:opts], %{node: :jake@thedog}) + ["target@node", "all"], + Map.merge(context[:opts], %{node: :jake@thedog, queue_pattern: "qq.*"}) ) ) end diff --git a/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs new file mode 100644 index 000000000000..861530f9a3c8 --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/grow_to_count_command_test.exs @@ -0,0 +1,109 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.GrowToCountCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.GrowToCountCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000, + vhost_pattern: ".*", + queue_pattern: ".*", + membership: "promotable", + errors_only: false + }} + end + + test "merge_defaults: defaults to reporting complete results" do + assert @command.merge_defaults([], %{}) == + {[], + %{ + vhost_pattern: ".*", + queue_pattern: ".*", + errors_only: false, + membership: "promotable" + }} + end + + test "validate: when no arguments are provided, returns a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when one argument is provided, returns a failure" do + assert @command.validate([5], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when node count and even are provided, returns a success" do + assert @command.validate([7, "even"], %{}) == :ok + end + + test "validate: when node count and all are provided, returns a success" do + assert @command.validate([5, "all"], %{}) == :ok + end + + test "validate: when node count and something else is provided, returns a failure" do + assert @command.validate([7, "banana"], %{}) == + {:validation_failure, "strategy 'banana' is not recognised."} + end + + test "validate: when three arguments are provided, returns a failure" do + assert @command.validate([7, "extra-arg", "another-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + end + + test "validate: when membership promotable is provided, returns a success" do + assert @command.validate([5, "all"], %{membership: "promotable"}) == :ok + end + + test "validate: when membership voter is provided, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when membership non_voter is provided, returns a success" do + assert @command.validate([5, "all"], %{membership: "non_voter"}) == :ok + end + + test "validate: when wrong membership is provided, returns failure" do + assert @command.validate([7, "all"], %{membership: "banana"}) == + {:validation_failure, "voter status 'banana' is not recognised."} + end + + test "validate: when node count greater than zero, returns a success" do + assert @command.validate([7, "all"], %{membership: "voter"}) == :ok + end + + test "validate: when node count is zero, returns failure" do + assert @command.validate([0, "all"], %{membership: "voter"}) == + {:validation_failure, "node count '0' must be greater than 0."} + end + + test "validate: when node count is less than zero, returns failure" do + assert @command.validate([-1, "all"], %{membership: "voter"}) == + {:validation_failure, "node count '-1' must be greater than 0."} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc when growing to a node count", context do + assert match?( + {:badrpc, _}, + @command.run( + [5, "all"], + Map.merge(context[:opts], %{node: :jake@thedog}) + ) + ) + end +end