From 7850f4ce301448e4376369a44efb14a21d4bb3cd Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Mon, 9 Jun 2025 14:34:01 +0200 Subject: [PATCH 1/2] Add support for websocket proxy --- examples/nginx.conf | 44 ++++++++++++++++++++ examples/websocket_proxy.py | 81 +++++++++++++++++++++++++++++++++++++ nats/aio/client.py | 15 ++++++- nats/aio/transport.py | 25 ++++++++++-- 4 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 examples/nginx.conf create mode 100644 examples/websocket_proxy.py diff --git a/examples/nginx.conf b/examples/nginx.conf new file mode 100644 index 00000000..ae7c6c32 --- /dev/null +++ b/examples/nginx.conf @@ -0,0 +1,44 @@ +daemon off; + +events { + worker_connections 1024; +} + +http { + access_log /dev/stdout; + error_log /dev/stderr; + + upstream backend { + server 127.0.0.1:8080; # Your WebSocket server + } + + # Map to handle WebSocket upgrade + map $http_upgrade $connection_upgrade { + default upgrade; + '' close; + } + + server { + listen 8888; + server_name localhost; + + # Handle both HTTP and WebSocket at root + location / { + proxy_pass http://backend; + proxy_http_version 1.1; + + # WebSocket headers + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection $connection_upgrade; + + # Standard proxy headers + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Optional: Increase timeouts for long-lived WebSocket connections + proxy_read_timeout 86400; + } + } +} diff --git a/examples/websocket_proxy.py b/examples/websocket_proxy.py new file mode 100644 index 00000000..38ad0a6b --- /dev/null +++ b/examples/websocket_proxy.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 + +""" +NATS WebSocket proxy example. + +Usage: + python websocket_proxy_example.py [--proxy http://localhost:8888] [--proxy-user user] [--proxy-password pass] +""" + +import asyncio +import argparse +import os +import nats + + +async def main(): + """Connect to NATS WebSocket server with optional proxy""" + + # Parse command line arguments + parser = argparse.ArgumentParser(description="NATS WebSocket Proxy Example") + parser.add_argument("--server", default="ws://localhost:8080", + help="NATS WebSocket server URL (default: ws://localhost:8080)") + parser.add_argument("--proxy", help="HTTP proxy URL (e.g., http://localhost:8888)") + parser.add_argument("--proxy-user", help="Proxy username for authentication") + parser.add_argument("--proxy-password", help="Proxy password for authentication") + + args = parser.parse_args() + + # Build connection options + connect_options = { + "servers": [args.server] + } + + if args.proxy: + connect_options["proxy"] = args.proxy + print(f"Proxy: {args.proxy}") + + if args.proxy_user and args.proxy_password: + connect_options["proxy_user"] = args.proxy_user + connect_options["proxy_password"] = args.proxy_password + print(f"Auth: {args.proxy_user}") + elif args.proxy_user or args.proxy_password: + print("Error: Both user and password required") + return 1 + + try: + print(f"Connecting to {args.server}...") + nc = await nats.connect(**connect_options) + print("Connected") + + # Test pub/sub + messages = [] + + async def handler(msg): + messages.append(msg.data.decode()) + + await nc.subscribe("test", cb=handler) + await nc.flush() + + # Send test message + for i in range(10): + await nc.publish("test", f"hello {i}".encode()) + await nc.flush() + await asyncio.sleep(0.1) + + expected = [f"hello {i}" for i in range(10)] + assert messages == expected, f"Expected {expected}, got {messages}" + print(f"Success: All {len(messages)} messages received correctly") + + await nc.close() + + except Exception as e: + print(f"Failed: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + exit(exit_code) diff --git a/nats/aio/client.py b/nats/aio/client.py index 25253bc6..c6902b5a 100644 --- a/nats/aio/client.py +++ b/nats/aio/client.py @@ -358,6 +358,9 @@ async def connect( inbox_prefix: Union[str, bytes] = DEFAULT_INBOX_PREFIX, pending_size: int = DEFAULT_PENDING_SIZE, flush_timeout: Optional[float] = None, + proxy: Optional[str] = None, + proxy_user: Optional[str] = None, + proxy_password: Optional[str] = None, ) -> None: """ Establishes a connection to NATS. @@ -370,6 +373,9 @@ async def connect( :param discovered_server_cb: Callback to report when a new server joins the cluster. :param pending_size: Max size of the pending buffer for publishing commands. :param flush_timeout: Max duration to wait for a forced flush to occur. + :param proxy: Proxy URL for WebSocket connections (e.g., 'http://proxy.example.com:8080') + :param proxy_user: Username for proxy authentication + :param proxy_password: Password for proxy authentication Connecting setting all callbacks:: @@ -495,6 +501,9 @@ async def subscribe_handler(msg): self.options["connect_timeout"] = connect_timeout self.options["drain_timeout"] = drain_timeout self.options["tls_handshake_first"] = tls_handshake_first + self.options["proxy"] = proxy + self.options["proxy_user"] = proxy_user + self.options["proxy_password"] = proxy_password if tls: self.options["tls"] = tls @@ -1380,7 +1389,11 @@ async def _select_next_server(self) -> None: s.last_attempt = time.monotonic() if not self._transport: if s.uri.scheme in ("ws", "wss"): - self._transport = WebSocketTransport() + self._transport = WebSocketTransport( + proxy=self.options.get("proxy"), + proxy_user=self.options.get("proxy_user"), + proxy_password=self.options.get("proxy_password"), + ) else: # use TcpTransport as a fallback self._transport = TcpTransport() diff --git a/nats/aio/transport.py b/nats/aio/transport.py index 45de6443..e1fa5bd8 100644 --- a/nats/aio/transport.py +++ b/nats/aio/transport.py @@ -8,8 +8,10 @@ try: import aiohttp + from aiohttp import BasicAuth except ImportError: aiohttp = None # type: ignore[assignment] + BasicAuth = None # type: ignore[assignment] from nats.errors import ProtocolError @@ -192,12 +194,16 @@ def __bool__(self): class WebSocketTransport(Transport): - def __init__(self): + def __init__(self, proxy: Optional[str] = None, proxy_user: Optional[str] = None, proxy_password: Optional[str] = None): if not aiohttp: raise ImportError( "Could not import aiohttp transport, please install it with `pip install aiohttp`" ) self._ws: Optional[aiohttp.ClientWebSocketResponse] = None + self._proxy = proxy + self._proxy_auth = None + if proxy_user and proxy_password: + self._proxy_auth = BasicAuth(proxy_user, proxy_password) self._client: aiohttp.ClientSession = aiohttp.ClientSession() self._pending = asyncio.Queue() self._close_task = asyncio.Future() @@ -207,8 +213,14 @@ async def connect( self, uri: ParseResult, buffer_size: int, connect_timeout: int ): # for websocket library, the uri must contain the scheme already + kwargs = {"timeout": connect_timeout} + if self._proxy: + kwargs["proxy"] = self._proxy + if self._proxy_auth: + kwargs["proxy_auth"] = self._proxy_auth + self._ws = await self._client.ws_connect( - uri.geturl(), timeout=connect_timeout + uri.geturl(), **kwargs ) self._using_tls = False @@ -224,10 +236,15 @@ async def connect_tls( return raise ProtocolError("ws: cannot upgrade to TLS") + kwargs = {"ssl": ssl_context, "timeout": connect_timeout} + if self._proxy: + kwargs["proxy"] = self._proxy + if self._proxy_auth: + kwargs["proxy_auth"] = self._proxy_auth + self._ws = await self._client.ws_connect( uri if isinstance(uri, str) else uri.geturl(), - ssl=ssl_context, - timeout=connect_timeout, + **kwargs ) self._using_tls = True From 936a62030d6da3b924e98dd55c0a51f4e6b1ed11 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Mon, 9 Jun 2025 14:43:22 +0200 Subject: [PATCH 2/2] Format --- examples/websocket_proxy.py | 1 - nats/aio/transport.py | 18 ++++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/websocket_proxy.py b/examples/websocket_proxy.py index 38ad0a6b..e215b52e 100644 --- a/examples/websocket_proxy.py +++ b/examples/websocket_proxy.py @@ -9,7 +9,6 @@ import asyncio import argparse -import os import nats diff --git a/nats/aio/transport.py b/nats/aio/transport.py index e1fa5bd8..ced3de0d 100644 --- a/nats/aio/transport.py +++ b/nats/aio/transport.py @@ -194,7 +194,12 @@ def __bool__(self): class WebSocketTransport(Transport): - def __init__(self, proxy: Optional[str] = None, proxy_user: Optional[str] = None, proxy_password: Optional[str] = None): + def __init__( + self, + proxy: Optional[str] = None, + proxy_user: Optional[str] = None, + proxy_password: Optional[str] = None + ): if not aiohttp: raise ImportError( "Could not import aiohttp transport, please install it with `pip install aiohttp`" @@ -218,10 +223,8 @@ async def connect( kwargs["proxy"] = self._proxy if self._proxy_auth: kwargs["proxy_auth"] = self._proxy_auth - - self._ws = await self._client.ws_connect( - uri.geturl(), **kwargs - ) + + self._ws = await self._client.ws_connect(uri.geturl(), **kwargs) self._using_tls = False async def connect_tls( @@ -241,10 +244,9 @@ async def connect_tls( kwargs["proxy"] = self._proxy if self._proxy_auth: kwargs["proxy_auth"] = self._proxy_auth - + self._ws = await self._client.ws_connect( - uri if isinstance(uri, str) else uri.geturl(), - **kwargs + uri if isinstance(uri, str) else uri.geturl(), **kwargs ) self._using_tls = True