Skip to content

Commit d9ae439

Browse files
committed
Updated redis broker to operate with raw bytes.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 0e45fe0 commit d9ae439

File tree

2 files changed

+38
-43
lines changed

2 files changed

+38
-43
lines changed

taskiq_redis/redis_broker.py

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import pickle
2-
from abc import abstractmethod
31
from logging import getLogger
42
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar
53

@@ -52,66 +50,63 @@ async def shutdown(self) -> None:
5250
await super().shutdown()
5351
await self.connection_pool.disconnect()
5452

55-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
56-
"""
57-
Listen redis queue for new messages.
5853

59-
This function listens to the queue
60-
and yields new messages if they have BrokerMessage type.
54+
class PubSubBroker(BaseRedisBroker):
55+
"""Broker that works with Redis and broadcasts tasks to all workers."""
6156

62-
:yields: broker messages.
63-
"""
64-
async for message in self._listen_to_raw_messages():
65-
try:
66-
redis_message = pickle.loads(message)
67-
if isinstance(redis_message, BrokerMessage):
68-
yield redis_message
69-
except (
70-
TypeError,
71-
AttributeError,
72-
pickle.UnpicklingError,
73-
) as exc:
74-
logger.debug(
75-
"Cannot read broker message %s",
76-
exc,
77-
exc_info=True,
78-
)
79-
80-
@abstractmethod
81-
async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]:
57+
async def kick(self, message: BrokerMessage) -> None:
8258
"""
83-
Generator for reading raw data from Redis.
59+
Publish message over PUBSUB channel.
8460
85-
:yields: raw data.
61+
:param message: message to send.
8662
"""
87-
yield # type: ignore
88-
63+
async with Redis(connection_pool=self.connection_pool) as redis_conn:
64+
await redis_conn.publish(self.queue_name, message.message)
8965

90-
class PubSubBroker(BaseRedisBroker):
91-
"""Broker that works with Redis and broadcasts tasks to all workers."""
66+
async def listen(self) -> AsyncGenerator[bytes, None]:
67+
"""
68+
Listen redis queue for new messages.
9269
93-
async def kick(self, message: BrokerMessage) -> None: # noqa: D102
94-
async with Redis(connection_pool=self.connection_pool) as redis_conn:
95-
await redis_conn.publish(self.queue_name, pickle.dumps(message))
70+
This function listens to the pubsub channel
71+
and yields all messages with proper types.
9672
97-
async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]:
73+
:yields: broker messages.
74+
"""
9875
async with Redis(connection_pool=self.connection_pool) as redis_conn:
9976
redis_pubsub_channel = redis_conn.pubsub()
10077
await redis_pubsub_channel.subscribe(self.queue_name)
10178
async for message in redis_pubsub_channel.listen():
10279
if not message:
10380
continue
81+
if message["type"] != "message":
82+
logger.debug("Received non-message from redis: %s", message)
83+
continue
10484
yield message["data"]
10585

10686

10787
class ListQueueBroker(BaseRedisBroker):
10888
"""Broker that works with Redis and distributes tasks between workers."""
10989

110-
async def kick(self, message: BrokerMessage) -> None: # noqa: D102
90+
async def kick(self, message: BrokerMessage) -> None:
91+
"""
92+
Put a message in a list.
93+
94+
This method appends a message to the list of all messages.
95+
96+
:param message: message to append.
97+
"""
11198
async with Redis(connection_pool=self.connection_pool) as redis_conn:
112-
await redis_conn.lpush(self.queue_name, pickle.dumps(message))
99+
await redis_conn.lpush(self.queue_name, message.message)
113100

114-
async def _listen_to_raw_messages(self) -> AsyncGenerator[bytes, None]:
101+
async def listen(self) -> AsyncGenerator[bytes, None]:
102+
"""
103+
Listen redis queue for new messages.
104+
105+
This function listens to the queue
106+
and yields new messages if they have BrokerMessage type.
107+
108+
:yields: broker messages.
109+
"""
115110
redis_brpop_data_position = 1
116111
async with Redis(connection_pool=self.connection_pool) as redis_conn:
117112
while True: # noqa: WPS457

tests/test_broker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from taskiq_redis import ListQueueBroker, PubSubBroker
88

99

10-
async def get_message(broker: AsyncBroker) -> BrokerMessage: # type: ignore
10+
async def get_message(broker: AsyncBroker) -> bytes: # type: ignore
1111
"""
1212
Get a message from the broker.
1313
@@ -56,7 +56,7 @@ async def test_pub_sub_broker(
5656

5757
message1 = worker1_task.result()
5858
message2 = worker2_task.result()
59-
assert message1 == valid_broker_message
59+
assert message1 == valid_broker_message.message
6060
assert message1 == message2
6161

6262

@@ -81,4 +81,4 @@ async def test_list_queue_broker(
8181

8282
assert worker1_task.done() != worker2_task.done()
8383
message = worker1_task.result() if worker1_task.done() else worker2_task.result()
84-
assert message == valid_broker_message
84+
assert message == valid_broker_message.message

0 commit comments

Comments
 (0)