From 96461823373347db03c6e7ef81eee75074178159 Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 14:11:35 +0800 Subject: [PATCH 1/8] feat: :sparkles: fix redis stream empty subscribe bug && supported redis stream cached(#148) --- .gitignore | 2 + README.md | 1 + broadcaster/__init__.py | 3 +- broadcaster/_base.py | 38 +++++++++------ broadcaster/_event.py | 10 ++++ broadcaster/backends/base.py | 19 +++++++- broadcaster/backends/kafka.py | 2 +- broadcaster/backends/memory.py | 2 +- broadcaster/backends/postgres.py | 2 +- broadcaster/backends/redis.py | 84 ++++++++++++++++++++++++++++++-- example/app.py | 19 ++++++-- tests/test_broadcast.py | 23 +++++++++ 12 files changed, 178 insertions(+), 27 deletions(-) create mode 100644 broadcaster/_event.py diff --git a/.gitignore b/.gitignore index 013870b..2823f34 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ test.db venv/ build/ dist/ +.idea/ +.vscode/ diff --git a/README.md b/README.md index 959b34d..be36329 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,7 @@ Python 3.8+ * `Broadcast('memory://')` * `Broadcast("redis://localhost:6379")` * `Broadcast("redis-stream://localhost:6379")` +* `Broadcast("redis-stream-cached://localhost:6379")` * `Broadcast("postgres://localhost:5432/broadcaster")` * `Broadcast("kafka://localhost:9092")` diff --git a/broadcaster/__init__.py b/broadcaster/__init__.py index 0bcd9d2..f5db5d4 100644 --- a/broadcaster/__init__.py +++ b/broadcaster/__init__.py @@ -1,4 +1,5 @@ -from ._base import Broadcast, Event +from ._base import Broadcast +from ._event import Event from .backends.base import BroadcastBackend __version__ = "0.3.1" diff --git a/broadcaster/_base.py b/broadcaster/_base.py index a63b22b..d34e12d 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -5,20 +5,12 @@ from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterator, cast from urllib.parse import urlparse -if TYPE_CHECKING: # pragma: no cover - from broadcaster.backends.base import BroadcastBackend - - -class Event: - def __init__(self, channel: str, message: str) -> None: - self.channel = channel - self.message = message +from broadcaster.backends.base import BroadcastCacheBackend - def __eq__(self, other: object) -> bool: - return isinstance(other, Event) and self.channel == other.channel and self.message == other.message +from ._event import Event - def __repr__(self) -> str: - return f"Event(channel={self.channel!r}, message={self.message!r})" +if TYPE_CHECKING: # pragma: no cover + from broadcaster.backends.base import BroadcastBackend class Unsubscribed(Exception): @@ -43,6 +35,11 @@ def _create_backend(self, url: str) -> BroadcastBackend: return RedisStreamBackend(url) + elif parsed_url.scheme == "redis-stream-cached": + from broadcaster.backends.redis import RedisStreamCachedBackend + + return RedisStreamCachedBackend(url) + elif parsed_url.scheme in ("postgres", "postgresql"): from broadcaster.backends.postgres import PostgresBackend @@ -87,7 +84,7 @@ async def publish(self, channel: str, message: Any) -> None: await self._backend.publish(channel, message) @asynccontextmanager - async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]: + async def subscribe(self, channel: str, history: int | None = None) -> AsyncIterator[Subscriber]: queue: asyncio.Queue[Event | None] = asyncio.Queue() try: @@ -95,7 +92,20 @@ async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]: await self._backend.subscribe(channel) self._subscribers[channel] = {queue} else: - self._subscribers[channel].add(queue) + if isinstance(self._backend, BroadcastCacheBackend): + try: + current_id = await self._backend.get_current_channel_id(channel) + self._backend._ready.clear() + messages = await self._backend.get_history_messages(channel, current_id, history) + for message in messages: + queue.put_nowait(message) + self._subscribers[channel].add(queue) + finally: + # wake up the listener after inqueue history messages + # for sorted messages by publish time + self._backend._ready.set() + else: + self._subscribers[channel].add(queue) yield Subscriber(queue) finally: diff --git a/broadcaster/_event.py b/broadcaster/_event.py new file mode 100644 index 0000000..65436cb --- /dev/null +++ b/broadcaster/_event.py @@ -0,0 +1,10 @@ +class Event: + def __init__(self, channel: str, message: str) -> None: + self.channel = channel + self.message = message + + def __eq__(self, other: object) -> bool: + return isinstance(other, Event) and self.channel == other.channel and self.message == other.message + + def __repr__(self) -> str: + return f"Event(channel={self.channel!r}, message={self.message!r})" diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index 7017df3..5e1260e 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -1,6 +1,8 @@ -from typing import Any +from __future__ import annotations -from .._base import Event +from typing import Any, AsyncGenerator + +from .._event import Event class BroadcastBackend: @@ -24,3 +26,16 @@ async def publish(self, channel: str, message: Any) -> None: async def next_published(self) -> Event: raise NotImplementedError() + + +class BroadcastCacheBackend(BroadcastBackend): + async def get_current_channel_id(self, channel: str): + raise NotImplementedError() + + async def get_history_messages( + self, + channel: str, + msg_id: int | bytes | str | memoryview, + count: int | None = None, + ) -> AsyncGenerator[Event, None]: + raise NotImplementedError() diff --git a/broadcaster/backends/kafka.py b/broadcaster/backends/kafka.py index f09dca1..065a347 100644 --- a/broadcaster/backends/kafka.py +++ b/broadcaster/backends/kafka.py @@ -6,7 +6,7 @@ from aiokafka import AIOKafkaConsumer, AIOKafkaProducer -from .._base import Event +from .._event import Event from .base import BroadcastBackend diff --git a/broadcaster/backends/memory.py b/broadcaster/backends/memory.py index bfd0c44..2704124 100644 --- a/broadcaster/backends/memory.py +++ b/broadcaster/backends/memory.py @@ -3,7 +3,7 @@ import asyncio import typing -from .._base import Event +from .._event import Event from .base import BroadcastBackend diff --git a/broadcaster/backends/postgres.py b/broadcaster/backends/postgres.py index 7769962..d0bd42c 100644 --- a/broadcaster/backends/postgres.py +++ b/broadcaster/backends/postgres.py @@ -3,7 +3,7 @@ import asyncpg -from .._base import Event +from .._event import Event from .base import BroadcastBackend diff --git a/broadcaster/backends/redis.py b/broadcaster/backends/redis.py index 1be4195..c389065 100644 --- a/broadcaster/backends/redis.py +++ b/broadcaster/backends/redis.py @@ -5,8 +5,8 @@ from redis import asyncio as redis -from .._base import Event -from .base import BroadcastBackend +from .._event import Event +from .base import BroadcastBackend, BroadcastCacheBackend class RedisBackend(BroadcastBackend): @@ -88,14 +88,71 @@ async def subscribe(self, channel: str) -> None: async def unsubscribe(self, channel: str) -> None: self.streams.pop(channel, None) + if not self.streams: + self._ready.clear() + + async def publish(self, channel: str, message: typing.Any) -> None: + await self._producer.xadd(channel, {"message": message}) + + async def wait_for_messages(self) -> list[StreamMessageType]: + messages = None + while not messages: + if not self.streams: + # 1. save cpu usage + # 2. redis raise expection when self.streams is empty + self._ready.clear() + await self._ready.wait() + messages = await self._consumer.xread(self.streams, count=1, block=100) + return messages + + async def next_published(self) -> Event: + messages = await self.wait_for_messages() + stream, events = messages[0] + _msg_id, message = events[0] + self.streams[stream.decode("utf-8")] = _msg_id.decode("utf-8") + return Event( + channel=stream.decode("utf-8"), + message=message.get(b"message", b"").decode("utf-8"), + ) + + +class RedisStreamCachedBackend(BroadcastCacheBackend): + def __init__(self, url: str): + url = url.replace("redis-stream-cached", "redis", 1) + self.streams: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {} + self._ready = asyncio.Event() + self._producer = redis.Redis.from_url(url) + self._consumer = redis.Redis.from_url(url) + + async def connect(self) -> None: + pass + + async def disconnect(self) -> None: + await self._producer.aclose() + await self._consumer.aclose() + + async def subscribe(self, channel: str) -> None: + # read from beginning + last_id = "0" + self.streams[channel] = last_id + self._ready.set() + + async def unsubscribe(self, channel: str) -> None: + self.streams.pop(channel, None) + if not self.streams: + self._ready.clear() async def publish(self, channel: str, message: typing.Any) -> None: await self._producer.xadd(channel, {"message": message}) async def wait_for_messages(self) -> list[StreamMessageType]: - await self._ready.wait() messages = None while not messages: + if not self.streams: + # 1. save cpu usage + # 2. redis raise expection when self.streams is empty + self._ready.clear() + await self._ready.wait() messages = await self._consumer.xread(self.streams, count=1, block=100) return messages @@ -108,3 +165,24 @@ async def next_published(self) -> Event: channel=stream.decode("utf-8"), message=message.get(b"message", b"").decode("utf-8"), ) + + async def get_current_channel_id(self, channel: str): + try: + info = await self._consumer.xinfo_stream(channel) + last_id = info["last-generated-id"] + except redis.ResponseError: + last_id = "0" + return last_id + + async def get_history_messages( + self, + channel: str, + msg_id: int | bytes | str | memoryview, + count: int | None = None, + ) -> typing.AsyncGenerator[Event, None]: + messages = await self._consumer.xrevrange(channel, max=msg_id, count=count) + for _, message in reversed(messages or []): + yield Event( + channel=channel, + message=message.get(b"message", b"").decode("utf-8"), + ) diff --git a/example/app.py b/example/app.py index a201221..362f028 100644 --- a/example/app.py +++ b/example/app.py @@ -1,5 +1,7 @@ -import os +from __future__ import annotations +import os +from pathlib import Path import anyio from starlette.applications import Starlette @@ -10,8 +12,9 @@ BROADCAST_URL = os.environ.get("BROADCAST_URL", "memory://") -broadcast = Broadcast(BROADCAST_URL) -templates = Jinja2Templates("example/templates") +templates_dir = Path(__file__).parent / "templates" +broadcast = Broadcast("redis-stream-cached://localhost:6379/8") +templates = Jinja2Templates(templates_dir) async def homepage(request): @@ -51,5 +54,13 @@ async def chatroom_ws_sender(websocket): app = Starlette( - routes=routes, on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect], + routes=routes, + on_startup=[broadcast.connect], + on_shutdown=[broadcast.disconnect], ) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=7777) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index a8bd3eb..047c0f3 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -71,6 +71,29 @@ async def test_redis_stream(): assert event.message == "hello" +@pytest.mark.asyncio +async def test_redis_stream_cache(): + messages = ["hello", "I'm cached"] + async with Broadcast("redis-stream-cached://localhost:6379") as broadcast: + await broadcast.publish("chatroom_cached", messages[0]) + await broadcast.publish("chatroom_cached", messages[1]) + await broadcast.publish("chatroom_cached", "quit") + sub1_messages = [] + async with broadcast.subscribe("chatroom_cached") as subscriber: + async for event in subscriber: + if event.message == "quit": + break + sub1_messages.append(event.message) + sub2_messages = [] + async with broadcast.subscribe("chatroom_cached") as subscriber: + async for event in subscriber: + if event.message == "quit": + break + sub2_messages.append(event.message) + + assert sub1_messages == sub2_messages == messages + + @pytest.mark.asyncio async def test_postgres(): async with Broadcast("postgres://postgres:postgres@localhost:5432/broadcaster") as broadcast: From 509e8fbc819ca2c43d7934503acf7d462c6bb076 Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 14:49:13 +0800 Subject: [PATCH 2/8] fix: :bug: fixed type lint --- broadcaster/_base.py | 3 +-- broadcaster/backends/base.py | 42 +++++++++++++++++++---------------- broadcaster/backends/redis.py | 2 +- tests/test_broadcast.py | 14 +++++++----- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index d34e12d..c3f2cd0 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -96,8 +96,7 @@ async def subscribe(self, channel: str, history: int | None = None) -> AsyncIter try: current_id = await self._backend.get_current_channel_id(channel) self._backend._ready.clear() - messages = await self._backend.get_history_messages(channel, current_id, history) - for message in messages: + async for message in self._backend.get_history_messages(channel, current_id, history): queue.put_nowait(message) self._subscribers[channel].add(queue) finally: diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index 5e1260e..d45d0aa 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -1,41 +1,45 @@ from __future__ import annotations +import asyncio +from abc import ABC, abstractmethod from typing import Any, AsyncGenerator from .._event import Event -class BroadcastBackend: - def __init__(self, url: str) -> None: - raise NotImplementedError() +class BroadcastBackend(ABC): + @abstractmethod + def __init__(self, url: str) -> None: ... - async def connect(self) -> None: - raise NotImplementedError() + @abstractmethod + async def connect(self) -> None: ... - async def disconnect(self) -> None: - raise NotImplementedError() + @abstractmethod + async def disconnect(self) -> None: ... - async def subscribe(self, channel: str) -> None: - raise NotImplementedError() + @abstractmethod + async def subscribe(self, channel: str) -> None: ... - async def unsubscribe(self, channel: str) -> None: - raise NotImplementedError() + @abstractmethod + async def unsubscribe(self, channel: str) -> None: ... - async def publish(self, channel: str, message: Any) -> None: - raise NotImplementedError() + @abstractmethod + async def publish(self, channel: str, message: Any) -> None: ... - async def next_published(self) -> Event: - raise NotImplementedError() + @abstractmethod + async def next_published(self) -> Event: ... class BroadcastCacheBackend(BroadcastBackend): - async def get_current_channel_id(self, channel: str): - raise NotImplementedError() + _ready: asyncio.Event + @abstractmethod + async def get_current_channel_id(self, channel: str) -> str | bytes | memoryview | int: ... + + @abstractmethod async def get_history_messages( self, channel: str, msg_id: int | bytes | str | memoryview, count: int | None = None, - ) -> AsyncGenerator[Event, None]: - raise NotImplementedError() + ) -> AsyncGenerator[Event, None]: ... diff --git a/broadcaster/backends/redis.py b/broadcaster/backends/redis.py index c389065..da67273 100644 --- a/broadcaster/backends/redis.py +++ b/broadcaster/backends/redis.py @@ -166,7 +166,7 @@ async def next_published(self) -> Event: message=message.get(b"message", b"").decode("utf-8"), ) - async def get_current_channel_id(self, channel: str): + async def get_current_channel_id(self, channel: str) -> int | bytes | str | memoryview: try: info = await self._consumer.xinfo_stream(channel) last_id = info["last-generated-id"] diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 047c0f3..19e9881 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -81,15 +81,17 @@ async def test_redis_stream_cache(): sub1_messages = [] async with broadcast.subscribe("chatroom_cached") as subscriber: async for event in subscriber: - if event.message == "quit": - break - sub1_messages.append(event.message) + if event: + if event.message == "quit": + break + sub1_messages.append(event.message) sub2_messages = [] async with broadcast.subscribe("chatroom_cached") as subscriber: async for event in subscriber: - if event.message == "quit": - break - sub2_messages.append(event.message) + if event: + if event.message == "quit": + break + sub2_messages.append(event.message) assert sub1_messages == sub2_messages == messages From f69f830bd4cff6b425e36a7f614fac2932c5b647 Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 15:06:11 +0800 Subject: [PATCH 3/8] fix: :bug: fixed type lint --- broadcaster/_base.py | 2 +- broadcaster/backends/base.py | 2 +- broadcaster/backends/redis.py | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/broadcaster/_base.py b/broadcaster/_base.py index c3f2cd0..2bdd3cd 100644 --- a/broadcaster/_base.py +++ b/broadcaster/_base.py @@ -96,7 +96,7 @@ async def subscribe(self, channel: str, history: int | None = None) -> AsyncIter try: current_id = await self._backend.get_current_channel_id(channel) self._backend._ready.clear() - async for message in self._backend.get_history_messages(channel, current_id, history): + for message in await self._backend.get_history_messages(channel, current_id, history): queue.put_nowait(message) self._subscribers[channel].add(queue) finally: diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index d45d0aa..f7bbbcb 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -42,4 +42,4 @@ async def get_history_messages( channel: str, msg_id: int | bytes | str | memoryview, count: int | None = None, - ) -> AsyncGenerator[Event, None]: ... + ) -> list[Event]: ... diff --git a/broadcaster/backends/redis.py b/broadcaster/backends/redis.py index da67273..43905bc 100644 --- a/broadcaster/backends/redis.py +++ b/broadcaster/backends/redis.py @@ -169,7 +169,7 @@ async def next_published(self) -> Event: async def get_current_channel_id(self, channel: str) -> int | bytes | str | memoryview: try: info = await self._consumer.xinfo_stream(channel) - last_id = info["last-generated-id"] + last_id: int = info["last-generated-id"] except redis.ResponseError: last_id = "0" return last_id @@ -179,10 +179,12 @@ async def get_history_messages( channel: str, msg_id: int | bytes | str | memoryview, count: int | None = None, - ) -> typing.AsyncGenerator[Event, None]: + ) -> list[Event]: messages = await self._consumer.xrevrange(channel, max=msg_id, count=count) - for _, message in reversed(messages or []): - yield Event( + return [ + Event( channel=channel, message=message.get(b"message", b"").decode("utf-8"), ) + for _, message in reversed(messages or []) + ] From bdfbf24bec521027b82bf7fa5bae144585b69547 Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 15:11:43 +0800 Subject: [PATCH 4/8] fix: :bug: fixed type lint --- broadcaster/backends/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcaster/backends/redis.py b/broadcaster/backends/redis.py index 43905bc..a4d0347 100644 --- a/broadcaster/backends/redis.py +++ b/broadcaster/backends/redis.py @@ -169,7 +169,7 @@ async def next_published(self) -> Event: async def get_current_channel_id(self, channel: str) -> int | bytes | str | memoryview: try: info = await self._consumer.xinfo_stream(channel) - last_id: int = info["last-generated-id"] + last_id: int | bytes | str | memoryview = info["last-generated-id"] except redis.ResponseError: last_id = "0" return last_id From 7885f76785c533d142d86429f7f6365614fc679b Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 15:14:02 +0800 Subject: [PATCH 5/8] fix: :bug: fixed type lint --- broadcaster/backends/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index f7bbbcb..42bd98d 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -2,7 +2,7 @@ import asyncio from abc import ABC, abstractmethod -from typing import Any, AsyncGenerator +from typing import Any from .._event import Event From b1b4344250be0d7b8005e1c899faeb6d07d6f54b Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 16:34:31 +0800 Subject: [PATCH 6/8] revert app.py && abc --- broadcaster/backends/base.py | 42 ++++++++++++++++-------------------- example/app.py | 15 +++---------- 2 files changed, 22 insertions(+), 35 deletions(-) diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index 42bd98d..0a7ab24 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -1,45 +1,41 @@ from __future__ import annotations -import asyncio -from abc import ABC, abstractmethod from typing import Any from .._event import Event -class BroadcastBackend(ABC): - @abstractmethod - def __init__(self, url: str) -> None: ... +class BroadcastBackend: + def __init__(self, url: str) -> None: + raise NotImplementedError() - @abstractmethod - async def connect(self) -> None: ... + async def connect(self) -> None: + raise NotImplementedError() - @abstractmethod - async def disconnect(self) -> None: ... + async def disconnect(self) -> None: + raise NotImplementedError() - @abstractmethod - async def subscribe(self, channel: str) -> None: ... + async def subscribe(self, channel: str) -> None: + raise NotImplementedError() - @abstractmethod - async def unsubscribe(self, channel: str) -> None: ... + async def unsubscribe(self, channel: str) -> None: + raise NotImplementedError() - @abstractmethod - async def publish(self, channel: str, message: Any) -> None: ... + async def publish(self, channel: str, message: Any) -> None: + raise NotImplementedError() - @abstractmethod - async def next_published(self) -> Event: ... + async def next_published(self) -> Event: + raise NotImplementedError() class BroadcastCacheBackend(BroadcastBackend): - _ready: asyncio.Event + async def get_current_channel_id(self, channel: str): + raise NotImplementedError() - @abstractmethod - async def get_current_channel_id(self, channel: str) -> str | bytes | memoryview | int: ... - - @abstractmethod async def get_history_messages( self, channel: str, msg_id: int | bytes | str | memoryview, count: int | None = None, - ) -> list[Event]: ... + ) -> list[Event]: + raise NotImplementedError() diff --git a/example/app.py b/example/app.py index 362f028..e012eef 100644 --- a/example/app.py +++ b/example/app.py @@ -1,7 +1,5 @@ -from __future__ import annotations - import os -from pathlib import Path + import anyio from starlette.applications import Starlette @@ -12,9 +10,8 @@ BROADCAST_URL = os.environ.get("BROADCAST_URL", "memory://") -templates_dir = Path(__file__).parent / "templates" -broadcast = Broadcast("redis-stream-cached://localhost:6379/8") -templates = Jinja2Templates(templates_dir) +broadcast = Broadcast(BROADCAST_URL) +templates = Jinja2Templates("example/templates") async def homepage(request): @@ -58,9 +55,3 @@ async def chatroom_ws_sender(websocket): on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect], ) - - -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=7777) From 719e832b2dcb69eb4daa2f843ffe11e68b070241 Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 16:36:52 +0800 Subject: [PATCH 7/8] fix: :bug: fixed type lint --- broadcaster/backends/base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index 0a7ab24..d5b1d14 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from typing import Any from .._event import Event @@ -29,6 +30,8 @@ async def next_published(self) -> Event: class BroadcastCacheBackend(BroadcastBackend): + _ready: asyncio.Event + async def get_current_channel_id(self, channel: str): raise NotImplementedError() From 904172c16d94eb88b6f927d6be4d47531321c389 Mon Sep 17 00:00:00 2001 From: zhangluguang Date: Thu, 21 Nov 2024 16:39:37 +0800 Subject: [PATCH 8/8] fix: :bug: fixed type lint --- broadcaster/backends/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcaster/backends/base.py b/broadcaster/backends/base.py index d5b1d14..1a27ef8 100644 --- a/broadcaster/backends/base.py +++ b/broadcaster/backends/base.py @@ -32,7 +32,7 @@ async def next_published(self) -> Event: class BroadcastCacheBackend(BroadcastBackend): _ready: asyncio.Event - async def get_current_channel_id(self, channel: str): + async def get_current_channel_id(self, channel: str) -> str | bytes | memoryview | int: raise NotImplementedError() async def get_history_messages(