Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
38f307c
clean all remaining stream ends on close
bjoaquinc Nov 2, 2025
7f241e7
add disconnect_event to handle closure from client/task_group
bjoaquinc Nov 2, 2025
7250289
create context_app using StreamingASGITransport and update test_reque…
bjoaquinc Nov 2, 2025
8a8e14f
add back await content_send_channel.aclose()
bjoaquinc Nov 2, 2025
7a83024
revert spaces
bjoaquinc Nov 2, 2025
c880cc1
remove try finally block for testing and just add sse_stream_reader.a…
bjoaquinc Nov 3, 2025
0aa5a45
fix ruff errors
bjoaquinc Nov 3, 2025
b489d30
run precommit
bjoaquinc Nov 3, 2025
bd70a51
update all sse tests that use uvicorn to use StreamingASGITransport i…
bjoaquinc Nov 6, 2025
5cc10fa
run precommit
bjoaquinc Nov 6, 2025
e1745f8
remove taskgroup fixture due to premature closing
bjoaquinc Nov 6, 2025
8fc6473
prevent sse_client from cancelling external task groups
bjoaquinc Nov 6, 2025
fabf6c5
run precommit
bjoaquinc Nov 6, 2025
e8a3b0e
revert sse_client and add cleanup to outer task group in tests
bjoaquinc Nov 6, 2025
8188bf2
remove timeout for sse_client
bjoaquinc Nov 6, 2025
86a377f
add reset_sse_app_status workaround for sse_starlette quirk
bjoaquinc Nov 6, 2025
f1748a0
improve workaround
bjoaquinc Nov 6, 2025
749d506
move workaround to conftest for other test files
bjoaquinc Nov 7, 2025
044728b
add version check into reset_sse_app_status
bjoaquinc Nov 7, 2025
2ec8907
revert duplicate http.response.start handling
bjoaquinc Nov 8, 2025
b5b4e6f
add NoopASGI
bjoaquinc Nov 8, 2025
6857129
add NoopASGI and clean tests
bjoaquinc Nov 8, 2025
3a9e576
remove uvicorn and DRY
bjoaquinc Nov 8, 2025
3d2fea9
add short comment explaining NoopASGI usage
bjoaquinc Nov 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/mcp/server/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ async def response_wrapper(scope: Scope, receive: Receive, send: Send):
)
await read_stream_writer.aclose()
await write_stream_reader.aclose()
await sse_stream_reader.aclose()
logging.debug(f"Client session disconnected {session_id}")

logger.debug("Starting SSE response task")
Expand Down
21 changes: 18 additions & 3 deletions src/mcp/server/streaming_asgi_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""

import typing
from collections.abc import Awaitable, Callable
from typing import Any, cast

import anyio
Expand Down Expand Up @@ -65,6 +66,8 @@ async def handle_async_request(
) -> Response:
assert isinstance(request.stream, AsyncByteStream)

disconnect_event = anyio.Event()

# ASGI scope.
scope = {
"type": "http",
Expand Down Expand Up @@ -97,11 +100,17 @@ async def handle_async_request(
content_send_channel, content_receive_channel = anyio.create_memory_object_stream[bytes](100)

# ASGI callables.
async def send_disconnect() -> None:
disconnect_event.set()

async def receive() -> dict[str, Any]:
nonlocal request_complete

if disconnect_event.is_set():
return {"type": "http.disconnect"}

if request_complete:
await response_complete.wait()
await disconnect_event.wait()
return {"type": "http.disconnect"}

try:
Expand Down Expand Up @@ -176,7 +185,7 @@ async def process_messages() -> None:
return Response(
status_code,
headers=response_headers,
stream=StreamingASGIResponseStream(content_receive_channel),
stream=StreamingASGIResponseStream(content_receive_channel, send_disconnect),
)


Expand All @@ -192,12 +201,18 @@ class StreamingASGIResponseStream(AsyncByteStream):
def __init__(
self,
receive_channel: anyio.streams.memory.MemoryObjectReceiveStream[bytes],
send_disconnect: Callable[[], Awaitable[None]],
) -> None:
self.receive_channel = receive_channel
self.send_disconnect = send_disconnect

async def __aiter__(self) -> typing.AsyncIterator[bytes]:
try:
async for chunk in self.receive_channel:
yield chunk
finally:
await self.receive_channel.aclose()
await self.aclose()

async def aclose(self) -> None:
await self.receive_channel.aclose()
await self.send_disconnect()
39 changes: 39 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,45 @@
import anyio
import pytest
import sse_starlette
from packaging import version


@pytest.fixture
def anyio_backend():
return "asyncio"


@pytest.fixture(autouse=True)
def reset_sse_app_status():
"""Reset sse-starlette's global AppStatus singleton before each test.

AppStatus.should_exit_event is a global asyncio.Event that gets bound to
an event loop. This ensures each test gets a fresh Event and prevents
RuntimeError("bound to a different event loop") during parallel test
execution with pytest-xdist.

NOTE: This fixture is only necessary for sse-starlette < 3.0.0.
Version 3.0+ eliminated the global state issue entirely by using
context-local events instead of module-level singletons, providing
automatic test isolation without manual cleanup.

See <https://github.com/sysid/sse-starlette/pull/141> for more details.
"""

SSE_STARLETTE_VERSION = version.parse(sse_starlette.__version__)
NEEDS_RESET = SSE_STARLETTE_VERSION < version.parse("3.0.0")

if not NEEDS_RESET:
yield
return

# lazy import to avoid import errors
from sse_starlette.sse import AppStatus

# Setup: Reset before test
AppStatus.should_exit_event = anyio.Event() # type: ignore[attr-defined]

yield

# Teardown: Reset after test to prevent contamination
AppStatus.should_exit_event = anyio.Event() # type: ignore[attr-defined]
Loading