Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions src/socketio/async_redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import pickle
from urllib.parse import urlparse

try: # pragma: no cover
from redis import asyncio as aioredis
Expand All @@ -12,6 +13,13 @@
aioredis = None
RedisError = None

try: # pragma: no cover
from valkey import asyncio as valkey
from valkey.exceptions import ValkeyError
except ImportError: # pragma: no cover
valkey = None
ValkeyError = None

from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url

Expand Down Expand Up @@ -47,38 +55,61 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if aioredis is None:
if aioredis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your virtualenv).')
if not hasattr(aioredis.Redis, 'from_url'):
'(Run "pip install redis" or '
'"pip install valkey" '
'in your virtualenv).')
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()

def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema == 'redis':
if aioredis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return aioredis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
raise ValueError(error_msg)

def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)

async def _publish(self, data):
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
except RedisError as exc:
except error as exc:
if retry:
self._get_logger().error(
'Cannot publish to redis... '
Expand All @@ -96,6 +127,7 @@ async def _publish(self, data):
async def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
Expand All @@ -104,10 +136,10 @@ async def _redis_listen_with_retries(self):
retry_sleep = 1
async for message in self.pubsub.listen():
yield message
except RedisError as exc:
except error as exc:
self._get_logger().error('Cannot receive from redis... '
'retrying in '
'{} secs'.format(retry_sleep),
f'{retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
await asyncio.sleep(retry_sleep)
Expand Down
56 changes: 44 additions & 12 deletions src/socketio/redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@

try:
import redis
from redis.exceptions import RedisError
except ImportError:
redis = None
RedisError = None

try:
import valkey
from valkey.exceptions import ValkeyError
except ImportError:
valkey = None
ValkeyError = None

from .pubsub_manager import PubSubManager

Expand All @@ -18,7 +27,7 @@ def parse_redis_sentinel_url(url):
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
"""
parsed_url = urlparse(url)
if parsed_url.scheme != 'redis+sentinel':
if parsed_url.scheme not in {'redis+sentinel', 'valkey+sentinel'}:
raise ValueError('Invalid Redis Sentinel URL')
sentinels = []
for host_port in parsed_url.netloc.split('@')[-1].split(','):
Expand Down Expand Up @@ -71,10 +80,11 @@ class RedisManager(PubSubManager): # pragma: no cover

def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if redis is None:
if redis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
'(Run "pip install redis" '
'or "pip install valkey" '
'in your virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
Expand All @@ -95,27 +105,48 @@ def initialize(self):
'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode)

def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema == 'redis':
if redis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return redis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
raise ValueError(error_msg)

def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)

def _publish(self, data):
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
except redis.exceptions.RedisError as exc:
except error as exc:
if retry:
logger.error(
'Cannot publish to redis... retrying',
Expand All @@ -132,16 +163,17 @@ def _publish(self, data):
def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
self._redis_connect()
self.pubsub.subscribe(self.channel)
retry_sleep = 1
yield from self.pubsub.listen()
except redis.exceptions.RedisError as exc:
except error as exc:
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep),
f'retrying in {retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
time.sleep(retry_sleep)
Expand Down
13 changes: 7 additions & 6 deletions tests/common/test_redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,34 @@


class TestPubSubManager:
def test_sentinel_url_parser(self):
@pytest.mark.parametrize('rtype', ['redis', 'valkey'])
def test_sentinel_url_parser(self, rtype):
with pytest.raises(ValueError):
parse_redis_sentinel_url('redis://localhost:6379/0')
parse_redis_sentinel_url(f'{rtype}://localhost:6379/0')

assert parse_redis_sentinel_url(
'redis+sentinel://localhost:6379'
f'{rtype}+sentinel://localhost:6379'
) == (
[('localhost', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
f'{rtype}+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
) == (
[('192.168.0.1', 6379), ('192.168.0.2', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://h1:6379,h2:6379/0'
f'{rtype}+sentinel://h1:6379,h2:6379/0'
) == (
[('h1', 6379), ('h2', 6379)],
None,
{'db': 0}
)
assert parse_redis_sentinel_url(
'redis+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis'
f'{rtype}+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis'
) == (
[('h1', 6379), ('h2', 6379), ('h1', 6380)],
'myredis',
Expand Down
Loading