Skip to content

Commit 23bb692

Browse files
filipecabacow3b6x9
andauthored
fix: Instrument RPC calls (#766)
Instrument RPC calls --------- Co-authored-by: Wen Bo Xie <[email protected]>
1 parent 1a1ccba commit 23bb692

File tree

9 files changed

+72
-21
lines changed

9 files changed

+72
-21
lines changed

lib/extensions/postgres_cdc_rls/cdc_rls.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ defmodule Extensions.PostgresCdcRls do
99
alias RealtimeWeb.Endpoint
1010
alias Extensions.PostgresCdcRls, as: Rls
1111
alias Rls.Subscriptions
12+
alias Realtime.Rpc
1213

1314
@spec handle_connect(map()) :: {:ok, {pid(), pid()}} | nil
1415
def handle_connect(args) do
@@ -31,7 +32,7 @@ defmodule Extensions.PostgresCdcRls do
3132
conn_node = node(conn)
3233

3334
if conn_node !== node() do
34-
:rpc.call(
35+
Rpc.call(
3536
conn_node,
3637
Subscriptions,
3738
:create,
@@ -67,7 +68,7 @@ defmodule Extensions.PostgresCdcRls do
6768
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
6869
)
6970

70-
case :rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do
71+
case Rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do
7172
{:ok, _pid} = ok ->
7273
ok
7374

lib/extensions/postgres_cdc_rls/subscriptions_checker.ex

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
77
alias Rls.Subscriptions
88

99
alias Realtime.Helpers, as: H
10-
10+
alias Realtime.Rpc
1111
@timeout 120_000
1212
@max_delete_records 1000
1313

@@ -178,7 +178,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
178178
if node == node() do
179179
acc ++ not_alive_pids(pids)
180180
else
181-
case :rpc.call(node, __MODULE__, :not_alive_pids, [pids], 15_000) do
181+
case Rpc.call(node, __MODULE__, :not_alive_pids, [pids], 15_000) do
182182
{:badrpc, _} = error ->
183183
Logger.error("Can't check pids on node #{inspect(node)}: #{inspect(error)}")
184184
acc
@@ -202,18 +202,10 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
202202
end
203203

204204
defp check_delete_queue() do
205-
Process.send_after(
206-
self(),
207-
:check_delete_queue,
208-
1000
209-
)
205+
Process.send_after(self(), :check_delete_queue, 1000)
210206
end
211207

212208
defp check_active_pids() do
213-
Process.send_after(
214-
self(),
215-
:check_active_pids,
216-
@timeout
217-
)
209+
Process.send_after(self(), :check_active_pids, @timeout)
218210
end
219211
end

lib/extensions/postgres_cdc_stream/cdc_stream.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule Extensions.PostgresCdcStream do
55
require Logger
66

77
alias Extensions.PostgresCdcStream, as: Stream
8+
alias Realtime.Rpc
89

910
def handle_connect(opts) do
1011
Enum.reduce_while(1..5, nil, fn retry, acc ->
@@ -59,7 +60,7 @@ defmodule Extensions.PostgresCdcStream do
5960
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
6061
)
6162

62-
case :rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do
63+
case Rpc.call(launch_node, __MODULE__, :start, [args], 30_000) do
6364
{:ok, _pid} = ok ->
6465
ok
6566

lib/realtime/helpers.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule Realtime.Helpers do
55

66
alias Realtime.Api.Tenant
77
alias Realtime.PostgresCdc
8-
8+
alias Realtime.Rpc
99
require Logger
1010

1111
@spec cancel_timer(reference() | nil) :: non_neg_integer() | false | :ok | nil
@@ -318,7 +318,7 @@ defmodule Realtime.Helpers do
318318
[node() | Node.list()]
319319
|> Task.async_stream(
320320
fn node ->
321-
:erpc.call(node, __MODULE__, :kill_connections_to_tenant_id, [tenant_id, reason], 5000)
321+
Rpc.ecall(node, __MODULE__, :kill_connections_to_tenant_id, [tenant_id, reason], 5000)
322322
end,
323323
timeout: 5000
324324
)

lib/realtime/monitoring/latency.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Realtime.Latency do
88
require Logger
99

1010
alias Realtime.Helpers
11+
alias Realtime.Rpc
1112

1213
defmodule Payload do
1314
@moduledoc false
@@ -94,7 +95,7 @@ defmodule Realtime.Latency do
9495
for n <- [Node.self() | Node.list()] do
9596
Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
9697
{latency, response} =
97-
:timer.tc(fn -> :rpc.call(n, __MODULE__, :pong, [pong_timeout], timer_timeout) end)
98+
:timer.tc(fn -> Rpc.call(n, __MODULE__, :pong, [pong_timeout], timer_timeout) end)
9899

99100
latency_ms = latency / 1_000
100101
region = Application.get_env(:realtime, :region, "not_set")

lib/realtime/rpc.ex

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
defmodule Realtime.Rpc do
2+
@moduledoc """
3+
RPC module for Realtime with the intent of standardizing the RPC interface and collect telemetry
4+
"""
5+
alias Realtime.Telemetry
6+
7+
@doc """
8+
Calls external node using :rpc.call/5 and collects telemetry
9+
"""
10+
def call(node, mod, func, opts \\ [], timeout \\ 15000) do
11+
{latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, opts, timeout) end)
12+
13+
Telemetry.execute([:rpc, :call], latency, %{
14+
mod: mod,
15+
func: func,
16+
target_node: node,
17+
origin_node: node()
18+
})
19+
20+
response
21+
rescue
22+
_ ->
23+
Telemetry.execute([:erpc, :call], timeout, %{
24+
mod: mod,
25+
func: func,
26+
target_node: node,
27+
origin_node: node()
28+
})
29+
end
30+
31+
@doc """
32+
Calls external node using :erpc.call/5 and collects telemetry
33+
"""
34+
def ecall(node, mod, func, opts \\ [], timeout \\ 15000) do
35+
{latency, response} = :timer.tc(fn -> :erpc.call(node, mod, func, opts, timeout) end)
36+
37+
Telemetry.execute([:erpc, :call], latency, %{
38+
mod: mod,
39+
func: func,
40+
target_node: node,
41+
origin_node: node()
42+
})
43+
44+
response
45+
rescue
46+
_ ->
47+
Telemetry.execute([:erpc, :call], timeout, %{
48+
mod: mod,
49+
func: func,
50+
target_node: node,
51+
origin_node: node()
52+
})
53+
end
54+
end

lib/realtime/tenants/connect.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ defmodule Realtime.Tenants.Connect do
1414
alias Realtime.Helpers
1515
alias Realtime.Tenants
1616
alias Realtime.UsersCounter
17+
alias Realtime.Rpc
1718

1819
@erpc_timeout_default 5000
1920
@check_connected_user_interval_default 50_000
@@ -173,7 +174,7 @@ defmodule Realtime.Tenants.Connect do
173174
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
174175
:ok <- tenant_suspended?(tenant),
175176
{:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do
176-
:erpc.call(node, __MODULE__, :connect, [tenant_id, opts], erpc_timeout)
177+
Rpc.ecall(node, __MODULE__, :connect, [tenant_id, opts], erpc_timeout)
177178
end
178179
end
179180

lib/realtime_web/controllers/metrics_controller.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ defmodule RealtimeWeb.MetricsController do
22
use RealtimeWeb, :controller
33
require Logger
44
alias Realtime.PromEx
5+
alias Realtime.Rpc
56

67
def index(conn, _) do
78
cluster_metrics =
89
Node.list()
910
|> Task.async_stream(
1011
fn node ->
11-
{node, :rpc.call(node, PromEx, :get_metrics, [], 10_000)}
12+
{node, Rpc.call(node, PromEx, :get_metrics, [], 10_000)}
1213
end,
1314
timeout: :infinity
1415
)

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.25.48",
7+
version: "2.25.49",
88
elixir: "~> 1.14.0",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

0 commit comments

Comments
 (0)