Skip to content

Commit b428c5a

Browse files
committed
feat: port Realtime V2 serializer from supabase-js PRs #1829, #1894
This implements the Realtime V2 serializer based on supabase-js PRs #1829 and #1894. Key features: - Binary payload support for user messages - Two new message types: user broadcast and user broadcast push - Optional metadata support for user broadcast push messages - Reduced JSON encoding overhead on the server side - Backward compatible with V1 (1.0.0) as default Changes: - Added Serializer class with binary encoding/decoding support - Updated types.py to add VSN constants (VSN_1_0_0, VSN_2_0_0, DEFAULT_VSN) - Updated AsyncRealtimeClient to support vsn parameter and serializer selection - Added comprehensive test suite with 16 tests covering encoding/decoding scenarios - Metadata filtering based on allowed_metadata_keys parameter References: - supabase/supabase-js#1829 - supabase/supabase-js#1894
1 parent 8757f23 commit b428c5a

File tree

4 files changed

+888
-6
lines changed

4 files changed

+888
-6
lines changed

src/realtime/src/realtime/_async/client.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
from ..exceptions import NotConnectedError
1515
from ..message import Message, ServerMessageAdapter
1616
from ..transformers import http_endpoint_url
17+
from ..serializer import Serializer
1718
from ..types import (
1819
DEFAULT_HEARTBEAT_INTERVAL,
1920
DEFAULT_TIMEOUT,
21+
DEFAULT_VSN,
2022
PHOENIX_CHANNEL,
2123
VSN,
24+
VSN_1_0_0,
25+
VSN_2_0_0,
2226
ChannelEvents,
2327
)
2428
from ..utils import is_ws_url
@@ -48,6 +52,8 @@ def __init__(
4852
max_retries: int = 5,
4953
initial_backoff: float = 1.0,
5054
timeout: int = DEFAULT_TIMEOUT,
55+
vsn: str = DEFAULT_VSN,
56+
allowed_metadata_keys: Optional[List[str]] = None,
5157
) -> None:
5258
"""
5359
Initialize a RealtimeClient instance for WebSocket communication.
@@ -61,6 +67,9 @@ def __init__(
6167
:param max_retries: Maximum number of reconnection attempts. Defaults to 5.
6268
:param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0.
6369
:param timeout: Connection timeout in seconds. Defaults to DEFAULT_TIMEOUT.
70+
:param vsn: Serializer version to use. Defaults to "1.0.0". Use "2.0.0" for binary support.
71+
:param allowed_metadata_keys: List of metadata keys allowed in user broadcast push messages.
72+
Only used with VSN 2.0.0. Defaults to None.
6473
"""
6574
if not is_ws_url(url):
6675
raise ValueError("url must be a valid WebSocket URL or HTTP URL string")
@@ -80,8 +89,17 @@ def __init__(
8089
self.max_retries = max_retries
8190
self.initial_backoff = initial_backoff
8291
self.timeout = timeout
92+
self.vsn = vsn
8393
self._listen_task: Optional[asyncio.Task] = None
8494
self._heartbeat_task: Optional[asyncio.Task] = None
95+
96+
# Initialize serializer based on version
97+
if vsn == VSN_2_0_0:
98+
self.serializer = Serializer(allowed_metadata_keys=allowed_metadata_keys)
99+
elif vsn == VSN_1_0_0:
100+
self.serializer = None # V1 uses JSON directly
101+
else:
102+
raise ValueError(f"Unsupported serializer version: {vsn}")
85103

86104
@property
87105
def is_connected(self) -> bool:
@@ -101,7 +119,19 @@ async def _listen(self) -> None:
101119
logger.info(f"receive: {msg!r}")
102120

103121
try:
104-
message = ServerMessageAdapter.validate_json(msg)
122+
# Handle binary messages for V2
123+
if isinstance(msg, bytes) and self.vsn == VSN_2_0_0 and self.serializer:
124+
decoded = self.serializer.decode(msg)
125+
# Convert decoded message to JSON string for validation
126+
msg_json = json.dumps({
127+
"event": decoded.get("event"),
128+
"topic": decoded.get("topic"),
129+
"payload": decoded.get("payload"),
130+
"ref": decoded.get("ref"),
131+
})
132+
message = ServerMessageAdapter.validate_json(msg_json)
133+
else:
134+
message = ServerMessageAdapter.validate_json(msg)
105135
except ValidationError as e:
106136
logger.error(f"Unrecognized message format {msg!r}\n{e}")
107137
continue
@@ -343,15 +373,35 @@ async def send(self, message: Union[Message, Dict[str, Any]]) -> None:
343373
"Warning: calling AsyncRealtimeClient.send with a dictionary is deprecated. Please call it with a Message object instead. This will be a hard error in the future."
344374
)
345375
msg = Message(**message)
346-
message_str = msg.model_dump_json()
347-
logger.info(f"send: {message_str}")
376+
377+
# Encode message based on serializer version
378+
if self.vsn == VSN_2_0_0 and self.serializer:
379+
# Convert Message to dict for serializer
380+
msg_dict = {
381+
"join_ref": msg.join_ref,
382+
"ref": msg.ref,
383+
"topic": msg.topic,
384+
"event": msg.event,
385+
"payload": msg.payload,
386+
}
387+
encoded = self.serializer.encode(msg_dict)
388+
if isinstance(encoded, bytes):
389+
message_data = encoded
390+
logger.info(f"send (binary): {len(message_data)} bytes")
391+
else:
392+
message_data = encoded
393+
logger.info(f"send: {message_data}")
394+
else:
395+
# V1: JSON encoding
396+
message_data = msg.model_dump_json()
397+
logger.info(f"send: {message_data}")
348398

349399
async def send_message():
350400
if not self._ws_connection:
351401
raise NotConnectedError("_send")
352402

353403
try:
354-
await self._ws_connection.send(message_str)
404+
await self._ws_connection.send(message_data)
355405
except websockets.exceptions.ConnectionClosedError as e:
356406
await self._on_connect_error(e)
357407
except websockets.exceptions.ConnectionClosedOK:
@@ -374,7 +424,7 @@ async def _leave_open_topic(self, topic: str):
374424

375425
def endpoint_url(self) -> str:
376426
parsed_url = urlparse(self.url)
377-
query = urlencode({**self.params, "vsn": VSN}, doseq=True)
427+
query = urlencode({**self.params, "vsn": self.vsn}, doseq=True)
378428
return urlunparse(
379429
(
380430
parsed_url.scheme,

0 commit comments

Comments
 (0)