Skip to content

Use FastStream 0.6.0 #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
aad0c32
Fix FastStream & OTEL deprecations
vrslev Jul 18, 2025
8743e63
Update faststream dependency to 0.6 and configure uv to use git source
vrslev Jul 18, 2025
1ea2397
Refactor StompBroker to use configuration and specification classes
vrslev Jul 18, 2025
e28f36f
Update import to use faststream.message for StreamMessage and gen_cor_id
vrslev Jul 18, 2025
17e57e9
Add `TestStompBroker` to exports and note pending export update
vrslev Jul 18, 2025
af3aa2a
Update imports and class inheritance for better internal module align…
vrslev Jul 18, 2025
14e231e
Update imports to use internal module structure and remove deprecated…
vrslev Jul 18, 2025
73e5f03
Implement Stomp subscriber configuration and specification classes, a…
vrslev Jul 18, 2025
daaeb52
Refactor StompSubscriber to use config-based client and prefix handling
vrslev Jul 18, 2025
3acc9a8
Remove unused logging context and get_fmt method from StompBroker
vrslev Jul 18, 2025
c93fc47
Remove unused StompLogContext type and simplify get_log_context return
vrslev Jul 18, 2025
768d8a6
Refactor StompProducer to use StompPublishCommand and update method s…
vrslev Jul 18, 2025
7ebc804
Move StompBrokerConfig and add publisher configuration classes to imp…
vrslev Jul 18, 2025
a84d065
Refactor publisher classes to use new specification and usecase confi…
vrslev Jul 18, 2025
00fa5be
Refactor StompRegistrator and StompSubscriber to use configuration an…
vrslev Jul 18, 2025
3c6f816
Remove unused publisher kwargs and restructure publisher initializati…
vrslev Jul 18, 2025
e9edf8f
Update dependencies type hint from Depends to Dependant in StompRegis…
vrslev Jul 18, 2025
a14be1b
Refactor StompBroker to use stop instead of _close and remove depreca…
vrslev Jul 18, 2025
83c55ab
Improve StompBroker initialization by removing unused import and field
vrslev Jul 18, 2025
a31df7e
Refactor publish and request methods to use StompPublishCommand
vrslev Jul 18, 2025
6c634c9
Update telemetry and prometheus modules to use StompPublishCommand in…
vrslev Jul 18, 2025
531dd61
Update imports and adjust type parameters in StompRouter to use inter…
vrslev Jul 18, 2025
45bfd99
Add subscriber and publisher tracking with prefix support and context…
vrslev Jul 18, 2025
9dcf822
Implement custom fake publisher for STOMP with reply destination hand…
vrslev Jul 18, 2025
2c3614b
Refactor StompBroker to use updated internal broker and configuration…
vrslev Jul 18, 2025
333391d
Add TODO comments for interface improvements in Stomp router components
vrslev Jul 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/faststream-stomp/faststream_stomp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
"StompSubscriber",
"TestStompBroker",
]
# TODO: update exports # noqa: FIX002, TD002, TD003
160 changes: 77 additions & 83 deletions packages/faststream-stomp/faststream_stomp/broker.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,40 @@
import asyncio
import logging
import types
from collections.abc import Callable, Iterable, Mapping, Sequence
from typing import Any, Unpack
import typing
from collections.abc import Iterable, Sequence
from typing import (
TYPE_CHECKING,
cast,
)

import anyio
import stompman
from fast_depends.dependencies import Depends
from faststream.asyncapi.schema import Tag, TagDict
from faststream.broker.core.usecase import BrokerUsecase
from faststream.broker.types import BrokerMiddleware, CustomCallable
from faststream.log.logging import get_broker_logger
from fast_depends.dependencies import Dependant
from faststream._internal.basic_types import AnyDict, LoggerProto, SendableMessage
from faststream._internal.broker import BrokerUsecase
from faststream._internal.broker.broker import BrokerUsecase
from faststream._internal.broker.registrator import Registrator
from faststream._internal.configs import (
BrokerConfig,
)
from faststream._internal.constants import EMPTY
from faststream._internal.di import FastDependsConfig
from faststream._internal.types import (
BrokerMiddleware,
CustomCallable,
)
from faststream.response.publish_type import PublishType
from faststream.security import BaseSecurity
from faststream.types import EMPTY, AnyDict, Decorator, LoggerProto, SendableMessage
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict

from faststream_stomp.publisher import StompProducer, StompPublisher
from faststream_stomp.configs import StompBrokerConfig
from faststream_stomp.publisher import StompPublishCommand, StompPublisher
from faststream_stomp.registrator import StompRegistrator
from faststream_stomp.subscriber import StompLogContext, StompSubscriber

if TYPE_CHECKING:
from faststream_stomp.subscriber import StompSubscriber


class StompSecurity(BaseSecurity):
Expand All @@ -31,94 +49,67 @@ def get_schema(self) -> dict[str, dict[str, str]]: # noqa: PLR6301
return {"user-password": {"type": "userPassword"}}


def _handle_listen_task_done(listen_task: asyncio.Task[None]) -> None:
# Not sure how to test this. See https://github.com/community-of-python/stompman/pull/117#issuecomment-2983584449.
task_exception = listen_task.exception()
if isinstance(task_exception, ExceptionGroup) and isinstance(
task_exception.exceptions[0], stompman.FailedAllConnectAttemptsError
):
raise SystemExit(1)


class StompBroker(StompRegistrator, BrokerUsecase[stompman.MessageFrame, stompman.Client]):
_subscribers: Mapping[int, StompSubscriber]
_publishers: Mapping[int, StompPublisher]
__max_msg_id_ln = 10
_max_channel_name = 4
class StompBroker(StompRegistrator, BrokerUsecase[stompman.MessageFrame, stompman.Client, BrokerConfig]):
_subscribers: list["StompSubscriber"] # type: ignore[assignment]
_publishers: list["StompPublisher"] # type: ignore[assignment]

def __init__(
self,
client: stompman.Client,
*,
decoder: CustomCallable | None = None,
parser: CustomCallable | None = None,
dependencies: Iterable[Depends] = (),
middlewares: Sequence[BrokerMiddleware[stompman.MessageFrame]] = (),
graceful_timeout: float | None = 15.0,
# Logging args
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[stompman.MessageFrame, StompPublishCommand]] = (),
graceful_timeout: float | None = None,
routers: Sequence[Registrator[stompman.MessageFrame]] = (),
logger: LoggerProto | None = EMPTY,
log_level: int = logging.INFO,
log_fmt: str | None = None,
# FastDepends args
apply_types: bool = True,
validate: bool = True,
_get_dependant: Callable[..., Any] | None = None,
_call_decorators: Iterable[Decorator] = (),
# AsyncAPI kwargs,
# AsyncAPI args
description: str | None = None,
tags: Iterable[Tag | TagDict] | None = None,
tags: Iterable[Tag | TagDict] = (),
) -> None:
super().__init__(
client=client, # **connection_kwargs
decoder=decoder,
parser=parser,
dependencies=dependencies,
middlewares=middlewares,
broker_config = StompBrokerConfig(
broker_middlewares=cast("Sequence[BrokerMiddleware]", middlewares),
broker_parser=parser,
broker_decoder=decoder,
logger=logger, # TODO
fd_config=FastDependsConfig(use_fastdepends=apply_types),
broker_dependencies=dependencies,
graceful_timeout=graceful_timeout,
logger=logger,
log_level=log_level,
log_fmt=log_fmt,
apply_types=apply_types,
validate=validate,
_get_dependant=_get_dependant,
_call_decorators=_call_decorators,
extra_context={"broker": self},
client=client,
)
specification = BrokerSpec(
url=[f"{one_server.host}:{one_server.port}" for one_server in broker_config.client.servers],
protocol="STOMP",
protocol_version="1.2",
description=description,
tags=tags,
asyncapi_url=[f"{one_server.host}:{one_server.port}" for one_server in client.servers],
security=StompSecurity(),
default_logger=get_broker_logger(
name="stomp", default_context={"channel": ""}, message_id_ln=self.__max_msg_id_ln
),
)
self._attempted_to_connect = False

async def start(self) -> None:
await super().start()

for handler in self._subscribers.values():
self._log(f"`{handler.call_name}` waiting for messages", extra=handler.get_log_context(None))
await handler.start()
super().__init__(config=broker_config, specification=specification, routers=routers)
self._attempted_to_connect = False

async def _connect(self, client: stompman.Client) -> stompman.Client: # type: ignore[override]
if self._attempted_to_connect:
return client
self._attempted_to_connect = True
self._producer = StompProducer(client)
await client.__aenter__()
client._listen_task.add_done_callback(_handle_listen_task_done) # noqa: SLF001
return client

async def _close(
async def stop(
self,
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: types.TracebackType | None = None,
) -> None:
if self._connection:
await self._connection.__aexit__(exc_type, exc_val, exc_tb)
return await super()._close(exc_type, exc_val, exc_tb)
return await super().stop(exc_type, exc_val, exc_tb)

async def ping(self, timeout: float | None = None) -> bool:
sleep_time = (timeout or 10) / 10
Expand All @@ -137,21 +128,6 @@ async def ping(self, timeout: float | None = None) -> bool:

return False # pragma: no cover

def get_fmt(self) -> str:
# `StompLogContext`
return (
"%(asctime)s %(levelname)-8s - "
f"%(destination)-{self._max_channel_name}s | "
f"%(message_id)-{self.__max_msg_id_ln}s "
"- %(message)s"
)

def _setup_log_context(self, **log_context: Unpack[StompLogContext]) -> None: ... # type: ignore[override]

@property
def _subscriber_setup_extra(self) -> "AnyDict":
return {**super()._subscriber_setup_extra, "client": self._connection}

async def publish( # type: ignore[override]
self,
message: SendableMessage,
Expand All @@ -160,19 +136,37 @@ async def publish( # type: ignore[override]
correlation_id: str | None = None,
headers: dict[str, str] | None = None,
) -> None:
await super().publish(
publish_command = StompPublishCommand(
message,
producer=self._producer,
correlation_id=correlation_id,
_publish_type=PublishType.PUBLISH,
destination=destination,
correlation_id=correlation_id,
headers=headers,
)
return typing.cast("None", self._basic_publish(publish_command, producer=self._producer))

async def request( # type: ignore[override]
self,
msg: Any, # noqa: ANN401
message: SendableMessage,
destination: str,
*,
correlation_id: str | None = None,
headers: dict[str, str] | None = None,
) -> Any: # noqa: ANN401
return await super().request(msg, producer=self._producer, correlation_id=correlation_id, headers=headers)
) -> None:
publish_command = StompPublishCommand(
message,
_publish_type=PublishType.REQUEST,
destination=destination,
correlation_id=correlation_id,
headers=headers,
)
return typing.cast("None", self._basic_request(publish_command, producer=self._producer))


def _handle_listen_task_done(listen_task: asyncio.Task[None]) -> None:
# Not sure how to test this. See https://github.com/community-of-python/stompman/pull/117#issuecomment-2983584449.
task_exception = listen_task.exception()
if isinstance(task_exception, ExceptionGroup) and isinstance(
task_exception.exceptions[0], stompman.FailedAllConnectAttemptsError
):
raise SystemExit(1)
63 changes: 63 additions & 0 deletions packages/faststream-stomp/faststream_stomp/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass, field

import stompman
from faststream._internal.configs import (
BrokerConfig,
PublisherSpecificationConfig,
PublisherUsecaseConfig,
SubscriberSpecificationConfig,
SubscriberUsecaseConfig,
)
from faststream._internal.types import AsyncCallable
from faststream._internal.utils.functions import to_async
from faststream.message import decode_message
from faststream.middlewares import AckPolicy

from faststream_stomp.message import StompStreamMessage


@dataclass(kw_only=True)
class StompBrokerConfig(BrokerConfig):
client: stompman.Client


@dataclass(kw_only=True)
class StompBaseSubscriberConfig:
destination: str
ack_mode: stompman.AckMode
headers: dict[str, str] | None


@dataclass(kw_only=True)
class StompSubscriberSpecificationConfig(StompBaseSubscriberConfig, SubscriberSpecificationConfig):
parser: AsyncCallable = StompStreamMessage.from_frame
decoder: AsyncCallable = field(default=to_async(decode_message))

@property
def ack_policy(self) -> AckPolicy:
return AckPolicy.MANUAL if self.ack_mode == "auto" else AckPolicy.NACK_ON_ERROR


@dataclass(kw_only=True)
class StompSubscriberUsecaseConfig(StompBaseSubscriberConfig, SubscriberUsecaseConfig):
_outer_config: StompBrokerConfig
parser: AsyncCallable = StompStreamMessage.from_frame
decoder: AsyncCallable = field(default=to_async(decode_message))

@property
def ack_policy(self) -> AckPolicy:
return AckPolicy.MANUAL if self.ack_mode == "auto" else AckPolicy.NACK_ON_ERROR


@dataclass(kw_only=True)
class StompBasePublisherConfig:
destination: str


@dataclass(kw_only=True)
class StompPublisherSpecificationConfig(StompBasePublisherConfig, PublisherSpecificationConfig): ...


@dataclass(kw_only=True)
class StompPublisherUsecaseConfig(StompBasePublisherConfig, PublisherUsecaseConfig):
_outer_config: StompBrokerConfig
2 changes: 1 addition & 1 deletion packages/faststream-stomp/faststream_stomp/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Self, cast

import stompman
from faststream.broker.message import StreamMessage, gen_cor_id
from faststream.message import StreamMessage, gen_cor_id


class StompStreamMessage(StreamMessage[stompman.AckableMessageFrame]):
Expand Down
31 changes: 16 additions & 15 deletions packages/faststream-stomp/faststream_stomp/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,43 @@
import stompman
from faststream.broker.message import StreamMessage
from faststream._internal.basic_types import AnyDict
from faststream.message import StreamMessage
from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
from faststream.opentelemetry.middleware import TelemetryMiddleware
from faststream.types import AnyDict
from opentelemetry.metrics import Meter, MeterProvider
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import TracerProvider

from faststream_stomp.publisher import StompProducerPublishKwargs
from faststream_stomp.publisher import StompPublishCommand


class StompTelemetrySettingsProvider(TelemetrySettingsProvider[stompman.MessageFrame]):
class StompTelemetrySettingsProvider(TelemetrySettingsProvider[stompman.MessageFrame, StompPublishCommand]):
messaging_system = "stomp"

def get_consume_attrs_from_message(self, msg: StreamMessage[stompman.MessageFrame]) -> "AnyDict":
return {
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
messaging_attributes.MESSAGING_SYSTEM: self.messaging_system,
messaging_attributes.MESSAGING_MESSAGE_ID: msg.message_id,
messaging_attributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.headers["destination"],
}

def get_consume_destination_name(self, msg: StreamMessage[stompman.MessageFrame]) -> str: # noqa: PLR6301
return msg.raw_message.headers["destination"]

def get_publish_attrs_from_kwargs(self, kwargs: StompProducerPublishKwargs) -> AnyDict: # type: ignore[override]
def get_publish_attrs_from_cmd(self, cmd: StompPublishCommand) -> AnyDict:
publish_attrs = {
SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
SpanAttributes.MESSAGING_DESTINATION_NAME: kwargs["destination"],
messaging_attributes.MESSAGING_SYSTEM: self.messaging_system,
messaging_attributes.MESSAGING_DESTINATION_NAME: cmd.destination,
}
if kwargs["correlation_id"]:
publish_attrs[SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID] = kwargs["correlation_id"]
if cmd.correlation_id:
publish_attrs[messaging_attributes.MESSAGING_MESSAGE_CONVERSATION_ID] = cmd.correlation_id
return publish_attrs

def get_publish_destination_name(self, kwargs: StompProducerPublishKwargs) -> str: # type: ignore[override] # noqa: PLR6301
return kwargs["destination"]
def get_publish_destination_name(self, cmd: StompPublishCommand) -> str: # noqa: PLR6301
return cmd.destination


class StompTelemetryMiddleware(TelemetryMiddleware):
Expand All @@ -48,7 +49,7 @@ def __init__(
meter: Meter | None = None,
) -> None:
super().__init__(
settings_provider_factory=lambda _: StompTelemetrySettingsProvider(),
settings_provider_factory=lambda _: StompTelemetrySettingsProvider(), # type: ignore[arg-type,return-value]
tracer_provider=tracer_provider,
meter_provider=meter_provider,
meter=meter,
Expand Down
Loading