Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Commit 01c33e9

Browse files
committed
fix(backend/postgres): allow concurrent pubs
This fix adds a lock (asyncio.Event based) to avoid `asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress` fixes #22
1 parent 9255c29 commit 01c33e9

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

broadcaster/_backends/postgres.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ def __init__(self, url: str):
1313

1414
async def connect(self) -> None:
1515
self._conn = await asyncpg.connect(self._url)
16+
self._event = asyncio.Event()
17+
self._event.set()
1618
self._listen_queue: asyncio.Queue = asyncio.Queue()
1719

1820
async def disconnect(self) -> None:
@@ -25,7 +27,12 @@ async def unsubscribe(self, channel: str) -> None:
2527
await self._conn.remove_listener(channel, self._listener)
2628

2729
async def publish(self, channel: str, message: str) -> None:
28-
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
30+
await self._event.wait()
31+
self._event.clear()
32+
try:
33+
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
34+
finally:
35+
self._event.set()
2936

3037
def _listener(self, *args: Any) -> None:
3138
connection, pid, channel, payload = args

0 commit comments

Comments
 (0)