Skip to content

Commit 9aed839

Browse files
authored
fix: use only gen_rpc for broadcast & postgres changes (#1491)
1 parent f4b2965 commit 9aed839

File tree

13 files changed

+793
-983
lines changed

13 files changed

+793
-983
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
2626
tenant_id = args["id"]
2727
Logger.metadata(external_id: tenant_id, project: tenant_id)
2828

29-
tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)
30-
3129
state = %{
3230
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
3331
db_host: args["db_host"],
@@ -43,7 +41,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
4341
retry_ref: nil,
4442
retry_count: 0,
4543
slot_name: args["slot_name"] <> slot_name_suffix(),
46-
tenant: tenant
44+
tenant_id: tenant_id
4745
}
4846

4947
{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
@@ -76,17 +74,17 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
7674
max_record_bytes: max_record_bytes,
7775
max_changes: max_changes,
7876
conn: conn,
79-
tenant: tenant
77+
tenant_id: tenant_id
8078
} = state
8179
) do
8280
cancel_timer(poll_ref)
8381
cancel_timer(retry_ref)
8482

8583
args = [conn, slot_name, publication, max_changes, max_record_bytes]
8684
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
87-
record_list_changes_telemetry(time, tenant.external_id)
85+
record_list_changes_telemetry(time, tenant_id)
8886

89-
case handle_list_changes_result(list_changes, tenant) do
87+
case handle_list_changes_result(list_changes, tenant_id) do
9088
{:ok, row_count} ->
9189
Backoff.reset(backoff)
9290

@@ -179,18 +177,13 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
179177
rows: [_ | _] = rows,
180178
num_rows: rows_count
181179
}},
182-
tenant
180+
tenant_id
183181
) do
184182
for row <- rows,
185183
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
186-
topic = "realtime:postgres:" <> tenant.external_id
187-
188-
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(
189-
tenant,
190-
topic,
191-
change,
192-
MessageDispatcher
193-
)
184+
topic = "realtime:postgres:" <> tenant_id
185+
186+
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(topic, change, MessageDispatcher)
194187
end
195188

196189
{:ok, rows_count}

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
119119
broadcast = %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}
120120

121121
GenCounter.add(events_per_second_rate.id)
122-
TenantBroadcaster.pubsub_broadcast(tenant, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
122+
TenantBroadcaster.pubsub_broadcast(tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
123123
end
124124

125125
defp permissions_for_message(_, nil, _), do: nil

lib/realtime_web/channels/realtime_channel/broadcast_handler.ex

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,10 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
99
alias RealtimeWeb.RealtimeChannel
1010
alias RealtimeWeb.TenantBroadcaster
1111
alias Phoenix.Socket
12-
alias Realtime.Api.Tenant
1312
alias Realtime.GenCounter
1413
alias Realtime.Tenants.Authorization
1514
alias Realtime.Tenants.Authorization.Policies
1615
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
17-
alias Realtime.Tenants.Cache
1816

1917
@event_type "broadcast"
2018
@spec handle(map(), Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()}
@@ -38,8 +36,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
3836
|> assign(:policies, policies)
3937
|> increment_rate_counter()
4038

41-
%{ack_broadcast: ack_broadcast, tenant: tenant_id} = socket.assigns
42-
send_message(tenant_id, self_broadcast, tenant_topic, payload)
39+
%{ack_broadcast: ack_broadcast} = socket.assigns
40+
send_message(self_broadcast, tenant_topic, payload)
4341
if ack_broadcast, do: {:reply, :ok, socket}, else: {:noreply, socket}
4442

4543
{:ok, policies} ->
@@ -56,42 +54,28 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
5654
end
5755

5856
def handle(payload, _db_conn, %{assigns: %{private?: false}} = socket) do
59-
%{
60-
assigns: %{
61-
tenant_topic: tenant_topic,
62-
self_broadcast: self_broadcast,
63-
ack_broadcast: ack_broadcast,
64-
tenant: tenant_id
65-
}
66-
} = socket
57+
%{assigns: %{tenant_topic: tenant_topic, self_broadcast: self_broadcast, ack_broadcast: ack_broadcast}} = socket
6758

6859
socket = increment_rate_counter(socket)
69-
send_message(tenant_id, self_broadcast, tenant_topic, payload)
60+
send_message(self_broadcast, tenant_topic, payload)
7061

7162
if ack_broadcast,
7263
do: {:reply, :ok, socket},
7364
else: {:noreply, socket}
7465
end
7566

76-
defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do
77-
with %Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id) do
78-
broadcast = %Phoenix.Socket.Broadcast{
79-
topic: tenant_topic,
80-
event: @event_type,
81-
payload: payload
82-
}
83-
84-
if self_broadcast do
85-
TenantBroadcaster.pubsub_broadcast(tenant, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
86-
else
87-
TenantBroadcaster.pubsub_broadcast_from(
88-
tenant,
89-
self(),
90-
tenant_topic,
91-
broadcast,
92-
RealtimeChannel.MessageDispatcher
93-
)
94-
end
67+
defp send_message(self_broadcast, tenant_topic, payload) do
68+
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}
69+
70+
if self_broadcast do
71+
TenantBroadcaster.pubsub_broadcast(tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
72+
else
73+
TenantBroadcaster.pubsub_broadcast_from(
74+
self(),
75+
tenant_topic,
76+
broadcast,
77+
RealtimeChannel.MessageDispatcher
78+
)
9579
end
9680
end
9781

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,39 @@
11
defmodule RealtimeWeb.TenantBroadcaster do
22
@moduledoc """
3-
Interface to broadcast messages to tenants
3+
gen_rpc broadcaster
44
"""
55

66
alias Phoenix.Endpoint
77
alias Phoenix.PubSub
8-
alias Realtime.Api.Tenant
98

10-
@callback broadcast(Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok | {:error, term()}
11-
@callback broadcast_from(from :: pid(), Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok | {:error, term()}
12-
@callback pubsub_broadcast(PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
13-
:ok | {:error, term()}
14-
@callback pubsub_broadcast_from(from :: pid, PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
15-
:ok | {:error, term()}
16-
17-
@spec broadcast(tenant :: Tenant.t(), Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
18-
def broadcast(tenant, topic, event, msg) do
19-
adapter(tenant.broadcast_adapter).broadcast(topic, event, msg)
9+
@spec broadcast(Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
10+
def broadcast(topic, event, msg) do
11+
Realtime.GenRpc.multicast(RealtimeWeb.Endpoint, :local_broadcast, [topic, event, msg], key: topic)
12+
:ok
2013
end
2114

22-
@spec broadcast_from(tenant :: Tenant.t(), from :: pid, Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
23-
def broadcast_from(tenant, from, topic, event, msg) do
24-
adapter(tenant.broadcast_adapter).broadcast_from(from, topic, event, msg)
15+
@spec broadcast_from(from :: pid, Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
16+
def broadcast_from(from, topic, event, msg) do
17+
Realtime.GenRpc.multicast(RealtimeWeb.Endpoint, :local_broadcast_from, [from, topic, event, msg], key: topic)
18+
:ok
2519
end
2620

27-
@spec pubsub_broadcast(tenant :: Tenant.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
28-
:ok | {:error, term}
29-
def pubsub_broadcast(tenant, topic, message, dispatcher) do
30-
adapter(tenant.broadcast_adapter).pubsub_broadcast(topic, message, dispatcher)
31-
end
21+
@spec pubsub_broadcast(PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
22+
def pubsub_broadcast(topic, message, dispatcher) do
23+
Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic)
3224

33-
@spec pubsub_broadcast_from(tenant :: Tenant.t(), from :: pid, PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
34-
:ok | {:error, term}
35-
def pubsub_broadcast_from(tenant, from, topic, message, dispatcher) do
36-
adapter(tenant.broadcast_adapter).pubsub_broadcast_from(from, topic, message, dispatcher)
25+
:ok
3726
end
3827

39-
defp adapter(:gen_rpc), do: RealtimeWeb.TenantBroadcaster.GenRpc
40-
defp adapter(_), do: RealtimeWeb.TenantBroadcaster.Phoenix
28+
@spec pubsub_broadcast_from(from :: pid, PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
29+
def pubsub_broadcast_from(from, topic, message, dispatcher) do
30+
Realtime.GenRpc.multicast(
31+
PubSub,
32+
:local_broadcast_from,
33+
[Realtime.PubSub, from, topic, message, dispatcher],
34+
key: topic
35+
)
36+
37+
:ok
38+
end
4139
end

lib/realtime_web/tenant_broadcaster/gen_rpc.ex

Lines changed: 0 additions & 38 deletions
This file was deleted.

lib/realtime_web/tenant_broadcaster/phoenix.ex

Lines changed: 0 additions & 23 deletions
This file was deleted.

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.23",
7+
version: "2.41.24",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,12 +530,10 @@ defmodule Realtime.Integration.RtChannelTest do
530530

531531
@tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence],
532532
mode: :distributed
533-
test "private broadcast with valid channel with permissions sends message using a remote node (gen_rpc adapter)", %{
533+
test "private broadcast with valid channel with permissions sends message using a remote node", %{
534534
tenant: tenant,
535535
topic: topic
536536
} do
537-
{:ok, tenant} = Realtime.Api.update_tenant(tenant, %{broadcast_adapter: :gen_rpc})
538-
539537
{:ok, token} =
540538
generate_token(tenant, %{exp: System.system_time(:second) + 1000, role: "authenticated", sub: random_string()})
541539

0 commit comments

Comments
 (0)