Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)

no_channel_timeout_in_ms =
if config_env() == :test,
Expand Down Expand Up @@ -126,7 +127,8 @@ config :realtime,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size
broadcast_pool_size: broadcast_pool_size,
users_scope_shards: users_scope_shards

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ defmodule Realtime.Application do
Realtime.PromEx.set_metrics_tags()
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
:syn.set_event_handler(Realtime.SynHandler)

:ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect])
:ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])

region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
Expand Down
3 changes: 2 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule Realtime.Tenants do
"""
@spec list_connected_tenants(atom()) :: [String.t()]
def list_connected_tenants(node) do
:syn.group_names(:users, node)
UsersCounter.scopes()
|> Enum.flat_map(fn scope -> :syn.group_names(scope, node) end)
end

@doc """
Expand Down
21 changes: 18 additions & 3 deletions lib/realtime/user_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,32 @@ defmodule Realtime.UsersCounter do
Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant.
"""
@spec add(pid(), String.t()) :: :ok
def add(pid, tenant), do: :syn.join(:users, tenant, pid)
def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid)

@doc """
Returns the count of all connected clients for a tenant for the cluster.
"""
@spec tenant_users(String.t()) :: non_neg_integer()
def tenant_users(tenant), do: :syn.member_count(:users, tenant)
def tenant_users(tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id)

@doc """
Returns the count of all connected clients for a tenant for a single node.
"""
@spec tenant_users(atom, String.t()) :: non_neg_integer()
def tenant_users(node_name, tenant), do: :syn.member_count(:users, tenant, node_name)
def tenant_users(node_name, tenant_id), do: tenant_id |> scope() |> :syn.member_count(tenant_id, node_name)

@doc """
Returns the scope for a given tenant id.
"""
@spec scope(String.t()) :: atom()
def scope(tenant_id) do
shards = Application.get_env(:realtime, :users_scope_shards)
shard = :erlang.phash2(tenant_id, shards)
:"users_#{shard}"
end

def scopes() do
shards = Application.get_env(:realtime, :users_scope_shards)
Enum.map(0..(shards - 1), fn shard -> :"users_#{shard}" end)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.53.3",
version: "2.53.4",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2588,7 +2588,7 @@ defmodule Realtime.Integration.RtChannelTest do
Realtime.Tenants.Cache.invalidate_tenant_cache(external_id)
end

defp assert_process_down(pid, timeout \\ 100) do
defp assert_process_down(pid, timeout \\ 300) do
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
end
Expand Down
4 changes: 2 additions & 2 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ defmodule Realtime.Tenants.ConnectTest do
region = Tenants.region(tenant)
assert {_pid, %{conn: ^db_conn, region: ^region}} = :syn.lookup(Connect, external_id)
Process.sleep(1000)
:syn.leave(:users, external_id, self())
external_id |> UsersCounter.scope() |> :syn.leave(external_id, self())
Process.sleep(1000)
assert :undefined = :syn.lookup(Connect, external_id)
assert :undefined = external_id |> UsersCounter.scope() |> :syn.lookup(external_id)
refute Process.alive?(db_conn)
Connect.shutdown(external_id)
end
Expand Down