@@ -31,8 +31,8 @@ defmodule RealtimeWeb.RealtimeChannel do
31
31
@ confirm_token_ms_interval :timer . minutes ( 5 )
32
32
33
33
@ impl true
34
- def join ( "realtime:" , _params , _socket ) do
35
- Logging . log_error_message ( :error , "TopicNameRequired" , "You must provide a topic name" )
34
+ def join ( "realtime:" , _params , socket ) do
35
+ Logging . log_error ( socket , "TopicNameRequired" , "You must provide a topic name" )
36
36
end
37
37
38
38
def join ( "realtime:" <> sub_topic = topic , params , socket ) do
@@ -55,22 +55,21 @@ defmodule RealtimeWeb.RealtimeChannel do
55
55
|> assign ( :private? , ! ! params [ "config" ] [ "private" ] )
56
56
|> assign ( :policies , nil )
57
57
58
- token = socket . assigns . access_token
59
- log_level = socket . assigns . log_level
60
-
61
58
with :ok <- SignalHandler . shutdown_in_progress? ( ) ,
62
59
:ok <- only_private? ( tenant_id , socket ) ,
63
- :ok <- limit_joins ( socket . assigns ) ,
60
+ :ok <- limit_joins ( socket ) ,
64
61
:ok <- limit_channels ( socket ) ,
65
- :ok <- limit_max_users ( socket . assigns ) ,
62
+ :ok <- limit_max_users ( socket ) ,
66
63
{ :ok , claims , confirm_token_ref , access_token , _ } <- confirm_token ( socket ) ,
67
64
socket = assign_authorization_context ( socket , sub_topic , access_token , claims ) ,
68
65
{ :ok , db_conn } <- Connect . lookup_or_start_connection ( tenant_id ) ,
69
66
{ :ok , socket } <- maybe_assign_policies ( sub_topic , db_conn , socket ) do
70
67
tenant_topic = Tenants . tenant_topic ( tenant_id , sub_topic , ! socket . assigns . private? )
71
68
72
69
# fastlane subscription
73
- metadata = MessageDispatcher . fastlane_metadata ( transport_pid , serializer , topic , log_level , tenant_id )
70
+ metadata =
71
+ MessageDispatcher . fastlane_metadata ( transport_pid , serializer , topic , socket . assigns . log_level , tenant_id )
72
+
74
73
RealtimeWeb.Endpoint . subscribe ( tenant_topic , metadata: metadata )
75
74
76
75
Phoenix.PubSub . subscribe ( Realtime.PubSub , "realtime:operations:" <> tenant_id )
@@ -121,109 +120,77 @@ defmodule RealtimeWeb.RealtimeChannel do
121
120
{ :ok , state , assign ( socket , assigns ) }
122
121
else
123
122
{ :error , :expired_token , msg } ->
124
- Logging . maybe_log_warning_with_token_metadata ( "InvalidJWTToken" , msg , token , log_level )
123
+ Logging . maybe_log_warning ( socket , "InvalidJWTToken" , msg )
125
124
126
125
{ :error , :missing_claims } ->
127
126
msg = "Fields `role` and `exp` are required in JWT"
128
- Logging . maybe_log_warning_with_token_metadata ( "InvalidJWTToken" , msg , token , log_level )
127
+ Logging . maybe_log_warning ( socket , "InvalidJWTToken" , msg )
129
128
130
129
{ :error , :unauthorized , msg } ->
131
- Logging . log_error_message ( :error , "Unauthorized" , msg )
130
+ Logging . log_error ( socket , "Unauthorized" , msg )
132
131
133
132
{ :error , :too_many_channels } ->
134
133
msg = "Too many channels"
135
- Logging . log_error_message ( :error , "ChannelRateLimitReached" , msg )
134
+ Logging . log_error ( socket , "ChannelRateLimitReached" , msg )
136
135
137
136
{ :error , :too_many_connections } ->
138
137
msg = "Too many connected users"
139
- Logging . log_error_message ( :error , "ConnectionRateLimitReached" , msg )
138
+ Logging . log_error ( socket , "ConnectionRateLimitReached" , msg )
140
139
141
140
{ :error , :too_many_joins } ->
142
141
msg = "Too many joins per second"
143
142
{ :error , % { reason: msg } }
144
143
145
144
{ :error , :increase_connection_pool } ->
146
145
msg = "Please increase your connection pool size"
147
- Logging . log_error_message ( :error , "IncreaseConnectionPool" , msg )
146
+ Logging . log_error ( socket , "IncreaseConnectionPool" , msg )
148
147
149
148
{ :error , :tenant_db_too_many_connections } ->
150
149
msg = "Database can't accept more connections, Realtime won't connect"
151
- Logging . log_error_message ( :error , "DatabaseLackOfConnections" , msg )
150
+ Logging . log_error ( socket , "DatabaseLackOfConnections" , msg )
152
151
153
152
{ :error , :unable_to_set_policies , error } ->
154
- Logging . log_error_message ( :error , "UnableToSetPolicies" , error )
153
+ Logging . log_error ( socket , "UnableToSetPolicies" , error )
155
154
{ :error , % { reason: "Realtime was unable to connect to the project database" } }
156
155
157
156
{ :error , :tenant_database_unavailable } ->
158
- Logging . log_error_message (
159
- :error ,
160
- "UnableToConnectToProject" ,
161
- "Realtime was unable to connect to the project database"
162
- )
157
+ Logging . log_error ( socket , "UnableToConnectToProject" , "Realtime was unable to connect to the project database" )
163
158
164
159
{ :error , :rpc_error , :timeout } ->
165
- Logging . log_error_message ( :error , "TimeoutOnRpcCall" , "Node request timeout" )
160
+ Logging . log_error ( socket , "TimeoutOnRpcCall" , "Node request timeout" )
166
161
167
162
{ :error , :rpc_error , reason } ->
168
- Logging . log_error_message ( :error , "ErrorOnRpcCall" , "RPC call error: " <> inspect ( reason ) )
163
+ Logging . log_error ( socket , "ErrorOnRpcCall" , "RPC call error: " <> inspect ( reason ) )
169
164
170
165
{ :error , :initializing } ->
171
- Logging . log_error_message (
172
- :error ,
173
- "InitializingProjectConnection" ,
174
- "Realtime is initializing the project connection"
175
- )
166
+ Logging . log_error ( socket , "InitializingProjectConnection" , "Realtime is initializing the project connection" )
176
167
177
168
{ :error , :tenant_database_connection_initializing } ->
178
- Logging . log_error_message (
179
- :error ,
180
- "InitializingProjectConnection" ,
181
- "Connecting to the project database"
182
- )
169
+ Logging . log_error ( socket , "InitializingProjectConnection" , "Connecting to the project database" )
183
170
184
171
{ :error , :token_malformed , msg } ->
185
- Logging . log_error_message ( :error , "MalformedJWT" , msg )
172
+ Logging . log_error ( socket , "MalformedJWT" , msg )
186
173
187
174
{ :error , invalid_exp } when is_integer ( invalid_exp ) and invalid_exp <= 0 ->
188
- Logging . log_error_with_token_metadata (
189
- "InvalidJWTToken" ,
190
- "Token expiration time is invalid" ,
191
- socket . assigns . access_token
192
- )
175
+ Logging . log_error ( socket , "InvalidJWTToken" , "Token expiration time is invalid" )
193
176
194
177
{ :error , :private_only } ->
195
- Logging . log_error_message (
196
- :error ,
197
- "PrivateOnly" ,
198
- "This project only allows private channels"
199
- )
178
+ Logging . log_error ( socket , "PrivateOnly" , "This project only allows private channels" )
200
179
201
180
{ :error , :tenant_not_found } ->
202
- Logging . log_error_message (
203
- :error ,
204
- "TenantNotFound" ,
205
- "Tenant with the given ID does not exist"
206
- )
181
+ Logging . log_error ( socket , "TenantNotFound" , "Tenant with the given ID does not exist" )
207
182
208
183
{ :error , :tenant_suspended } ->
209
- Logging . log_error_message (
210
- :error ,
211
- "RealtimeDisabledForTenant" ,
212
- "Realtime disabled for this tenant"
213
- )
184
+ Logging . log_error ( socket , "RealtimeDisabledForTenant" , "Realtime disabled for this tenant" )
214
185
215
186
{ :error , :signature_error } ->
216
- Logging . log_error_message ( :error , "JwtSignatureError" , "Failed to validate JWT signature" )
187
+ Logging . log_error ( socket , "JwtSignatureError" , "Failed to validate JWT signature" )
217
188
218
189
{ :error , :shutdown_in_progress } ->
219
- Logging . log_error_message (
220
- :error ,
221
- "RealtimeRestarting" ,
222
- "Realtime is restarting, please standby"
223
- )
190
+ Logging . log_error ( socket , "RealtimeRestarting" , "Realtime is restarting, please standby" )
224
191
225
192
{ :error , error } ->
226
- Logging . log_error_message ( :error , "UnknownErrorOnChannel" , error )
193
+ Logging . log_error ( socket , "UnknownErrorOnChannel" , error )
227
194
{ :error , % { reason: "Unknown Error on Channel" } }
228
195
end
229
196
end
@@ -276,14 +243,14 @@ defmodule RealtimeWeb.RealtimeChannel do
276
243
log_warning ( "TooManyPresenceMessages" , message )
277
244
end
278
245
279
- socket = Logging . maybe_log_handle_info ( socket , msg )
246
+ Logging . maybe_log_info ( socket , msg )
280
247
push ( socket , "presence_diff" , payload )
281
248
{ :noreply , socket }
282
249
end
283
250
284
251
def handle_info ( % { event: type , payload: payload } = msg , socket ) do
285
252
count ( socket )
286
- socket = Logging . maybe_log_handle_info ( socket , msg )
253
+ Logging . maybe_log_info ( socket , msg )
287
254
push ( socket , type , payload )
288
255
{ :noreply , socket }
289
256
end
@@ -503,7 +470,7 @@ defmodule RealtimeWeb.RealtimeChannel do
503
470
wait
504
471
end
505
472
506
- def limit_joins ( % { tenant: tenant , limits: limits } ) do
473
+ def limit_joins ( % { assigns: % { tenant: tenant , limits: limits } } = socket ) do
507
474
rate_args = Tenants . joins_per_second_rate ( tenant , limits . max_joins_per_second )
508
475
509
476
RateCounter . new ( rate_args )
@@ -517,7 +484,7 @@ defmodule RealtimeWeb.RealtimeChannel do
517
484
{ :error , :too_many_joins }
518
485
519
486
error ->
520
- Logging . log_error_message ( :error , "UnknownErrorOnCounter" , error )
487
+ Logging . log_error ( socket , "UnknownErrorOnCounter" , error )
521
488
{ :error , error }
522
489
end
523
490
end
@@ -533,7 +500,7 @@ defmodule RealtimeWeb.RealtimeChannel do
533
500
end
534
501
end
535
502
536
- defp limit_max_users ( % { limits: % { max_concurrent_users: max_conn_users } , tenant: tenant } ) do
503
+ defp limit_max_users ( % { assigns: % { limits: % { max_concurrent_users: max_conn_users } , tenant: tenant } } ) do
537
504
conns = Realtime.UsersCounter . tenant_users ( tenant )
538
505
539
506
if conns < max_conn_users ,
0 commit comments