Skip to content

Commit 4ef6dd3

Browse files
authored
fix: rate limit presence events (#1493)
1 parent 5945389 commit 4ef6dd3

File tree

14 files changed

+467
-258
lines changed

14 files changed

+467
-258
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ This is the list of operational codes that can help you understand your deployme
270270
| UnableToEncodeJson | An error were we are not handling correctly the response to be sent to the end user |
271271
| UnknownErrorOnController | An error we are not handling correctly was triggered on a controller |
272272
| UnknownErrorOnChannel | An error we are not handling correctly was triggered on a channel |
273-
| TooManyPresenceMessages | Limit of presence events reached |
273+
| PresenceRateLimitReached | Limit of presence events reached |
274274

275275
## License
276276

lib/realtime/api/tenant.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ defmodule Realtime.Api.Tenant do
1919
field(:postgres_cdc_default, :string)
2020
field(:max_concurrent_users, :integer)
2121
field(:max_events_per_second, :integer)
22+
field(:max_presence_events_per_second, :integer, default: 10_000)
23+
field(:max_payload_size_in_kb, :integer, default: 3000)
2224
field(:max_bytes_per_second, :integer)
2325
field(:max_channels_per_client, :integer)
2426
field(:max_joins_per_second, :integer)
@@ -69,6 +71,8 @@ defmodule Realtime.Api.Tenant do
6971
:max_bytes_per_second,
7072
:max_channels_per_client,
7173
:max_joins_per_second,
74+
:max_presence_events_per_second,
75+
:max_payload_size_in_kb,
7276
:suspend,
7377
:private_only,
7478
:migrations_ran,

lib/realtime/tenants.ex

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,26 @@ defmodule Realtime.Tenants do
264264
@doc "RateCounter arguments for counting presence events per second."
265265
@spec presence_events_per_second_rate(Tenant.t()) :: RateCounter.Args.t()
266266
def presence_events_per_second_rate(tenant) do
267-
presence_events_per_second_rate(tenant.external_id, tenant.max_events_per_second)
267+
presence_events_per_second_rate(tenant.external_id, tenant.max_presence_events_per_second)
268268
end
269269

270270
@spec presence_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
271-
def presence_events_per_second_rate(tenant_id, max_events_per_second) do
271+
def presence_events_per_second_rate(tenant_id, max_presence_events_per_second) do
272272
opts = [
273273
telemetry: %{
274274
event_name: [:channel, :presence_events],
275-
measurements: %{limit: max_events_per_second},
275+
measurements: %{limit: max_presence_events_per_second},
276276
metadata: %{tenant: tenant_id}
277-
}
277+
},
278+
limit: [
279+
value: max_presence_events_per_second,
280+
log_fn: fn ->
281+
Logger.error("PresenceRateLimitReached: Too many presence events per second",
282+
external_id: tenant_id,
283+
project: tenant_id
284+
)
285+
end
286+
]
278287
]
279288

280289
%RateCounter.Args{id: presence_events_per_second_key(tenant_id), opts: opts}

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 65 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule RealtimeWeb.RealtimeChannel do
33
Used for handling channels and subscriptions.
44
"""
55
use RealtimeWeb, :channel
6-
use Realtime.Logs
6+
use RealtimeWeb.RealtimeChannel.Logging
77

88
alias RealtimeWeb.SocketDisconnect
99
alias DBConnection.Backoff
@@ -23,7 +23,6 @@ defmodule RealtimeWeb.RealtimeChannel do
2323

2424
alias RealtimeWeb.ChannelsAuthorization
2525
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
26-
alias RealtimeWeb.RealtimeChannel.Logging
2726
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
2827
alias RealtimeWeb.RealtimeChannel.PresenceHandler
2928
alias RealtimeWeb.RealtimeChannel.Tracker
@@ -32,7 +31,7 @@ defmodule RealtimeWeb.RealtimeChannel do
3231

3332
@impl true
3433
def join("realtime:", _params, socket) do
35-
Logging.log_error(socket, "TopicNameRequired", "You must provide a topic name")
34+
log_error(socket, "TopicNameRequired", "You must provide a topic name")
3635
end
3736

3837
def join("realtime:" <> sub_topic = topic, params, socket) do
@@ -120,77 +119,77 @@ defmodule RealtimeWeb.RealtimeChannel do
120119
{:ok, state, assign(socket, assigns)}
121120
else
122121
{:error, :expired_token, msg} ->
123-
Logging.maybe_log_warning(socket, "InvalidJWTToken", msg)
122+
maybe_log_warning(socket, "InvalidJWTToken", msg)
124123

125124
{:error, :missing_claims} ->
126125
msg = "Fields `role` and `exp` are required in JWT"
127-
Logging.maybe_log_warning(socket, "InvalidJWTToken", msg)
126+
maybe_log_warning(socket, "InvalidJWTToken", msg)
128127

129128
{:error, :unauthorized, msg} ->
130-
Logging.log_error(socket, "Unauthorized", msg)
129+
log_error(socket, "Unauthorized", msg)
131130

132131
{:error, :too_many_channels} ->
133132
msg = "Too many channels"
134-
Logging.log_error(socket, "ChannelRateLimitReached", msg)
133+
log_error(socket, "ChannelRateLimitReached", msg)
135134

136135
{:error, :too_many_connections} ->
137136
msg = "Too many connected users"
138-
Logging.log_error(socket, "ConnectionRateLimitReached", msg)
137+
log_error(socket, "ConnectionRateLimitReached", msg)
139138

140139
{:error, :too_many_joins} ->
141140
msg = "ClientJoinRateLimitReached: Too many joins per second"
142141
{:error, %{reason: msg}}
143142

144143
{:error, :increase_connection_pool} ->
145144
msg = "Please increase your connection pool size"
146-
Logging.log_error(socket, "IncreaseConnectionPool", msg)
145+
log_error(socket, "IncreaseConnectionPool", msg)
147146

148147
{:error, :tenant_db_too_many_connections} ->
149148
msg = "Database can't accept more connections, Realtime won't connect"
150-
Logging.log_error(socket, "DatabaseLackOfConnections", msg)
149+
log_error(socket, "DatabaseLackOfConnections", msg)
151150

152151
{:error, :unable_to_set_policies, error} ->
153-
Logging.log_error(socket, "UnableToSetPolicies", error)
152+
log_error(socket, "UnableToSetPolicies", error)
154153
{:error, %{reason: "Realtime was unable to connect to the project database"}}
155154

156155
{:error, :tenant_database_unavailable} ->
157-
Logging.log_error(socket, "UnableToConnectToProject", "Realtime was unable to connect to the project database")
156+
log_error(socket, "UnableToConnectToProject", "Realtime was unable to connect to the project database")
158157

159158
{:error, :rpc_error, :timeout} ->
160-
Logging.log_error(socket, "TimeoutOnRpcCall", "Node request timeout")
159+
log_error(socket, "TimeoutOnRpcCall", "Node request timeout")
161160

162161
{:error, :rpc_error, reason} ->
163-
Logging.log_error(socket, "ErrorOnRpcCall", "RPC call error: " <> inspect(reason))
162+
log_error(socket, "ErrorOnRpcCall", "RPC call error: " <> inspect(reason))
164163

165164
{:error, :initializing} ->
166-
Logging.log_error(socket, "InitializingProjectConnection", "Realtime is initializing the project connection")
165+
log_error(socket, "InitializingProjectConnection", "Realtime is initializing the project connection")
167166

168167
{:error, :tenant_database_connection_initializing} ->
169-
Logging.log_error(socket, "InitializingProjectConnection", "Connecting to the project database")
168+
log_error(socket, "InitializingProjectConnection", "Connecting to the project database")
170169

171170
{:error, :token_malformed, msg} ->
172-
Logging.log_error(socket, "MalformedJWT", msg)
171+
log_error(socket, "MalformedJWT", msg)
173172

174173
{:error, invalid_exp} when is_integer(invalid_exp) and invalid_exp <= 0 ->
175-
Logging.log_error(socket, "InvalidJWTToken", "Token expiration time is invalid")
174+
log_error(socket, "InvalidJWTToken", "Token expiration time is invalid")
176175

177176
{:error, :private_only} ->
178-
Logging.log_error(socket, "PrivateOnly", "This project only allows private channels")
177+
log_error(socket, "PrivateOnly", "This project only allows private channels")
179178

180179
{:error, :tenant_not_found} ->
181-
Logging.log_error(socket, "TenantNotFound", "Tenant with the given ID does not exist")
180+
log_error(socket, "TenantNotFound", "Tenant with the given ID does not exist")
182181

183182
{:error, :tenant_suspended} ->
184-
Logging.log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")
183+
log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")
185184

186185
{:error, :signature_error} ->
187-
Logging.log_error(socket, "JwtSignatureError", "Failed to validate JWT signature")
186+
log_error(socket, "JwtSignatureError", "Failed to validate JWT signature")
188187

189188
{:error, :shutdown_in_progress} ->
190-
Logging.log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")
189+
log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")
191190

192191
{:error, error} ->
193-
Logging.log_error(socket, "UnknownErrorOnChannel", error)
192+
log_error(socket, "UnknownErrorOnChannel", error)
194193
{:error, %{reason: "Unknown Error on Channel"}}
195194
end
196195
end
@@ -231,25 +230,16 @@ defmodule RealtimeWeb.RealtimeChannel do
231230
end
232231

233232
def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do
234-
%{presence_rate_counter: presence_rate_counter, limits: %{max_events_per_second: max}} = socket.assigns
235-
233+
%{presence_rate_counter: presence_rate_counter} = socket.assigns
236234
GenCounter.add(presence_rate_counter.id)
237-
{:ok, rate_counter} = RateCounter.get(presence_rate_counter)
238-
239-
# Let's just log for now
240-
if rate_counter.avg > max do
241-
message = "Too many presence messages per second"
242-
log_warning("TooManyPresenceMessages", message)
243-
end
244-
245-
Logging.maybe_log_info(socket, msg)
235+
maybe_log_info(socket, msg)
246236
push(socket, "presence_diff", payload)
247237
{:noreply, socket}
248238
end
249239

250240
def handle_info(%{event: type, payload: payload} = msg, socket) do
251241
count(socket)
252-
Logging.maybe_log_info(socket, msg)
242+
maybe_log_info(socket, msg)
253243
push(socket, type, payload)
254244
{:noreply, socket}
255245
end
@@ -274,19 +264,19 @@ defmodule RealtimeWeb.RealtimeChannel do
274264
case PostgresCdc.after_connect(module, response, postgres_extension, pg_change_params) do
275265
{:ok, _response} ->
276266
message = "Subscribed to PostgreSQL"
277-
Logging.maybe_log_info(socket, message)
267+
maybe_log_info(socket, message)
278268
push_system_message("postgres_changes", socket, "ok", message, channel_name)
279269
{:noreply, assign(socket, :pg_sub_ref, nil)}
280270

281271
error ->
282-
Logging.maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)
272+
maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)
283273

284274
push_system_message("postgres_changes", socket, "error", error, channel_name)
285275
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
286276
end
287277

288278
nil ->
289-
Logging.maybe_log_warning(
279+
maybe_log_warning(
290280
socket,
291281
"ReconnectSubscribeToPostgres",
292282
"Re-connecting to PostgreSQL with params: " <> inspect(pg_change_params)
@@ -295,13 +285,13 @@ defmodule RealtimeWeb.RealtimeChannel do
295285
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe())}
296286

297287
error ->
298-
Logging.maybe_log_error(socket, "UnableToSubscribeToPostgres", error)
288+
maybe_log_error(socket, "UnableToSubscribeToPostgres", error)
299289
push_system_message("postgres_changes", socket, "error", error, channel_name)
300290
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
301291
end
302292
rescue
303293
error ->
304-
log_warning("UnableToSubscribeToPostgres", error)
294+
log_warning(socket, "UnableToSubscribeToPostgres", error)
305295
push_system_message("postgres_changes", socket, "error", error, channel_name)
306296
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
307297
end
@@ -319,7 +309,7 @@ defmodule RealtimeWeb.RealtimeChannel do
319309
shutdown_response(socket, msg)
320310

321311
{:error, error} ->
322-
shutdown_response(socket, to_log(error))
312+
shutdown_response(socket, Realtime.Logs.to_log(error))
323313
end
324314
end
325315

@@ -329,7 +319,16 @@ defmodule RealtimeWeb.RealtimeChannel do
329319
{:stop, :shutdown, socket}
330320
end
331321

332-
def handle_info(:sync_presence, %{assigns: %{presence_enabled?: true}} = socket), do: PresenceHandler.sync(socket)
322+
def handle_info(:sync_presence, %{assigns: %{presence_enabled?: true}} = socket) do
323+
case PresenceHandler.sync(socket) do
324+
:ok ->
325+
{:noreply, socket}
326+
327+
{:error, :rate_limit_exceeded} ->
328+
shutdown_response(socket, "Too many presence messages per second")
329+
end
330+
end
331+
333332
def handle_info(:sync_presence, socket), do: {:noreply, socket}
334333
def handle_info(_, socket), do: {:noreply, socket}
335334

@@ -341,7 +340,7 @@ defmodule RealtimeWeb.RealtimeChannel do
341340
BroadcastHandler.handle(payload, db_conn, socket)
342341
else
343342
{:error, error} ->
344-
log_error("UnableToHandleBroadcast", error)
343+
log_error(socket, "UnableToHandleBroadcast", error)
345344
{:noreply, socket}
346345
end
347346
end
@@ -353,17 +352,30 @@ defmodule RealtimeWeb.RealtimeChannel do
353352
def handle_in("presence", payload, %{assigns: %{private?: true}} = socket) do
354353
%{tenant: tenant_id} = socket.assigns
355354

356-
with {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id) do
357-
PresenceHandler.handle(payload, db_conn, socket)
355+
with {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
356+
{:ok, socket} <- PresenceHandler.handle(payload, db_conn, socket) do
357+
{:reply, :ok, socket}
358358
else
359+
{:error, :rate_limit_exceeded} ->
360+
shutdown_response(socket, "Too many presence messages per second")
361+
359362
{:error, error} ->
360-
log_error("UnableToHandlePresence", error)
361-
{:noreply, socket}
363+
log_error(socket, "UnableToHandlePresence", error)
364+
{:reply, :error, socket}
362365
end
363366
end
364367

365368
def handle_in("presence", payload, %{assigns: %{private?: false}} = socket) do
366-
PresenceHandler.handle(payload, socket)
369+
with {:ok, socket} <- PresenceHandler.handle(payload, socket) do
370+
{:reply, :ok, socket}
371+
else
372+
{:error, :rate_limit_exceeded} ->
373+
shutdown_response(socket, "Too many presence messages per second")
374+
375+
{:error, error} ->
376+
log_error(socket, "UnableToHandlePresence", error)
377+
{:reply, :error, socket}
378+
end
367379
end
368380

369381
def handle_in("access_token", %{"access_token" => "sb_" <> _}, socket) do
@@ -476,7 +488,7 @@ defmodule RealtimeWeb.RealtimeChannel do
476488
{:error, :too_many_joins}
477489

478490
error ->
479-
Logging.log_error(socket, "UnknownErrorOnCounter", error)
491+
log_error(socket, "UnknownErrorOnCounter", error)
480492
{:error, error}
481493
end
482494
end
@@ -553,7 +565,7 @@ defmodule RealtimeWeb.RealtimeChannel do
553565
assign(socket, :access_token, tenant_token)
554566
end
555567

556-
defp confirm_token(%{assigns: assigns} = socket) do
568+
defp confirm_token(%{assigns: assigns}) do
557569
%{jwt_secret: jwt_secret, access_token: access_token} = assigns
558570

559571
jwt_jwks = Map.get(assigns, :jwt_jwks)
@@ -586,7 +598,7 @@ defmodule RealtimeWeb.RealtimeChannel do
586598
defp shutdown_response(socket, message) when is_binary(message) do
587599
%{assigns: %{channel_name: channel_name}} = socket
588600
push_system_message("system", socket, "error", message, channel_name)
589-
Logging.maybe_log_warning(socket, "ChannelShutdown", message)
601+
maybe_log_warning(socket, "ChannelShutdown", message)
590602
{:stop, :normal, socket}
591603
end
592604

@@ -725,7 +737,7 @@ defmodule RealtimeWeb.RealtimeChannel do
725737
{:error, :increase_connection_pool}
726738

727739
{:error, :rls_policy_error, error} ->
728-
log_error("RlsPolicyError", error)
740+
log_error(socket, "RlsPolicyError", error)
729741

730742
{:error, :unauthorized, "You do not have permissions to read from this Channel topic: #{topic}"}
731743

0 commit comments

Comments
 (0)