381381 stashed_eol = [] :: [rabbit_amqqueue :name ()],
382382
383383 queue_states = rabbit_queue_type :init () :: rabbit_queue_type :state (),
384+ queue_types_published = sets :new ([{version , 2 }]) ::
385+ sets :set (rabbit_queue_type :queue_type ()),
384386 permission_cache = [] :: permission_cache (),
385387 topic_permission_cache = [] :: topic_permission_cache ()
386388 }).
@@ -582,36 +584,17 @@ handle_cast({queue_event, _, _} = QEvent, State0) ->
582584 log_error_and_close_session (Error , State0 )
583585 end ;
584586handle_cast ({conserve_resources , Alarm , Conserve },
585- # state {incoming_window = IncomingWindow0 ,
586- cfg = # cfg {resource_alarms = Alarms0 ,
587- incoming_window_margin = Margin0 ,
587+ # state {cfg = # cfg {resource_alarms = Alarms0 ,
588588 writer_pid = WriterPid ,
589- channel_num = Ch ,
590- max_incoming_window = MaxIncomingWindow
589+ channel_num = Ch
591590 } = Cfg
592591 } = State0 ) ->
593592 Alarms = case Conserve of
594593 true -> sets :add_element (Alarm , Alarms0 );
595594 false -> sets :del_element (Alarm , Alarms0 )
596595 end ,
597- {SendFlow , IncomingWindow , Margin } =
598- case {sets :is_empty (Alarms0 ), sets :is_empty (Alarms )} of
599- {true , false } ->
600- % % Alarm kicked in.
601- % % Notify the client to not send us any more TRANSFERs. Since we decrase
602- % % our incoming window dynamically, there might be incoming in-flight
603- % % TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
604- {true , 0 , MaxIncomingWindow };
605- {false , true } ->
606- % % All alarms cleared.
607- % % Notify the client that it can resume sending us TRANSFERs.
608- {true , MaxIncomingWindow , 0 };
609- _ ->
610- {false , IncomingWindow0 , Margin0 }
611- end ,
612- State = State0 # state {incoming_window = IncomingWindow ,
613- cfg = Cfg # cfg {resource_alarms = Alarms ,
614- incoming_window_margin = Margin }},
596+ State1 = State0 # state {cfg = Cfg # cfg {resource_alarms = Alarms }},
597+ {SendFlow , State } = check_resource_alarm (State0 , State1 ),
615598 case SendFlow of
616599 true ->
617600 Flow = session_flow_fields (# 'v1_0.flow' {}, State ),
@@ -637,6 +620,41 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
637620handle_cast (shutdown , State ) ->
638621 {stop , normal , State }.
639622
623+ is_in_resource_alarm (# state {cfg = # cfg {resource_alarms = Alarms },
624+ queue_types_published = QTs }) ->
625+ sets :fold (
626+ fun ({disk , QT }, Acc ) ->
627+ Acc orelse sets :is_element (QT , QTs );
628+ (_ , _ ) ->
629+ true
630+ end , false , Alarms ).
631+
632+ check_resource_alarm (State0 ,
633+ # state {incoming_window = IncomingWindow0 ,
634+ cfg = # cfg {incoming_window_margin = Margin0 ,
635+ max_incoming_window = MaxIncomingWindow
636+ } = Cfg } = State1 ) ->
637+ WasBlocked = is_in_resource_alarm (State0 ),
638+ IsBlocked = is_in_resource_alarm (State1 ),
639+ {SendFlow , IncomingWindow , Margin } =
640+ case IsBlocked of
641+ true when not WasBlocked ->
642+ % % Alarm kicked in.
643+ % % Notify the client to not send us any more TRANSFERs. Since we decrase
644+ % % our incoming window dynamically, there might be incoming in-flight
645+ % % TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
646+ {true , 0 , MaxIncomingWindow };
647+ false when WasBlocked ->
648+ % % All alarms cleared.
649+ % % Notify the client that it can resume sending us TRANSFERs.
650+ {true , MaxIncomingWindow , 0 };
651+ _ ->
652+ {false , IncomingWindow0 , Margin0 }
653+ end ,
654+ State = State1 # state {incoming_window = IncomingWindow ,
655+ cfg = Cfg # cfg {incoming_window_margin = Margin }},
656+ {SendFlow , State }.
657+
640658log_error_and_close_session (
641659 Error , State = # state {cfg = # cfg {reader_pid = ReaderPid ,
642660 writer_pid = WriterPid ,
@@ -1940,7 +1958,6 @@ session_flow_control_received_transfer(
19401958 incoming_window = InWindow0 ,
19411959 remote_outgoing_window = RemoteOutgoingWindow ,
19421960 cfg = # cfg {incoming_window_margin = Margin ,
1943- resource_alarms = Alarms ,
19441961 max_incoming_window = MaxIncomingWindow }
19451962 } = State ) ->
19461963 InWindow1 = InWindow0 - 1 ,
@@ -1954,7 +1971,7 @@ session_flow_control_received_transfer(
19541971 ok
19551972 end ,
19561973 {Flows , InWindow } = case InWindow1 =< (MaxIncomingWindow div 2 ) andalso
1957- sets : is_empty ( Alarms ) of
1974+ not is_in_resource_alarm ( State ) of
19581975 true ->
19591976 % % We've reached halfway and there are no
19601977 % % disk or memory alarm, open the window.
@@ -2371,6 +2388,7 @@ incoming_link_transfer(
23712388 multi_transfer_msg = MultiTransfer
23722389 } = Link0 ,
23732390 State0 = # state {queue_states = QStates0 ,
2391+ queue_types_published = QTs0 ,
23742392 permission_cache = PermCache0 ,
23752393 topic_permission_cache = TopicPermCache0 ,
23762394 cfg = # cfg {user = User = # user {username = Username },
@@ -2414,19 +2432,26 @@ incoming_link_transfer(
24142432 Qs = rabbit_amqqueue :prepend_extra_bcc (Qs0 ),
24152433 case rabbit_queue_type :deliver (Qs , Mc , Opts , QStates0 ) of
24162434 {ok , QStates , Actions } ->
2435+ QTs1 = sets :from_list (rabbit_amqqueue :queue_types (Qs ),
2436+ [{version , 2 }]),
2437+ QTs = sets :union (QTs0 , QTs1 ),
24172438 State1 = State0 # state {queue_states = QStates ,
2439+ queue_types_published = QTs ,
24182440 permission_cache = PermCache ,
24192441 topic_permission_cache = TopicPermCache },
24202442 % % Confirms must be registered before processing actions
24212443 % % because actions may contain rejections of publishes.
24222444 {U , Reply0 } = process_routing_confirm (
24232445 Qs , Settled , DeliveryId , U0 ),
2424- State = handle_queue_actions (Actions , State1 ),
2446+ State2 = handle_queue_actions (Actions , State1 ),
2447+ {SendAlarmFlow , State } = check_resource_alarm (
2448+ State0 , State2 ),
24252449 DeliveryCount = add (DeliveryCount0 , 1 ),
24262450 Credit1 = Credit0 - 1 ,
24272451 {Credit , Reply1 } = maybe_grant_link_credit (
24282452 Credit1 , MaxLinkCredit ,
2429- DeliveryCount , map_size (U ), Handle ),
2453+ DeliveryCount , map_size (U ), Handle ,
2454+ SendAlarmFlow ),
24302455 Reply = Reply0 ++ Reply1 ,
24312456 Link = Link0 # incoming_link {
24322457 delivery_count = DeliveryCount ,
@@ -2461,7 +2486,8 @@ incoming_link_transfer(
24612486 Credit1 = Credit0 - 1 ,
24622487 {Credit , Reply0 } = maybe_grant_link_credit (
24632488 Credit1 , MaxLinkCredit ,
2464- DeliveryCount , map_size (U0 ), Handle ),
2489+ DeliveryCount , map_size (U0 ), Handle ,
2490+ false ),
24652491 Reply = [Disposition | Reply0 ],
24662492 Link = Link0 # incoming_link {
24672493 delivery_count = DeliveryCount ,
@@ -2574,8 +2600,9 @@ rejected(DeliveryId, Error) ->
25742600 settled = true ,
25752601 state = # 'v1_0.rejected' {error = Error }}.
25762602
2577- maybe_grant_link_credit (Credit , MaxLinkCredit , DeliveryCount , NumUnconfirmed , Handle ) ->
2578- case grant_link_credit (Credit , MaxLinkCredit , NumUnconfirmed ) of
2603+ maybe_grant_link_credit (Credit , MaxLinkCredit , DeliveryCount , NumUnconfirmed ,
2604+ Handle , AlarmFlow ) ->
2605+ case grant_link_credit (Credit , MaxLinkCredit , NumUnconfirmed ) orelse AlarmFlow of
25792606 true ->
25802607 {MaxLinkCredit , [flow (Handle , DeliveryCount , MaxLinkCredit )]};
25812608 false ->
0 commit comments