Skip to content
This repository was archived by the owner on Aug 19, 2025. It is now read-only.

Commit 54e70b2

Browse files
committed
fix(backend/postgres): allow concurrent pubs
This fix adds an asyncio.Lock to avoid `asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress` fixes #22
1 parent 9255c29 commit 54e70b2

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

broadcaster/_backends/postgres.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,23 @@ def __init__(self, url: str):
1313

1414
async def connect(self) -> None:
1515
self._conn = await asyncpg.connect(self._url)
16+
self._lock = asyncio.Lock()
1617
self._listen_queue: asyncio.Queue = asyncio.Queue()
1718

1819
async def disconnect(self) -> None:
1920
await self._conn.close()
2021

2122
async def subscribe(self, channel: str) -> None:
22-
await self._conn.add_listener(channel, self._listener)
23+
async with self._lock:
24+
await self._conn.add_listener(channel, self._listener)
2325

2426
async def unsubscribe(self, channel: str) -> None:
25-
await self._conn.remove_listener(channel, self._listener)
27+
async with self._lock:
28+
await self._conn.remove_listener(channel, self._listener)
2629

2730
async def publish(self, channel: str, message: str) -> None:
28-
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
31+
async with self._lock:
32+
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
2933

3034
def _listener(self, *args: Any) -> None:
3135
connection, pid, channel, payload = args

tests/test_concurrent.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,35 @@ async def test_broadcast(setup_broadcast):
2828
event = await subscriber.get()
2929
assert event.channel == channel
3030
assert event.message == msg
31+
32+
33+
@pytest.mark.asyncio
34+
@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True)
35+
async def test_sub(setup_broadcast):
36+
uid = uuid4()
37+
channel1 = f"chatroom-{uid}1"
38+
channel2 = f"chatroom-{uid}2"
39+
40+
to_sub = [
41+
setup_broadcast._backend.subscribe(channel1),
42+
setup_broadcast._backend.subscribe(channel2),
43+
]
44+
await asyncio.gather(*to_sub)
45+
46+
47+
@pytest.mark.asyncio
48+
@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True)
49+
async def test_unsub(setup_broadcast):
50+
uid = uuid4()
51+
channel1 = f"chatroom-{uid}1"
52+
channel2 = f"chatroom-{uid}2"
53+
54+
await setup_broadcast._backend.subscribe(channel1)
55+
await setup_broadcast._backend.subscribe(channel2)
56+
57+
to_unsub = [
58+
setup_broadcast._backend.unsubscribe(channel1),
59+
setup_broadcast._backend.unsubscribe(channel2),
60+
]
61+
62+
await asyncio.gather(*to_unsub)

0 commit comments

Comments
 (0)