@@ -1927,21 +1927,17 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta
1927
1927
{C , State };
1928
1928
handle_frame_post_auth (Transport ,
1929
1929
{ok , # stream_connection {
1930
- name = ConnName ,
1931
- socket = Socket ,
1932
1930
stream_subscriptions = StreamSubscriptions ,
1933
1931
virtual_host = VirtualHost ,
1934
- user = User ,
1935
- send_file_oct = SendFileOct ,
1936
- transport = ConnTransport } = Connection },
1937
- # stream_connection_state {consumers = Consumers } = State ,
1932
+ user = User } = Connection },
1933
+ State ,
1938
1934
{request , CorrelationId ,
1939
1935
{subscribe ,
1940
1936
SubscriptionId ,
1941
1937
Stream ,
1942
1938
OffsetSpec ,
1943
- Credit ,
1944
- Properties }}) ->
1939
+ _Credit ,
1940
+ Properties }} = Request ) ->
1945
1941
QueueResource =
1946
1942
# resource {name = Stream ,
1947
1943
kind = queue ,
@@ -2004,89 +2000,9 @@ handle_frame_post_auth(Transport,
2004
2000
increase_protocol_counter (? PRECONDITION_FAILED ),
2005
2001
{Connection , State };
2006
2002
_ ->
2007
- Log = case Sac of
2008
- true ->
2009
- undefined ;
2010
- false ->
2011
- init_reader (ConnTransport ,
2012
- LocalMemberPid ,
2013
- QueueResource ,
2014
- SubscriptionId ,
2015
- Properties ,
2016
- OffsetSpec )
2017
- end ,
2018
-
2019
- ConsumerCounters =
2020
- atomics :new (2 , [{signed , false }]),
2021
-
2022
- response_ok (Transport ,
2023
- Connection ,
2024
- subscribe ,
2025
- CorrelationId ),
2026
-
2027
- Active =
2028
- maybe_register_consumer (VirtualHost ,
2029
- Stream ,
2030
- ConsumerName ,
2031
- ConnName ,
2032
- SubscriptionId ,
2033
- Properties ,
2034
- Sac ),
2035
-
2036
- ConsumerConfiguration =
2037
- # consumer_configuration {member_pid =
2038
- LocalMemberPid ,
2039
- subscription_id
2040
- =
2041
- SubscriptionId ,
2042
- socket = Socket ,
2043
- stream = Stream ,
2044
- offset =
2045
- OffsetSpec ,
2046
- counters =
2047
- ConsumerCounters ,
2048
- properties =
2049
- Properties ,
2050
- active =
2051
- Active },
2052
- SendLimit = Credit div 2 ,
2053
- ConsumerState =
2054
- # consumer {configuration =
2055
- ConsumerConfiguration ,
2056
- log = Log ,
2057
- send_limit = SendLimit ,
2058
- credit = Credit },
2059
-
2060
- Connection1 =
2061
- maybe_monitor_stream (LocalMemberPid ,
2062
- Stream ,
2063
- Connection ),
2064
-
2065
- State1 =
2066
- maybe_dispatch_on_subscription (Transport ,
2067
- State ,
2068
- ConsumerState ,
2069
- Connection1 ,
2070
- Consumers ,
2071
- Stream ,
2072
- SubscriptionId ,
2073
- Properties ,
2074
- SendFileOct ,
2075
- Sac ),
2076
- StreamSubscriptions1 =
2077
- case StreamSubscriptions of
2078
- #{Stream := SubscriptionIds } ->
2079
- StreamSubscriptions #{Stream =>
2080
- [SubscriptionId ]
2081
- ++ SubscriptionIds };
2082
- _ ->
2083
- StreamSubscriptions #{Stream =>
2084
- [SubscriptionId ]}
2085
- end ,
2086
- {Connection1 # stream_connection {stream_subscriptions
2087
- =
2088
- StreamSubscriptions1 },
2089
- State1 }
2003
+ handle_subscription (Transport , Connection ,
2004
+ State , Request ,
2005
+ LocalMemberPid )
2090
2006
end
2091
2007
end
2092
2008
end ;
@@ -2995,8 +2911,106 @@ maybe_dispatch_on_subscription(_Transport,
2995
2911
Consumers1 = Consumers #{SubscriptionId => ConsumerState },
2996
2912
State # stream_connection_state {consumers = Consumers1 }.
2997
2913
2914
+ handle_subscription (Transport ,# stream_connection {
2915
+ name = ConnName ,
2916
+ socket = Socket ,
2917
+ stream_subscriptions = StreamSubscriptions ,
2918
+ virtual_host = VirtualHost ,
2919
+ send_file_oct = SendFileOct ,
2920
+ transport = ConnTransport } = Connection ,
2921
+ # stream_connection_state {consumers = Consumers } = State ,
2922
+ {request , CorrelationId , {subscribe ,
2923
+ SubscriptionId ,
2924
+ Stream ,
2925
+ OffsetSpec ,
2926
+ Credit ,
2927
+ Properties }},
2928
+ LocalMemberPid ) ->
2929
+ Sac = single_active_consumer (Properties ),
2930
+ ConsumerName = consumer_name (Properties ),
2931
+ QueueResource = # resource {name = Stream ,
2932
+ kind = queue ,
2933
+ virtual_host = VirtualHost },
2934
+ case maybe_register_consumer (VirtualHost , Stream , ConsumerName , ConnName ,
2935
+ SubscriptionId , Properties , Sac ) of
2936
+ {ok , Active } ->
2937
+ Log = case Sac of
2938
+ true ->
2939
+ undefined ;
2940
+ false ->
2941
+ init_reader (ConnTransport ,
2942
+ LocalMemberPid ,
2943
+ QueueResource ,
2944
+ SubscriptionId ,
2945
+ Properties ,
2946
+ OffsetSpec )
2947
+ end ,
2948
+
2949
+ ConsumerCounters = atomics :new (2 , [{signed , false }]),
2950
+
2951
+ response_ok (Transport ,
2952
+ Connection ,
2953
+ subscribe ,
2954
+ CorrelationId ),
2955
+
2956
+ ConsumerConfiguration = # consumer_configuration {
2957
+ member_pid = LocalMemberPid ,
2958
+ subscription_id = SubscriptionId ,
2959
+ socket = Socket ,
2960
+ stream = Stream ,
2961
+ offset = OffsetSpec ,
2962
+ counters = ConsumerCounters ,
2963
+ properties = Properties ,
2964
+ active = Active },
2965
+ SendLimit = Credit div 2 ,
2966
+ ConsumerState =
2967
+ # consumer {configuration = ConsumerConfiguration ,
2968
+ log = Log ,
2969
+ send_limit = SendLimit ,
2970
+ credit = Credit },
2971
+
2972
+ Connection1 = maybe_monitor_stream (LocalMemberPid ,
2973
+ Stream ,
2974
+ Connection ),
2975
+
2976
+ State1 = maybe_dispatch_on_subscription (Transport ,
2977
+ State ,
2978
+ ConsumerState ,
2979
+ Connection1 ,
2980
+ Consumers ,
2981
+ Stream ,
2982
+ SubscriptionId ,
2983
+ Properties ,
2984
+ SendFileOct ,
2985
+ Sac ),
2986
+ StreamSubscriptions1 =
2987
+ case StreamSubscriptions of
2988
+ #{Stream := SubscriptionIds } ->
2989
+ StreamSubscriptions #{Stream =>
2990
+ [SubscriptionId ]
2991
+ ++ SubscriptionIds };
2992
+ _ ->
2993
+ StreamSubscriptions #{Stream =>
2994
+ [SubscriptionId ]}
2995
+ end ,
2996
+ {Connection1 # stream_connection {stream_subscriptions
2997
+ =
2998
+ StreamSubscriptions1 },
2999
+ State1 };
3000
+ {error , Reason } ->
3001
+ rabbit_log :warning (" Cannot create SAC subcription ~tp : ~tp " ,
3002
+ [SubscriptionId , Reason ]),
3003
+ response (Transport ,
3004
+ Connection ,
3005
+ subscribe ,
3006
+ CorrelationId ,
3007
+ ? RESPONSE_CODE_PRECONDITION_FAILED ),
3008
+ increase_protocol_counter (? PRECONDITION_FAILED ),
3009
+ {Connection , State }
3010
+ end .
3011
+
2998
3012
maybe_register_consumer (_ , _ , _ , _ , _ , _ , false = _Sac ) ->
2999
- true ;
3013
+ { ok , true } ;
3000
3014
maybe_register_consumer (VirtualHost ,
3001
3015
Stream ,
3002
3016
ConsumerName ,
@@ -3005,15 +3019,13 @@ maybe_register_consumer(VirtualHost,
3005
3019
Properties ,
3006
3020
true ) ->
3007
3021
PartitionIndex = partition_index (VirtualHost , Stream , Properties ),
3008
- {ok , Active } =
3009
- rabbit_stream_sac_coordinator :register_consumer (VirtualHost ,
3010
- Stream ,
3011
- PartitionIndex ,
3012
- ConsumerName ,
3013
- self (),
3014
- ConnectionName ,
3015
- SubscriptionId ),
3016
- Active .
3022
+ rabbit_stream_sac_coordinator :register_consumer (VirtualHost ,
3023
+ Stream ,
3024
+ PartitionIndex ,
3025
+ ConsumerName ,
3026
+ self (),
3027
+ ConnectionName ,
3028
+ SubscriptionId ).
3017
3029
3018
3030
maybe_send_consumer_update (Transport ,
3019
3031
Connection = # stream_connection {
0 commit comments