Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ This is the list of operational codes that can help you understand your deployme
| UnableToEncodeJson | An error where we are not correctly handling the response to be sent to the end user |
| UnknownErrorOnController | An error we are not handling correctly was triggered on a controller |
| UnknownErrorOnChannel | An error we are not handling correctly was triggered on a channel |
| TooManyPresenceMessages | Limit of presence events reached |
| PresenceRateLimitReached | Limit of presence events reached |

## License

Expand Down
4 changes: 4 additions & 0 deletions lib/realtime/api/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ defmodule Realtime.Api.Tenant do
field(:postgres_cdc_default, :string)
field(:max_concurrent_users, :integer)
field(:max_events_per_second, :integer)
field(:max_presence_events_per_second, :integer, default: 10_000)
field(:max_payload_size_in_kb, :integer, default: 3000)
field(:max_bytes_per_second, :integer)
field(:max_channels_per_client, :integer)
field(:max_joins_per_second, :integer)
Expand Down Expand Up @@ -69,6 +71,8 @@ defmodule Realtime.Api.Tenant do
:max_bytes_per_second,
:max_channels_per_client,
:max_joins_per_second,
:max_presence_events_per_second,
:max_payload_size_in_kb,
:suspend,
:private_only,
:migrations_ran,
Expand Down
17 changes: 13 additions & 4 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,26 @@ defmodule Realtime.Tenants do
@doc "RateCounter arguments for counting presence events per second."
@spec presence_events_per_second_rate(Tenant.t()) :: RateCounter.Args.t()
def presence_events_per_second_rate(tenant) do
presence_events_per_second_rate(tenant.external_id, tenant.max_events_per_second)
presence_events_per_second_rate(tenant.external_id, tenant.max_presence_events_per_second)
end

@spec presence_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
def presence_events_per_second_rate(tenant_id, max_events_per_second) do
def presence_events_per_second_rate(tenant_id, max_presence_events_per_second) do
opts = [
telemetry: %{
event_name: [:channel, :presence_events],
measurements: %{limit: max_events_per_second},
measurements: %{limit: max_presence_events_per_second},
metadata: %{tenant: tenant_id}
}
},
limit: [
value: max_presence_events_per_second,
log_fn: fn ->
Logger.error("PresenceRateLimitReached: Too many presence events per second",
external_id: tenant_id,
project: tenant_id
)
end
]
]

%RateCounter.Args{id: presence_events_per_second_key(tenant_id), opts: opts}
Expand Down
118 changes: 65 additions & 53 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule RealtimeWeb.RealtimeChannel do
Used for handling channels and subscriptions.
"""
use RealtimeWeb, :channel
use Realtime.Logs
use RealtimeWeb.RealtimeChannel.Logging

alias RealtimeWeb.SocketDisconnect
alias DBConnection.Backoff
Expand All @@ -23,7 +23,6 @@ defmodule RealtimeWeb.RealtimeChannel do

alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
alias RealtimeWeb.RealtimeChannel.Logging
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
alias RealtimeWeb.RealtimeChannel.PresenceHandler
alias RealtimeWeb.RealtimeChannel.Tracker
Expand All @@ -32,7 +31,7 @@ defmodule RealtimeWeb.RealtimeChannel do

@impl true
def join("realtime:", _params, socket) do
Logging.log_error(socket, "TopicNameRequired", "You must provide a topic name")
log_error(socket, "TopicNameRequired", "You must provide a topic name")
end

def join("realtime:" <> sub_topic = topic, params, socket) do
Expand Down Expand Up @@ -120,77 +119,77 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, state, assign(socket, assigns)}
else
{:error, :expired_token, msg} ->
Logging.maybe_log_warning(socket, "InvalidJWTToken", msg)
maybe_log_warning(socket, "InvalidJWTToken", msg)

{:error, :missing_claims} ->
msg = "Fields `role` and `exp` are required in JWT"
Logging.maybe_log_warning(socket, "InvalidJWTToken", msg)
maybe_log_warning(socket, "InvalidJWTToken", msg)

{:error, :unauthorized, msg} ->
Logging.log_error(socket, "Unauthorized", msg)
log_error(socket, "Unauthorized", msg)

{:error, :too_many_channels} ->
msg = "Too many channels"
Logging.log_error(socket, "ChannelRateLimitReached", msg)
log_error(socket, "ChannelRateLimitReached", msg)

{:error, :too_many_connections} ->
msg = "Too many connected users"
Logging.log_error(socket, "ConnectionRateLimitReached", msg)
log_error(socket, "ConnectionRateLimitReached", msg)

{:error, :too_many_joins} ->
msg = "ClientJoinRateLimitReached: Too many joins per second"
{:error, %{reason: msg}}

{:error, :increase_connection_pool} ->
msg = "Please increase your connection pool size"
Logging.log_error(socket, "IncreaseConnectionPool", msg)
log_error(socket, "IncreaseConnectionPool", msg)

{:error, :tenant_db_too_many_connections} ->
msg = "Database can't accept more connections, Realtime won't connect"
Logging.log_error(socket, "DatabaseLackOfConnections", msg)
log_error(socket, "DatabaseLackOfConnections", msg)

{:error, :unable_to_set_policies, error} ->
Logging.log_error(socket, "UnableToSetPolicies", error)
log_error(socket, "UnableToSetPolicies", error)
{:error, %{reason: "Realtime was unable to connect to the project database"}}

{:error, :tenant_database_unavailable} ->
Logging.log_error(socket, "UnableToConnectToProject", "Realtime was unable to connect to the project database")
log_error(socket, "UnableToConnectToProject", "Realtime was unable to connect to the project database")

{:error, :rpc_error, :timeout} ->
Logging.log_error(socket, "TimeoutOnRpcCall", "Node request timeout")
log_error(socket, "TimeoutOnRpcCall", "Node request timeout")

{:error, :rpc_error, reason} ->
Logging.log_error(socket, "ErrorOnRpcCall", "RPC call error: " <> inspect(reason))
log_error(socket, "ErrorOnRpcCall", "RPC call error: " <> inspect(reason))

{:error, :initializing} ->
Logging.log_error(socket, "InitializingProjectConnection", "Realtime is initializing the project connection")
log_error(socket, "InitializingProjectConnection", "Realtime is initializing the project connection")

{:error, :tenant_database_connection_initializing} ->
Logging.log_error(socket, "InitializingProjectConnection", "Connecting to the project database")
log_error(socket, "InitializingProjectConnection", "Connecting to the project database")

{:error, :token_malformed, msg} ->
Logging.log_error(socket, "MalformedJWT", msg)
log_error(socket, "MalformedJWT", msg)

{:error, invalid_exp} when is_integer(invalid_exp) and invalid_exp <= 0 ->
Logging.log_error(socket, "InvalidJWTToken", "Token expiration time is invalid")
log_error(socket, "InvalidJWTToken", "Token expiration time is invalid")

{:error, :private_only} ->
Logging.log_error(socket, "PrivateOnly", "This project only allows private channels")
log_error(socket, "PrivateOnly", "This project only allows private channels")

{:error, :tenant_not_found} ->
Logging.log_error(socket, "TenantNotFound", "Tenant with the given ID does not exist")
log_error(socket, "TenantNotFound", "Tenant with the given ID does not exist")

{:error, :tenant_suspended} ->
Logging.log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")
log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")

{:error, :signature_error} ->
Logging.log_error(socket, "JwtSignatureError", "Failed to validate JWT signature")
log_error(socket, "JwtSignatureError", "Failed to validate JWT signature")

{:error, :shutdown_in_progress} ->
Logging.log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")
log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")

{:error, error} ->
Logging.log_error(socket, "UnknownErrorOnChannel", error)
log_error(socket, "UnknownErrorOnChannel", error)
{:error, %{reason: "Unknown Error on Channel"}}
end
end
Expand Down Expand Up @@ -231,25 +230,16 @@ defmodule RealtimeWeb.RealtimeChannel do
end

def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do
%{presence_rate_counter: presence_rate_counter, limits: %{max_events_per_second: max}} = socket.assigns

%{presence_rate_counter: presence_rate_counter} = socket.assigns
GenCounter.add(presence_rate_counter.id)
{:ok, rate_counter} = RateCounter.get(presence_rate_counter)

# Let's just log for now
if rate_counter.avg > max do
message = "Too many presence messages per second"
log_warning("TooManyPresenceMessages", message)
end

Logging.maybe_log_info(socket, msg)
maybe_log_info(socket, msg)
push(socket, "presence_diff", payload)
{:noreply, socket}
end

def handle_info(%{event: type, payload: payload} = msg, socket) do
count(socket)
Logging.maybe_log_info(socket, msg)
maybe_log_info(socket, msg)
push(socket, type, payload)
{:noreply, socket}
end
Expand All @@ -274,19 +264,19 @@ defmodule RealtimeWeb.RealtimeChannel do
case PostgresCdc.after_connect(module, response, postgres_extension, pg_change_params) do
{:ok, _response} ->
message = "Subscribed to PostgreSQL"
Logging.maybe_log_info(socket, message)
maybe_log_info(socket, message)
push_system_message("postgres_changes", socket, "ok", message, channel_name)
{:noreply, assign(socket, :pg_sub_ref, nil)}

error ->
Logging.maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)
maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)

push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end

nil ->
Logging.maybe_log_warning(
maybe_log_warning(
socket,
"ReconnectSubscribeToPostgres",
"Re-connecting to PostgreSQL with params: " <> inspect(pg_change_params)
Expand All @@ -295,13 +285,13 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe())}

error ->
Logging.maybe_log_error(socket, "UnableToSubscribeToPostgres", error)
maybe_log_error(socket, "UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
rescue
error ->
log_warning("UnableToSubscribeToPostgres", error)
log_warning(socket, "UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand All @@ -319,7 +309,7 @@ defmodule RealtimeWeb.RealtimeChannel do
shutdown_response(socket, msg)

{:error, error} ->
shutdown_response(socket, to_log(error))
shutdown_response(socket, Realtime.Logs.to_log(error))
end
end

Expand All @@ -329,7 +319,16 @@ defmodule RealtimeWeb.RealtimeChannel do
{:stop, :shutdown, socket}
end

def handle_info(:sync_presence, %{assigns: %{presence_enabled?: true}} = socket), do: PresenceHandler.sync(socket)
# Handles the :sync_presence message for sockets whose assigns have presence_enabled?: true.
# Delegates to PresenceHandler.sync/1: on :ok the channel keeps running with the socket
# unchanged; when the handler reports :rate_limit_exceeded the channel is shut down through
# shutdown_response/2 with a rate-limit message.
def handle_info(:sync_presence, %{assigns: %{presence_enabled?: true}} = socket) do
case PresenceHandler.sync(socket) do
:ok ->
{:noreply, socket}

# Presence rate limit hit: stop the channel instead of continuing to sync.
{:error, :rate_limit_exceeded} ->
shutdown_response(socket, "Too many presence messages per second")
end
end

def handle_info(:sync_presence, socket), do: {:noreply, socket}
def handle_info(_, socket), do: {:noreply, socket}

Expand All @@ -341,7 +340,7 @@ defmodule RealtimeWeb.RealtimeChannel do
BroadcastHandler.handle(payload, db_conn, socket)
else
{:error, error} ->
log_error("UnableToHandleBroadcast", error)
log_error(socket, "UnableToHandleBroadcast", error)
{:noreply, socket}
end
end
Expand All @@ -353,17 +352,30 @@ defmodule RealtimeWeb.RealtimeChannel do
def handle_in("presence", payload, %{assigns: %{private?: true}} = socket) do
%{tenant: tenant_id} = socket.assigns

with {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id) do
PresenceHandler.handle(payload, db_conn, socket)
with {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
{:ok, socket} <- PresenceHandler.handle(payload, db_conn, socket) do
{:reply, :ok, socket}
else
{:error, :rate_limit_exceeded} ->
shutdown_response(socket, "Too many presence messages per second")

{:error, error} ->
log_error("UnableToHandlePresence", error)
{:noreply, socket}
log_error(socket, "UnableToHandlePresence", error)
{:reply, :error, socket}
end
end

def handle_in("presence", payload, %{assigns: %{private?: false}} = socket) do
PresenceHandler.handle(payload, socket)
with {:ok, socket} <- PresenceHandler.handle(payload, socket) do
{:reply, :ok, socket}
else
{:error, :rate_limit_exceeded} ->
shutdown_response(socket, "Too many presence messages per second")

{:error, error} ->
log_error(socket, "UnableToHandlePresence", error)
{:reply, :error, socket}
end
end

def handle_in("access_token", %{"access_token" => "sb_" <> _}, socket) do
Expand Down Expand Up @@ -476,7 +488,7 @@ defmodule RealtimeWeb.RealtimeChannel do
{:error, :too_many_joins}

error ->
Logging.log_error(socket, "UnknownErrorOnCounter", error)
log_error(socket, "UnknownErrorOnCounter", error)
{:error, error}
end
end
Expand Down Expand Up @@ -553,7 +565,7 @@ defmodule RealtimeWeb.RealtimeChannel do
assign(socket, :access_token, tenant_token)
end

defp confirm_token(%{assigns: assigns} = socket) do
defp confirm_token(%{assigns: assigns}) do
%{jwt_secret: jwt_secret, access_token: access_token} = assigns

jwt_jwks = Map.get(assigns, :jwt_jwks)
Expand Down Expand Up @@ -586,7 +598,7 @@ defmodule RealtimeWeb.RealtimeChannel do
defp shutdown_response(socket, message) when is_binary(message) do
%{assigns: %{channel_name: channel_name}} = socket
push_system_message("system", socket, "error", message, channel_name)
Logging.maybe_log_warning(socket, "ChannelShutdown", message)
maybe_log_warning(socket, "ChannelShutdown", message)
{:stop, :normal, socket}
end

Expand Down Expand Up @@ -725,7 +737,7 @@ defmodule RealtimeWeb.RealtimeChannel do
{:error, :increase_connection_pool}

{:error, :rls_policy_error, error} ->
log_error("RlsPolicyError", error)
log_error(socket, "RlsPolicyError", error)

{:error, :unauthorized, "You do not have permissions to read from this Channel topic: #{topic}"}

Expand Down
Loading
Loading