Skip to content

Commit f57a3a9

Browse files
authored
Merge pull request #62 from taskiq-python/bugfix/cluster-source
2 parents dce4dac + 63e99c1 commit f57a3a9

File tree

2 files changed

+23
-16
lines changed

2 files changed

+23
-16
lines changed

taskiq_redis/schedule_source.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ def __init__(
117117
self,
118118
url: str,
119119
prefix: str = "schedule",
120-
buffer_size: int = 50,
121120
serializer: Optional[TaskiqSerializer] = None,
122121
**connection_kwargs: Any,
123122
) -> None:
@@ -126,7 +125,6 @@ def __init__(
126125
url,
127126
**connection_kwargs,
128127
)
129-
self.buffer_size = buffer_size
130128
if serializer is None:
131129
serializer = PickleSerializer()
132130
self.serializer = serializer
@@ -156,19 +154,14 @@ async def get_schedules(self) -> List[ScheduledTask]:
156154
:return: list of schedules.
157155
"""
158156
schedules = []
159-
buffer = []
160157
async for key in self.redis.scan_iter(f"{self.prefix}:*"): # type: ignore[attr-defined]
161-
buffer.append(key)
162-
if len(buffer) >= self.buffer_size:
163-
schedules.extend(await self.redis.mget(buffer)) # type: ignore[attr-defined]
164-
buffer = []
165-
if buffer:
166-
schedules.extend(await self.redis.mget(buffer)) # type: ignore[attr-defined]
167-
return [
168-
model_validate(ScheduledTask, self.serializer.loadb(schedule))
169-
for schedule in schedules
170-
if schedule
171-
]
158+
raw_schedule = await self.redis.get(key) # type: ignore[attr-defined]
159+
parsed_schedule = model_validate(
160+
ScheduledTask,
161+
self.serializer.loadb(raw_schedule),
162+
)
163+
schedules.append(parsed_schedule)
164+
return schedules
172165

173166
async def post_send(self, task: ScheduledTask) -> None:
174167
"""Delete a task after it's completed."""

tests/test_schedule_source.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,17 +196,31 @@ async def test_cluster_post_run_time(redis_cluster_url: str) -> None:
196196

197197

198198
@pytest.mark.anyio
199-
async def test_cluster_buffer(redis_cluster_url: str) -> None:
199+
async def test_cluster_get_schedules(redis_cluster_url: str) -> None:
200+
"""
201+
Test of a redis cluster source.
202+
203+
This test checks that if the schedules are located on different nodes,
204+
the source will still be able to get them all.
205+
206+
To simulate this we set a specific shard key for each schedule.
207+
The shard keys are from this gist:
208+
209+
https://gist.githubusercontent.com/dvirsky/93f43277317f629bb06e858946416f7e/raw/b0438faf6f5a0020c12a0730f6cd6ac4bdc4b171/crc16_slottable.h
210+
211+
"""
200212
prefix = uuid.uuid4().hex
201-
source = RedisClusterScheduleSource(redis_cluster_url, prefix=prefix, buffer_size=1)
213+
source = RedisClusterScheduleSource(redis_cluster_url, prefix=prefix)
202214
schedule1 = ScheduledTask(
215+
schedule_id=r"id-{06S}",
203216
task_name="test_task1",
204217
labels={},
205218
args=[],
206219
kwargs={},
207220
cron="* * * * *",
208221
)
209222
schedule2 = ScheduledTask(
223+
schedule_id=r"id-{4Rs}",
210224
task_name="test_task2",
211225
labels={},
212226
args=[],

0 commit comments

Comments
 (0)