Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Commit 9255c29

Browse files
committed
fix(backend/kafka): consumer unsubscribe not awaitable
1 parent aaf59e8 commit 9255c29

File tree

1 file changed

+3
-5
lines changed

1 file changed

+3
-5
lines changed

broadcaster/_backends/kafka.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
import typing
32
from urllib.parse import urlparse
43

@@ -14,9 +13,8 @@ def __init__(self, url: str):
1413
self._consumer_channels: typing.Set = set()
1514

1615
async def connect(self) -> None:
17-
loop = asyncio.get_event_loop()
18-
self._producer = AIOKafkaProducer(loop=loop, bootstrap_servers=self._servers)
19-
self._consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self._servers)
16+
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
17+
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers)
2018
await self._producer.start()
2119
await self._consumer.start()
2220

@@ -29,7 +27,7 @@ async def subscribe(self, channel: str) -> None:
2927
self._consumer.subscribe(topics=self._consumer_channels)
3028

3129
async def unsubscribe(self, channel: str) -> None:
32-
await self._consumer.unsubscribe()
30+
self._consumer.unsubscribe()
3331

3432
async def publish(self, channel: str, message: typing.Any) -> None:
3533
await self._producer.send_and_wait(channel, message.encode("utf8"))

0 commit comments

Comments
 (0)