This repository was archived by the owner on Aug 19, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 1 file changed +14
-8
lines changed Expand file tree Collapse file tree 1 file changed +14
-8
lines changed Original file line number Diff line number Diff line change @@ -43,14 +43,20 @@ async def next_published(self) -> Event:
43
43
async def _pubsub_listener (self ) -> None :
44
44
# redis-py does not listen to the pubsub connection if there are no channels subscribed
45
45
# so we need to wait until the first channel is subscribed to start listening
46
- await self ._ready .wait ()
47
- async for message in self ._pubsub .listen ():
48
- if message ["type" ] == "message" :
49
- event = Event (
50
- channel = message ["channel" ].decode (),
51
- message = message ["data" ].decode (),
52
- )
53
- await self ._queue .put (event )
46
+ while True :
47
+ await self ._ready .wait ()
48
+ async for message in self ._pubsub .listen ():
49
+ if message ["type" ] == "message" :
50
+ event = Event (
51
+ channel = message ["channel" ].decode (),
52
+ message = message ["data" ].decode (),
53
+ )
54
+ await self ._queue .put (event )
55
+
56
+ # when no channel subscribed, clear the event.
57
+ # And then in next loop, event will blocked again until
58
+ # the new channel subscribed.Now asyncio.Task will not exit again.
59
+ self ._ready .clear ()
54
60
55
61
56
62
StreamMessageType = typing .Tuple [bytes , typing .Tuple [typing .Tuple [bytes , typing .Dict [bytes , bytes ]]]]
You can’t perform that action at this time.
0 commit comments