Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 53 additions & 48 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
-record(link,
{name :: link_name(),
ref :: link_ref(),
state = detached :: detached | attach_sent | attached | detach_sent,
state = detached :: detached | attach_sent | attached | attach_refused | detach_sent,
notify :: pid(),
output_handle :: output_handle(),
input_handle :: input_handle() | undefined,
Expand Down Expand Up @@ -325,9 +325,11 @@ mapped(cast, #'v1_0.end'{} = End, State) ->
ok = notify_session_ended(End, State),
{stop, normal, State};
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
initial_delivery_count = IDC,
handle = {uint, InHandle},
role = PeerRoleBool,
source = Source,
target = Target,
initial_delivery_count = IDC,
max_message_size = MaybeMaxMessageSize} = Attach,
#state{links = Links, link_index = LinkIndex,
link_handle_index = LHI} = State0) ->
Expand All @@ -339,20 +341,28 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
#{OutHandle := Link0} = Links,
ok = notify_link_attached(Link0, Attach, State0),

{DeliveryCount, MaxMessageSize} =
{LinkState, DeliveryCount, MaxMessageSize} =
case Link0 of
#link{role = sender = OurRole,
delivery_count = DC} ->
LS = case Target of
#'v1_0.target'{} -> attached;
_ -> attach_refused
end,
MSS = case MaybeMaxMessageSize of
{ulong, S} when S > 0 -> S;
_ -> undefined
end,
{DC, MSS};
{LS, DC, MSS};
#link{role = receiver = OurRole,
max_message_size = MSS} ->
{unpack(IDC), MSS}
LS = case Source of
#'v1_0.source'{} -> attached;
_ -> attach_refused
end,
{LS, unpack(IDC), MSS}
end,
Link = Link0#link{state = attached,
Link = Link0#link{state = LinkState,
input_handle = InHandle,
delivery_count = DeliveryCount,
max_message_size = MaxMessageSize},
Expand Down Expand Up @@ -496,43 +506,31 @@ mapped({call, From},
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
mapped({call, From = {Pid, _}},
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = {binary, DeliveryTag},
settled = false} = Transfer0, Sections},
#state{outgoing_delivery_id = DeliveryId, links = Links,
outgoing_unsettled = Unsettled} = State) ->
delivery_tag = DeliveryTag,
settled = Settled} = Transfer0, Sections},
#state{outgoing_delivery_id = DeliveryId,
links = Links,
outgoing_unsettled = Unsettled0} = State0) ->
case Links of
#{OutHandle := #link{state = attach_refused}} ->
{keep_state_and_data, {reply, From, {error, attach_refused}}};
#{OutHandle := #link{input_handle = undefined}} ->
{keep_state_and_data, {reply, From, {error, half_attached}}};
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
footer_opt = FooterOpt}} ->
#{OutHandle := #link{max_message_size = MaxMessageSize,
footer_opt = FooterOpt} = Link} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
{ok, NumFrames} ->
State1 = State#state{outgoing_unsettled = Unsettled#{DeliveryId => {DeliveryTag, Pid}}},
{keep_state, book_transfer_send(NumFrames, Link, State1), {reply, From, ok}};
Error ->
{keep_state_and_data, {reply, From, Error}}
end;
_ ->
{keep_state_and_data, {reply, From, {error, link_not_found}}}

end;
mapped({call, From},
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
Sections}, #state{outgoing_delivery_id = DeliveryId,
links = Links} = State) ->
case Links of
#{OutHandle := #link{input_handle = undefined}} ->
{keep_state_and_data, {reply, From, {error, half_attached}}};
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
footer_opt = FooterOpt}} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State0) of
{ok, NumFrames} ->
State = case Settled of
true ->
State0;
false ->
{binary, Tag} = DeliveryTag,
Unsettled = Unsettled0#{DeliveryId => {Tag, Pid}},
State0#state{outgoing_unsettled = Unsettled}
end,
{keep_state, book_transfer_send(NumFrames, Link, State), {reply, From, ok}};
Error ->
{keep_state_and_data, {reply, From, Error}}
Expand Down Expand Up @@ -688,21 +686,28 @@ send_flow_link(OutHandle,
never -> never;
_ -> {RenewWhenBelow, Credit}
end,
#{OutHandle := #link{output_handle = H,
#{OutHandle := #link{state = LinkState,
output_handle = H,
role = receiver,
delivery_count = DeliveryCount,
available = Available} = Link} = Links,
Flow1 = Flow0#'v1_0.flow'{
handle = uint(H),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow = set_flow_session_fields(Flow1, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}.
case LinkState of
attach_refused ->
%% We will receive the DETACH frame shortly.
State;
_ ->
Flow1 = Flow0#'v1_0.flow'{
handle = uint(H),
%% "In the event that the receiving link endpoint has not yet seen the
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
Flow = set_flow_session_fields(Flow1, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}
end.

send_flow_session(State) ->
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),
Expand Down
55 changes: 52 additions & 3 deletions deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ groups() ->
]},
{mock, [], [
insufficient_credit,
attach_refused,
incoming_heartbeat,
multi_transfer_without_delivery_id
]}
Expand Down Expand Up @@ -772,11 +773,13 @@ insufficient_credit(Config) ->
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false,
name = Name}, <<>>}) ->
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
role = false,
target = Target}, <<>>}) ->
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
role = true}]}
role = true,
target = Target}]}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
Expand All @@ -799,6 +802,52 @@ insufficient_credit(Config) ->
ok = amqp10_client:close_connection(Connection),
ok.

attach_refused(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
end,
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
next_outgoing_id = {uint, 1},
incoming_window = {uint, 1000},
outgoing_window = {uint, 1000}}
]}
end,
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
role = false}, <<>>}) ->
%% We test only the 1st stage of link refusal:
%% Server replies with its local terminus set to null.
%% We omit the 2nd stage (the detach frame).
{Ch, [#'v1_0.attach'{name = Name,
handle = {uint, 99},
role = true,
target = undefined}]}
end,
Steps = [fun mock_server:recv_amqp_header_step/1,
fun mock_server:send_amqp_header_step/1,
mock_server:amqp_step(OpenStep),
mock_server:amqp_step(BeginStep),
mock_server:amqp_step(AttachStep)],

ok = mock_server:set_steps(?config(mock_server, Config), Steps),

Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
{ok, Connection} = amqp10_client:open_connection(Cfg),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>,
<<"test">>),
await_link(Sender, attached, attached_timeout),
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
%% We expect that the lib prevents the app from sending messages
%% in this intermediate link refusal state.
?assertEqual({error, attach_refused},
amqp10_client:send_msg(Sender, Msg)),

ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection).

multi_transfer_without_delivery_id(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
Expand Down
Loading
Loading