From 79a432185033299061ac2645d845a11a737451c4 Mon Sep 17 00:00:00 2001 From: phi Date: Thu, 21 Aug 2025 21:53:14 +0900 Subject: [PATCH 1/4] feat: support valkey --- src/socketio/async_redis_manager.py | 53 +++++++++++++++++++++++------ src/socketio/redis_manager.py | 53 ++++++++++++++++++++++------- 2 files changed, 83 insertions(+), 23 deletions(-) diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index 41ce2cea..b91eeb8f 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -1,5 +1,6 @@ import asyncio import pickle +from urllib.parse import urlparse try: # pragma: no cover from redis import asyncio as aioredis @@ -12,6 +13,15 @@ aioredis = None RedisError = None +try: # pragma: no cover + from valkey import asyncio as valkey + from valkey.exceptions import ValkeyError +except ImportError: # pragma: no cover + valkey = None + ValkeyError = None + + + from .async_pubsub_manager import AsyncPubSubManager from .redis_manager import parse_redis_sentinel_url @@ -47,38 +57,58 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None, redis_options=None): - if aioredis is None: + if aioredis is None and valkey is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" in your virtualenv).') - if not hasattr(aioredis.Redis, 'from_url'): + '(Run "pip install redis" or "pip install valkey" ' + 'in your virtualenv).') + if aioredis and not hasattr(aioredis.Redis, 'from_url'): raise RuntimeError('Version 2 of aioredis package is required.') super().__init__(channel=channel, write_only=write_only, logger=logger) self.redis_url = url self.redis_options = redis_options or {} self._redis_connect() + def _get_redis_module_and_error(self): + parsed_url = urlparse(self.redis_url) + schema = parsed_url.scheme.split('+', 1)[0].lower() + if schema == 'redis': + if aioredis is None or RedisError is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" in your virtualenv).') + return aioredis, RedisError + if schema == 'valkey': + if valkey is None or ValkeyError is None: + raise RuntimeError('Valkey package is not installed ' + '(Run "pip install valkey" in your virtualenv).') + return valkey, ValkeyError + error_msg = f'Unsupported Redis URL schema: {schema}' + raise ValueError(error_msg) + def _redis_connect(self): - if not self.redis_url.startswith('redis+sentinel://'): - self.redis = aioredis.Redis.from_url(self.redis_url, - **self.redis_options) - else: + module, _ = self._get_redis_module_and_error() + parsed_url = urlparse(self.redis_url) + if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: sentinels, service_name, connection_kwargs = \ parse_redis_sentinel_url(self.redis_url) kwargs = self.redis_options kwargs.update(connection_kwargs) - sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs) + sentinel = module.sentinel.Sentinel(sentinels, **kwargs) self.redis = sentinel.master_for(service_name or self.channel) + else: + self.redis = module.Redis.from_url(self.redis_url, + **self.redis_options) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) async def _publish(self, data): retry = True + _, error = self._get_redis_module_and_error() while True: try: if not retry: self._redis_connect() return await self.redis.publish( self.channel, pickle.dumps(data)) - except RedisError as exc: + except error as exc: if retry: self._get_logger().error( 'Cannot publish to redis... ' @@ -96,6 +126,7 @@ async def _publish(self, data): async def _redis_listen_with_retries(self): retry_sleep = 1 connect = False + _, error = self._get_redis_module_and_error() while True: try: if connect: @@ -104,10 +135,10 @@ async def _redis_listen_with_retries(self): retry_sleep = 1 async for message in self.pubsub.listen(): yield message - except RedisError as exc: + except error as exc: self._get_logger().error('Cannot receive from redis... ' 'retrying in ' - '{} secs'.format(retry_sleep), + f'{retry_sleep} secs', extra={"redis_exception": str(exc)}) connect = True await asyncio.sleep(retry_sleep) diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index 73758fce..a0ef30ea 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -5,8 +5,17 @@ try: import redis + from redis.exceptions import RedisError except ImportError: redis = None + RedisError = None + +try: + import valkey + from valkey.exceptions import ValkeyError +except ImportError: + valkey = None + ValkeyError = None from .pubsub_manager import PubSubManager @@ -18,7 +27,7 @@ def parse_redis_sentinel_url(url): redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name """ parsed_url = urlparse(url) - if parsed_url.scheme != 'redis+sentinel': + if parsed_url.scheme not in {'redis+sentinel', 'valkey+sentinel'}: raise ValueError('Invalid Redis Sentinel URL') sentinels = [] for host_port in parsed_url.netloc.split('@')[-1].split(','): @@ -71,10 +80,10 @@ class RedisManager(PubSubManager): # pragma: no cover def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None, redis_options=None): - if redis is None: + if redis is None and valkey is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" in your ' - 'virtualenv).') + '(Run "pip install redis" or "pip install valkey" ' + 'in your virtualenv).') super().__init__(channel=channel, write_only=write_only, logger=logger) self.redis_url = url self.redis_options = redis_options or {} @@ -95,27 +104,46 @@ def initialize(self): 'Redis requires a monkey patched socket library to work ' 'with ' + self.server.async_mode) + def _get_redis_module_and_error(self): + parsed_url = urlparse(self.redis_url) + schema = parsed_url.scheme.split('+', 1)[0].lower() + if schema == 'redis': + if redis is None or RedisError is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" in your virtualenv).') + return redis, RedisError + if schema == 'valkey': + if valkey is None or ValkeyError is None: + raise RuntimeError('Valkey package is not installed ' + '(Run "pip install valkey" in your virtualenv).') + return valkey, ValkeyError + error_msg = f'Unsupported Redis URL schema: {schema}' + raise ValueError(error_msg) + def _redis_connect(self): - if not self.redis_url.startswith('redis+sentinel://'): - self.redis = redis.Redis.from_url(self.redis_url, - **self.redis_options) - else: + module, _ = self._get_redis_module_and_error() + parsed_url = urlparse(self.redis_url) + if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: sentinels, service_name, connection_kwargs = \ parse_redis_sentinel_url(self.redis_url) kwargs = self.redis_options kwargs.update(connection_kwargs) - sentinel = redis.sentinel.Sentinel(sentinels, **kwargs) + sentinel = module.sentinel.Sentinel(sentinels, **kwargs) self.redis = sentinel.master_for(service_name or self.channel) + else: + self.redis = module.Redis.from_url(self.redis_url, + **self.redis_options) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) def _publish(self, data): retry = True + _, error = self._get_redis_module_and_error() while True: try: if not retry: self._redis_connect() return self.redis.publish(self.channel, pickle.dumps(data)) - except redis.exceptions.RedisError as exc: + except error as exc: if retry: logger.error( 'Cannot publish to redis... retrying', @@ -132,6 +160,7 @@ def _publish(self, data): def _redis_listen_with_retries(self): retry_sleep = 1 connect = False + _, error = self._get_redis_module_and_error() while True: try: if connect: @@ -139,9 +168,9 @@ def _redis_listen_with_retries(self): self.pubsub.subscribe(self.channel) retry_sleep = 1 yield from self.pubsub.listen() - except redis.exceptions.RedisError as exc: + except error as exc: logger.error('Cannot receive from redis... ' - 'retrying in {} secs'.format(retry_sleep), + f'retrying in {retry_sleep} secs', extra={"redis_exception": str(exc)}) connect = True time.sleep(retry_sleep) From a8470ced6e09bb5af0047b4ccff76f81912222cd Mon Sep 17 00:00:00 2001 From: phi Date: Fri, 29 Aug 2025 22:15:55 +0900 Subject: [PATCH 2/4] fix: lint error --- src/socketio/async_redis_manager.py | 11 ++++++----- src/socketio/redis_manager.py | 9 ++++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index b91eeb8f..b37e9059 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -20,8 +20,6 @@ valkey = None ValkeyError = None - - from .async_pubsub_manager import AsyncPubSubManager from .redis_manager import parse_redis_sentinel_url @@ -59,7 +57,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None, redis_options=None): if aioredis is None and valkey is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" or "pip install valkey" ' + '(Run "pip install redis" or ' + '"pip install valkey" ' 'in your virtualenv).') if aioredis and not hasattr(aioredis.Redis, 'from_url'): raise RuntimeError('Version 2 of aioredis package is required.') @@ -74,12 +73,14 @@ def _get_redis_module_and_error(self): if schema == 'redis': if aioredis is None or RedisError is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" in your virtualenv).') + '(Run "pip install redis" ' + 'in your virtualenv).') return aioredis, RedisError if schema == 'valkey': if valkey is None or ValkeyError is None: raise RuntimeError('Valkey package is not installed ' - '(Run "pip install valkey" in your virtualenv).') + '(Run "pip install valkey" ' + 'in your virtualenv).') return valkey, ValkeyError error_msg = f'Unsupported Redis URL schema: {schema}' raise ValueError(error_msg) diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index a0ef30ea..1d0227e2 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -82,7 +82,8 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None, redis_options=None): if redis is None and valkey is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" or "pip install valkey" ' + '(Run "pip install redis" ' + 'or "pip install valkey" ' 'in your virtualenv).') super().__init__(channel=channel, write_only=write_only, logger=logger) self.redis_url = url @@ -110,12 +111,14 @@ def _get_redis_module_and_error(self): if schema == 'redis': if redis is None or RedisError is None: raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" in your virtualenv).') + '(Run "pip install redis" ' + 'in your virtualenv).') return redis, RedisError if schema == 'valkey': if valkey is None or ValkeyError is None: raise RuntimeError('Valkey package is not installed ' - '(Run "pip install valkey" in your virtualenv).') + '(Run "pip install valkey" ' + 'in your virtualenv).') return valkey, ValkeyError error_msg = f'Unsupported Redis URL schema: {schema}' raise ValueError(error_msg) From f9cbe1dac699c6308d6666fe709a619a31bbf029 Mon Sep 17 00:00:00 2001 From: phi Date: Sat, 30 Aug 2025 17:51:12 +0900 Subject: [PATCH 3/4] test: test valkey url --- tests/common/test_redis_manager.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/common/test_redis_manager.py b/tests/common/test_redis_manager.py index 3e5ee1ef..66a667eb 100644 --- a/tests/common/test_redis_manager.py +++ b/tests/common/test_redis_manager.py @@ -4,33 +4,34 @@ class TestPubSubManager: - def test_sentinel_url_parser(self): + @pytest.mark.parametrize('rtype', ['redis', 'valkey']) + def test_sentinel_url_parser(self, rtype): with pytest.raises(ValueError): - parse_redis_sentinel_url('redis://localhost:6379/0') + parse_redis_sentinel_url(f'{rtype}://localhost:6379/0') assert parse_redis_sentinel_url( - 'redis+sentinel://localhost:6379' + f'{rtype}+sentinel://localhost:6379' ) == ( [('localhost', 6379)], None, {} ) assert parse_redis_sentinel_url( - 'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/' + f'{rtype}+sentinel://192.168.0.1:6379,192.168.0.2:6379/' ) == ( [('192.168.0.1', 6379), ('192.168.0.2', 6379)], None, {} ) assert parse_redis_sentinel_url( - 'redis+sentinel://h1:6379,h2:6379/0' + f'{rtype}+sentinel://h1:6379,h2:6379/0' ) == ( [('h1', 6379), ('h2', 6379)], None, {'db': 0} ) assert parse_redis_sentinel_url( - 'redis+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis' + f'{rtype}+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis' ) == ( [('h1', 6379), ('h2', 6379), ('h1', 6380)], 'myredis', From 774d70ff65dd168be04da2462998446fed4dc8de Mon Sep 17 00:00:00 2001 From: phi Date: Sun, 31 Aug 2025 13:20:33 +0900 Subject: [PATCH 4/4] fix: lint error --- tests/common/test_redis_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/common/test_redis_manager.py b/tests/common/test_redis_manager.py index 66a667eb..48dfb4a9 100644 --- a/tests/common/test_redis_manager.py +++ b/tests/common/test_redis_manager.py @@ -31,7 +31,8 @@ def test_sentinel_url_parser(self, rtype): {'db': 0} ) assert parse_redis_sentinel_url( - f'{rtype}+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis' + f'{rtype}+sentinel://' + 'user:password@h1:6379,h2:6379,h1:6380/0/myredis' ) == ( [('h1', 6379), ('h2', 6379), ('h1', 6380)], 'myredis',