127
127
-record (link ,
128
128
{name :: link_name (),
129
129
ref :: link_ref (),
130
- state = detached :: detached | attach_sent | attached | detach_sent ,
130
+ state = detached :: detached | attach_sent | attached | attach_refused | detach_sent ,
131
131
notify :: pid (),
132
132
output_handle :: output_handle (),
133
133
input_handle :: input_handle () | undefined ,
@@ -325,9 +325,11 @@ mapped(cast, #'v1_0.end'{} = End, State) ->
325
325
ok = notify_session_ended (End , State ),
326
326
{stop , normal , State };
327
327
mapped (cast , # 'v1_0.attach' {name = {utf8 , Name },
328
- initial_delivery_count = IDC ,
329
328
handle = {uint , InHandle },
330
329
role = PeerRoleBool ,
330
+ source = Source ,
331
+ target = Target ,
332
+ initial_delivery_count = IDC ,
331
333
max_message_size = MaybeMaxMessageSize } = Attach ,
332
334
# state {links = Links , link_index = LinkIndex ,
333
335
link_handle_index = LHI } = State0 ) ->
@@ -339,20 +341,28 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
339
341
#{OutHandle := Link0 } = Links ,
340
342
ok = notify_link_attached (Link0 , Attach , State0 ),
341
343
342
- {DeliveryCount , MaxMessageSize } =
344
+ {LinkState , DeliveryCount , MaxMessageSize } =
343
345
case Link0 of
344
346
# link {role = sender = OurRole ,
345
347
delivery_count = DC } ->
348
+ LS = case Target of
349
+ # 'v1_0.target' {} -> attached ;
350
+ _ -> attach_refused
351
+ end ,
346
352
MSS = case MaybeMaxMessageSize of
347
353
{ulong , S } when S > 0 -> S ;
348
354
_ -> undefined
349
355
end ,
350
- {DC , MSS };
356
+ {LS , DC , MSS };
351
357
# link {role = receiver = OurRole ,
352
358
max_message_size = MSS } ->
353
- {unpack (IDC ), MSS }
359
+ LS = case Source of
360
+ # 'v1_0.source' {} -> attached ;
361
+ _ -> attach_refused
362
+ end ,
363
+ {LS , unpack (IDC ), MSS }
354
364
end ,
355
- Link = Link0 # link {state = attached ,
365
+ Link = Link0 # link {state = LinkState ,
356
366
input_handle = InHandle ,
357
367
delivery_count = DeliveryCount ,
358
368
max_message_size = MaxMessageSize },
@@ -496,43 +506,31 @@ mapped({call, From},
496
506
{keep_state_and_data , {reply , From , {error , remote_incoming_window_exceeded }}};
497
507
mapped ({call , From = {Pid , _ }},
498
508
{transfer , # 'v1_0.transfer' {handle = {uint , OutHandle },
499
- delivery_tag = {binary , DeliveryTag },
500
- settled = false } = Transfer0 , Sections },
501
- # state {outgoing_delivery_id = DeliveryId , links = Links ,
502
- outgoing_unsettled = Unsettled } = State ) ->
509
+ delivery_tag = DeliveryTag ,
510
+ settled = Settled } = Transfer0 , Sections },
511
+ # state {outgoing_delivery_id = DeliveryId ,
512
+ links = Links ,
513
+ outgoing_unsettled = Unsettled0 } = State0 ) ->
503
514
case Links of
515
+ #{OutHandle := # link {state = attach_refused }} ->
516
+ {keep_state_and_data , {reply , From , {error , attach_refused }}};
504
517
#{OutHandle := # link {input_handle = undefined }} ->
505
518
{keep_state_and_data , {reply , From , {error , half_attached }}};
506
519
#{OutHandle := # link {link_credit = LC }} when LC =< 0 ->
507
520
{keep_state_and_data , {reply , From , {error , insufficient_credit }}};
508
- #{OutHandle := Link = # link {max_message_size = MaxMessageSize ,
509
- footer_opt = FooterOpt }} ->
521
+ #{OutHandle := # link {max_message_size = MaxMessageSize ,
522
+ footer_opt = FooterOpt } = Link } ->
510
523
Transfer = Transfer0 # 'v1_0.transfer' {delivery_id = uint (DeliveryId )},
511
- case send_transfer (Transfer , Sections , FooterOpt , MaxMessageSize , State ) of
512
- {ok , NumFrames } ->
513
- State1 = State # state {outgoing_unsettled = Unsettled #{DeliveryId => {DeliveryTag , Pid }}},
514
- {keep_state , book_transfer_send (NumFrames , Link , State1 ), {reply , From , ok }};
515
- Error ->
516
- {keep_state_and_data , {reply , From , Error }}
517
- end ;
518
- _ ->
519
- {keep_state_and_data , {reply , From , {error , link_not_found }}}
520
-
521
- end ;
522
- mapped ({call , From },
523
- {transfer , # 'v1_0.transfer' {handle = {uint , OutHandle }} = Transfer0 ,
524
- Sections }, # state {outgoing_delivery_id = DeliveryId ,
525
- links = Links } = State ) ->
526
- case Links of
527
- #{OutHandle := # link {input_handle = undefined }} ->
528
- {keep_state_and_data , {reply , From , {error , half_attached }}};
529
- #{OutHandle := # link {link_credit = LC }} when LC =< 0 ->
530
- {keep_state_and_data , {reply , From , {error , insufficient_credit }}};
531
- #{OutHandle := Link = # link {max_message_size = MaxMessageSize ,
532
- footer_opt = FooterOpt }} ->
533
- Transfer = Transfer0 # 'v1_0.transfer' {delivery_id = uint (DeliveryId )},
534
- case send_transfer (Transfer , Sections , FooterOpt , MaxMessageSize , State ) of
524
+ case send_transfer (Transfer , Sections , FooterOpt , MaxMessageSize , State0 ) of
535
525
{ok , NumFrames } ->
526
+ State = case Settled of
527
+ true ->
528
+ State0 ;
529
+ false ->
530
+ {binary , Tag } = DeliveryTag ,
531
+ Unsettled = Unsettled0 #{DeliveryId => {Tag , Pid }},
532
+ State0 # state {outgoing_unsettled = Unsettled }
533
+ end ,
536
534
{keep_state , book_transfer_send (NumFrames , Link , State ), {reply , From , ok }};
537
535
Error ->
538
536
{keep_state_and_data , {reply , From , Error }}
@@ -688,21 +686,28 @@ send_flow_link(OutHandle,
688
686
never -> never ;
689
687
_ -> {RenewWhenBelow , Credit }
690
688
end ,
691
- #{OutHandle := # link {output_handle = H ,
689
+ #{OutHandle := # link {state = LinkState ,
690
+ output_handle = H ,
692
691
role = receiver ,
693
692
delivery_count = DeliveryCount ,
694
693
available = Available } = Link } = Links ,
695
- Flow1 = Flow0 # 'v1_0.flow' {
696
- handle = uint (H ),
697
- % % "In the event that the receiving link endpoint has not yet seen the
698
- % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
699
- delivery_count = maybe_uint (DeliveryCount ),
700
- available = uint (Available )},
701
- Flow = set_flow_session_fields (Flow1 , State ),
702
- ok = send (Flow , State ),
703
- State # state {links = Links #{OutHandle =>
704
- Link # link {link_credit = Credit ,
705
- auto_flow = AutoFlow }}}.
694
+ case LinkState of
695
+ attach_refused ->
696
+ % % We will receive the DETACH frame shortly.
697
+ State ;
698
+ _ ->
699
+ Flow1 = Flow0 # 'v1_0.flow' {
700
+ handle = uint (H ),
701
+ % % "In the event that the receiving link endpoint has not yet seen the
702
+ % % initial attach frame from the sender this field MUST NOT be set." [2.7.4]
703
+ delivery_count = maybe_uint (DeliveryCount ),
704
+ available = uint (Available )},
705
+ Flow = set_flow_session_fields (Flow1 , State ),
706
+ ok = send (Flow , State ),
707
+ State # state {links = Links #{OutHandle =>
708
+ Link # link {link_credit = Credit ,
709
+ auto_flow = AutoFlow }}}
710
+ end .
706
711
707
712
send_flow_session (State ) ->
708
713
Flow = set_flow_session_fields (# 'v1_0.flow' {}, State ),
0 commit comments