Skip to content

Commit 786f7ef

Browse files
committed
fix: rate limit presence events
1 parent f4b2965 commit 786f7ef

File tree

5 files changed

+151
-81
lines changed

5 files changed

+151
-81
lines changed

lib/realtime/api/tenant.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ defmodule Realtime.Api.Tenant do
1919
field(:postgres_cdc_default, :string)
2020
field(:max_concurrent_users, :integer)
2121
field(:max_events_per_second, :integer)
22+
field(:max_presence_events_per_second, :integer, default: 100)
2223
field(:max_bytes_per_second, :integer)
2324
field(:max_channels_per_client, :integer)
2425
field(:max_joins_per_second, :integer)

lib/realtime_web/channels/realtime_channel/presence_handler.ex

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
1010
alias Phoenix.Socket
1111
alias Phoenix.Tracker.Shard
1212
alias Realtime.GenCounter
13+
alias Realtime.RateCounter
14+
alias Realtime.Tenants
1315
alias Realtime.Tenants.Authorization
1416
alias Realtime.Tenants.Authorization.Policies
1517
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
@@ -24,29 +26,33 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
2426

2527
def sync(%{assigns: %{private?: false}} = socket) do
2628
%{assigns: %{tenant_topic: topic}} = socket
29+
2730
socket = Logging.maybe_log_handle_info(socket, :sync_presence)
28-
count(socket)
29-
push(socket, "presence_state", presence_dirty_list(topic))
30-
{:noreply, socket}
31+
32+
with :ok <- limit_presence_event(socket) do
33+
push(socket, "presence_state", presence_dirty_list(topic))
34+
{:noreply, socket}
35+
else
36+
{:error, :rate_limit_exceeded} -> {:stop, :normal, socket}
37+
end
3138
end
3239

3340
def sync(%{assigns: assigns} = socket) do
3441
%{tenant_topic: topic, policies: policies} = assigns
3542

36-
socket =
37-
case policies do
38-
%Policies{presence: %PresencePolicies{read: false}} ->
39-
Logger.info("Presence track message ignored on #{topic}")
40-
socket
41-
42-
_ ->
43-
socket = Logging.maybe_log_handle_info(socket, :sync_presence)
44-
count(socket)
45-
push(socket, "presence_state", presence_dirty_list(topic))
46-
socket
47-
end
48-
49-
{:noreply, socket}
43+
with :ok <- limit_presence_event(socket),
44+
%Policies{presence: %PresencePolicies{read: true}} <- policies do
45+
socket = Logging.maybe_log_handle_info(socket, :sync_presence)
46+
push(socket, "presence_state", presence_dirty_list(topic))
47+
{:noreply, socket}
48+
else
49+
%Policies{presence: %PresencePolicies{read: false}} ->
50+
Logger.info("Presence track message ignored on #{topic}")
51+
{:noreply, socket}
52+
53+
{:error, :rate_limit_exceeded} ->
54+
{:stop, :normal, socket}
55+
end
5056
end
5157

5258
@spec handle(map(), Socket.t()) :: {:reply, :error | :ok, Socket.t()}
@@ -58,15 +64,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
5864

5965
def handle(%{"event" => event} = payload, db_conn, socket) do
6066
event = String.downcase(event, :ascii)
61-
62-
case handle_presence_event(event, payload, db_conn, socket) do
63-
{:ok, socket} ->
64-
count(socket)
65-
{:reply, :ok, socket}
66-
67-
{:error, socket} ->
68-
{:reply, :error, socket}
69-
end
67+
handle_presence_event(event, payload, db_conn, socket)
7068
end
7169

7270
def handle(_payload, _db_conn, socket), do: {:noreply, socket}
@@ -90,11 +88,11 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
9088

9189
{:error, :rls_policy_error, error} ->
9290
log_error("RlsPolicyError", error)
93-
{:error, socket}
91+
{:reply, :error, socket}
9492

9593
{:error, error} ->
9694
log_error("UnableToSetPolicies", error)
97-
{:error, socket}
95+
{:reply, :error, socket}
9896
end
9997
end
10098

@@ -113,41 +111,43 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
113111
_db_conn,
114112
%{assigns: %{private?: true, policies: %Policies{presence: %PresencePolicies{write: false}}}} = socket
115113
) do
116-
{:error, socket}
114+
{:reply, :error, socket}
117115
end
118116

119117
defp handle_presence_event("untrack", _, _, socket) do
120118
%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
121-
{Presence.untrack(self(), tenant_topic, presence_key), socket}
119+
:ok = Presence.untrack(self(), tenant_topic, presence_key)
120+
{:reply, :ok, socket}
122121
end
123122

124123
defp handle_presence_event(event, _, _, socket) do
125124
log_error("UnknownPresenceEvent", event)
126-
{:error, socket}
125+
{:reply, :error, socket}
127126
end
128127

129128
defp track(socket, payload) do
130129
%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
131130
payload = Map.get(payload, "payload", %{})
132131

133-
case Presence.track(self(), tenant_topic, presence_key, payload) do
134-
{:ok, _} ->
135-
{:ok, socket}
136-
132+
with :ok <- limit_presence_event(socket),
133+
{:ok, _} <- Presence.track(self(), tenant_topic, presence_key, payload) do
134+
{:reply, :ok, socket}
135+
else
137136
{:error, {:already_tracked, pid, _, _}} ->
138137
case Presence.update(pid, tenant_topic, presence_key, payload) do
139-
{:ok, _} -> {:ok, socket}
140-
{:error, _} -> {:error, socket}
138+
{:ok, _} -> {:reply, :ok, socket}
139+
{:error, _} -> {:reply, :error, socket}
141140
end
142141

142+
{:error, :rate_limit_exceeded} ->
143+
{:reply, :error, socket}
144+
143145
{:error, error} ->
144146
log_error("UnableToTrackPresence", error)
145-
{:error, socket}
147+
{:reply, :error, socket}
146148
end
147149
end
148150

149-
defp count(%{assigns: %{presence_rate_counter: presence_counter}}), do: GenCounter.add(presence_counter.id)
150-
151151
defp presence_dirty_list(topic) do
152152
[{:pool_size, size}] = :ets.lookup(Presence, :pool_size)
153153

@@ -156,4 +156,20 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
156156
|> Shard.dirty_list(topic)
157157
|> Phoenix.Presence.group()
158158
end
159+
160+
defp limit_presence_event(socket) do
161+
%{assigns: %{presence_rate_counter: presence_counter, tenant: tenant_id}} = socket
162+
GenCounter.add(presence_counter.id)
163+
{:ok, rate_counter} = RateCounter.get(presence_counter)
164+
165+
tenant = Tenants.Cache.get_tenant_by_external_id(tenant_id)
166+
167+
if rate_counter.avg > tenant.max_presence_events_per_second do
168+
message = "Too many presence messages per second"
169+
log_warning("TooManyPresenceMessages", message)
170+
{:error, :rate_limit_exceeded}
171+
else
172+
:ok
173+
end
174+
end
159175
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.23",
7+
version: "2.41.24",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule Realtime.Repo.Migrations.AddMaxPresenceEventsPerSecond do
2+
use Ecto.Migration
3+
4+
def change do
5+
alter table(:tenants) do
6+
add :max_presence_events_per_second, :integer, default: 100
7+
end
8+
end
9+
end

test/realtime_web/channels/realtime_channel/presence_handler_test.exs

Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
2525
db_conn: db_conn
2626
} do
2727
key = random_string()
28+
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
2829

2930
socket =
30-
socket_fixture(tenant, topic, key, %Policies{presence: %PresencePolicies{read: true, write: true}})
31+
socket_fixture(tenant, topic, key, policies: policies)
3132

3233
PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
3334
topic = socket.assigns.tenant_topic
@@ -36,15 +37,10 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
3637
assert Map.has_key?(joins, key)
3738
end
3839

39-
test "when tracking already existing user, metadata updated", %{
40-
tenant: tenant,
41-
topic: topic,
42-
db_conn: db_conn
43-
} do
40+
test "when tracking already existing user, metadata updated", %{tenant: tenant, topic: topic, db_conn: db_conn} do
4441
key = random_string()
45-
46-
socket =
47-
socket_fixture(tenant, topic, key, %Policies{presence: %PresencePolicies{read: true, write: true}})
42+
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
43+
socket = socket_fixture(tenant, topic, key, policies: policies)
4844

4945
assert {:reply, :ok, socket} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
5046

@@ -62,15 +58,8 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
6258

6359
test "with false policy and is public, user can track their presence and changes", %{tenant: tenant, topic: topic} do
6460
key = random_string()
65-
66-
socket =
67-
socket_fixture(
68-
tenant,
69-
topic,
70-
key,
71-
%Policies{presence: %PresencePolicies{read: false, write: false}},
72-
false
73-
)
61+
policies = %Policies{presence: %PresencePolicies{read: false, write: false}}
62+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false)
7463

7564
assert {:reply, :ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, socket)
7665

@@ -81,9 +70,8 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
8170

8271
test "user can untrack when they want", %{tenant: tenant, topic: topic, db_conn: db_conn} do
8372
key = random_string()
84-
85-
socket =
86-
socket_fixture(tenant, topic, key, %Policies{presence: %PresencePolicies{read: true, write: true}})
73+
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
74+
socket = socket_fixture(tenant, topic, key, policies: policies)
8775

8876
assert {:reply, :ok, socket} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
8977

@@ -151,10 +139,8 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
151139
reject(&Authorization.get_write_authorizations/3)
152140

153141
key = random_string()
154-
155-
socket =
156-
socket_fixture(tenant, topic, key, %Policies{broadcast: %BroadcastPolicies{read: false}}, false)
157-
142+
policies = %Policies{broadcast: %BroadcastPolicies{read: false}}
143+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false)
158144
topic = socket.assigns.tenant_topic
159145

160146
for _ <- 1..100, reduce: socket do
@@ -187,7 +173,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
187173
} do
188174
key = random_string()
189175
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
190-
socket = socket_fixture(tenant, topic, key, policies, false, false)
176+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false, enabled?: false)
191177

192178
assert {:reply, :ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, socket)
193179
topic = socket.assigns.tenant_topic
@@ -201,12 +187,68 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
201187
} do
202188
key = random_string()
203189
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
204-
socket = socket_fixture(tenant, topic, key, policies, false, false)
190+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false, enabled?: false)
205191

206192
assert {:reply, :ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
207193
topic = socket.assigns.tenant_topic
208194
refute_receive %Broadcast{topic: ^topic, event: "presence_diff"}
209195
end
196+
197+
@tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence]
198+
test "rate limit is checked on private channel", %{tenant: tenant, topic: topic, db_conn: db_conn} do
199+
key = random_string()
200+
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
201+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: true)
202+
203+
for _ <- 1..300, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
204+
Process.sleep(1000)
205+
206+
assert {:reply, :error, _} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
207+
end
208+
209+
test "rate limit is checked on public channel", %{tenant: tenant, topic: topic, db_conn: db_conn} do
210+
key = random_string()
211+
socket = socket_fixture(tenant, topic, key, private?: false)
212+
213+
for _ <- 1..300, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
214+
Process.sleep(1000)
215+
216+
assert {:reply, :error, _} = PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
217+
end
218+
end
219+
220+
describe "sync/1" do
221+
test "syncs presence state for public channels", %{tenant: tenant, topic: topic} do
222+
key = random_string()
223+
policies = %Policies{presence: %PresencePolicies{read: false, write: false}}
224+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: false)
225+
226+
assert {:noreply, _socket} = PresenceHandler.sync(socket)
227+
end
228+
229+
test "syncs presence state for private channels with read policy true", %{tenant: tenant, topic: topic} do
230+
key = random_string()
231+
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
232+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: true)
233+
234+
assert {:noreply, _socket} = PresenceHandler.sync(socket)
235+
end
236+
237+
test "ignores sync for private channels with read policy false", %{tenant: tenant, topic: topic} do
238+
key = random_string()
239+
policies = %Policies{presence: %PresencePolicies{read: false, write: true}}
240+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: true)
241+
242+
assert {:noreply, _socket} = PresenceHandler.sync(socket)
243+
end
244+
245+
test "ignores sync when presence is disabled", %{tenant: tenant, topic: topic} do
246+
key = random_string()
247+
policies = %Policies{presence: %PresencePolicies{read: true, write: true}}
248+
socket = socket_fixture(tenant, topic, key, policies: policies, private?: true, enabled?: false)
249+
250+
assert {:noreply, _socket} = PresenceHandler.sync(socket)
251+
end
210252
end
211253

212254
defp initiate_tenant(context) do
@@ -223,20 +265,19 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
223265
{:ok, tenant: tenant, db_conn: db_conn, topic: topic}
224266
end
225267

226-
defp socket_fixture(
227-
tenant,
228-
topic,
229-
presence_key,
230-
policies \\ %Policies{
231-
broadcast: %BroadcastPolicies{read: true},
232-
presence: %PresencePolicies{read: true, write: nil}
233-
},
234-
private? \\ true,
235-
enabled? \\ true
236-
) do
268+
defp socket_fixture(tenant, topic, presence_key, opts \\ []) do
269+
policies =
270+
Keyword.get(opts, :policies, %Policies{
271+
broadcast: %BroadcastPolicies{read: true},
272+
presence: %PresencePolicies{read: true, write: nil}
273+
})
274+
275+
private? = Keyword.get(opts, :private?, true)
276+
enabled? = Keyword.get(opts, :enabled?, true)
277+
log_level = Keyword.get(opts, :log_level, :error)
278+
237279
claims = %{sub: random_string(), role: "authenticated", exp: Joken.current_time() + 1_000}
238280
signer = Joken.Signer.create("HS256", "secret")
239-
240281
jwt = Joken.generate_and_sign!(%{}, claims, signer)
241282

242283
authorization_context =
@@ -267,7 +308,10 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
267308
presence_rate_counter: rate,
268309
private?: private?,
269310
presence_key: presence_key,
270-
presence_enabled?: enabled?
311+
presence_enabled?: enabled?,
312+
log_level: log_level,
313+
channel_name: topic,
314+
tenant: tenant.external_id
271315
}
272316
}
273317
end

0 commit comments

Comments
 (0)