Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions docs/development/tests.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,53 @@ async def test_database_tool():

### Testing Network Transports

While in-memory testing covers most unit testing needs, you'll occasionally need to test actual network transports. Use the `run_server_in_process` utility to spawn a server in a separate process for testing:
While in-memory testing covers most unit testing needs, you'll occasionally need to test actual network transports like HTTP or SSE. FastMCP provides two approaches: in-process async servers using AnyIO task groups (preferred), and separate subprocess servers (for special cases).

#### In-Process Network Testing (Preferred)

For most network transport tests, use `run_server_async` with AnyIO task groups. This runs the server as a task in the same process, providing fast, deterministic tests with full debugger support:

```python
import pytest
from anyio.abc import TaskGroup
from fastmcp import FastMCP, Client
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.utilities.tests import run_server_async

def create_test_server() -> FastMCP:
"""Create a test server instance."""
server = FastMCP("TestServer")

@server.tool
def greet(name: str) -> str:
return f"Hello, {name}!"

return server

@pytest.fixture
async def http_server(task_group: TaskGroup) -> str:
"""Start server in-process using task group."""
server = create_test_server()
url = await run_server_async(task_group, server, transport="http")
return url

async def test_http_transport(http_server: str):
"""Test actual HTTP transport behavior."""
async with Client(
transport=StreamableHttpTransport(http_server)
) as client:
result = await client.ping()
assert result is True

greeting = await client.call_tool("greet", {"name": "World"})
assert greeting.data == "Hello, World!"
```

The `task_group` fixture is provided globally by `conftest.py` and automatically handles server lifecycle and cleanup. This approach is faster than subprocess-based testing and provides better error messages.

#### Subprocess Testing (Special Cases)

For tests that require complete process isolation (like STDIO transport or testing subprocess behavior), use `run_server_in_process`:

```python
import pytest
Expand Down Expand Up @@ -328,12 +374,9 @@ async def test_http_transport(http_server: str):
) as client:
result = await client.ping()
assert result is True

greeting = await client.call_tool("greet", {"name": "World"})
assert greeting.data == "Hello, World!"
```

The `run_server_in_process` utility handles server lifecycle, port allocation, and cleanup automatically. This pattern is essential for testing transport-specific behavior like timeouts, headers, and authentication. Note that FastMCP often uses the `client_process` marker to isolate tests that spawn processes, as they can create contention in CI.
The `run_server_in_process` utility handles server lifecycle, port allocation, and cleanup automatically. Use this only when subprocess isolation is truly necessary, as it's slower and harder to debug than in-process testing. FastMCP uses the `client_process` marker to isolate these tests in CI.

### Documentation Testing

Expand Down
5 changes: 1 addition & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ dev = [
"pyinstrument>=5.0.2",
"pyperclip>=1.9.0",
"pytest>=8.3.3",
"pytest-asyncio>=0.23.5",
"pytest-cov>=6.1.1",
"pytest-env>=1.1.5",
"pytest-flakefinder",
Expand Down Expand Up @@ -97,9 +96,7 @@ fallback-version = "0.0.0"


[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"
anyio_mode = "auto"
# filterwarnings = ["error::DeprecationWarning"]
timeout = 5
env = [
Expand Down
3 changes: 2 additions & 1 deletion src/fastmcp/server/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from enum import Enum
from typing import Any, Literal, cast, get_origin, overload

import anyio
from mcp import LoggingLevel, ServerSession
from mcp.server.lowlevel.helper_types import ReadResourceContents
from mcp.server.lowlevel.server import request_ctx
Expand Down Expand Up @@ -51,7 +52,7 @@

T = TypeVar("T", default=Any)
_current_context: ContextVar[Context | None] = ContextVar("context", default=None) # type: ignore[assignment]
_flush_lock = asyncio.Lock()
_flush_lock = anyio.Lock()


@dataclass
Expand Down
6 changes: 3 additions & 3 deletions src/fastmcp/server/middleware/rate_limiting.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Rate limiting middleware for protecting FastMCP servers from abuse."""

import asyncio
import time
from collections import defaultdict, deque
from collections.abc import Callable
from typing import Any

import anyio
from mcp import McpError
from mcp.types import ErrorData

Expand Down Expand Up @@ -33,7 +33,7 @@ def __init__(self, capacity: int, refill_rate: float):
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.time()
self._lock = asyncio.Lock()
self._lock = anyio.Lock()

async def consume(self, tokens: int = 1) -> bool:
"""Try to consume tokens from the bucket.
Expand Down Expand Up @@ -71,7 +71,7 @@ def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
self._lock = asyncio.Lock()
self._lock = anyio.Lock()

async def is_allowed(self) -> bool:
"""Check if a request is allowed."""
Expand Down
84 changes: 84 additions & 0 deletions src/fastmcp/utilities/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import time
from collections.abc import Callable, Generator
from contextlib import contextmanager
from functools import partial
from typing import TYPE_CHECKING, Any, Literal
from urllib.parse import parse_qs, urlparse

import anyio
import httpx
import uvicorn
from anyio.abc import TaskGroup

from fastmcp import settings
from fastmcp.client.auth.oauth import OAuth
Expand Down Expand Up @@ -140,6 +143,87 @@ def run_server_in_process(
raise RuntimeError("Server process failed to terminate even after kill")


async def run_server_async(
task_group: TaskGroup,
server: FastMCP,
port: int | None = None,
transport: str = "http",
path: str = "/mcp",
host: str = "127.0.0.1",
) -> str:
"""
Start a FastMCP server in an AnyIO task group for in-process async testing.

This is the recommended way to test FastMCP servers. It runs the server
as an async task in the same process, eliminating subprocess coordination,
sleeps, and cleanup issues.

Args:
task_group: AnyIO task group to run the server in
server: FastMCP server instance
port: Port to bind to (default: find available port)
transport: Transport type ("http" or "sse")
path: URL path for the server (default: "/mcp")
host: Host to bind to (default: "127.0.0.1")

Returns:
Server URL string

Example:
```python
import anyio
import pytest
from anyio.abc import TaskGroup
from fastmcp import FastMCP, Client
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.utilities.tests import run_server_async

@pytest.fixture
async def task_group():
async with anyio.create_task_group() as tg:
yield tg
tg.cancel_scope.cancel()

@pytest.fixture
async def server(task_group: TaskGroup):
mcp = FastMCP("test")

@mcp.tool()
def greet(name: str) -> str:
return f"Hello, {name}!"

url = await run_server_async(task_group, mcp)
return url

async def test_greet(server: str):
async with Client(StreamableHttpTransport(server)) as client:
result = await client.call_tool("greet", {"name": "World"})
assert result.content[0].text == "Hello, World!"
```
"""
if port is None:
port = find_available_port()

# Wait a tiny bit for the port to be released if it was just used
await anyio.sleep(0.01)

task_group.start_soon(
partial(
server.run_async,
host=host,
port=port,
transport=transport,
path=path,
show_banner=False,
)
)

# Give the server a moment to start
await anyio.sleep(0.1)

return f"http://{host}:{port}{path}"


@contextmanager
def caplog_for_fastmcp(caplog):
"""Context manager to capture logs from FastMCP loggers even when propagation is disabled."""
Expand Down
1 change: 0 additions & 1 deletion tests/cli/test_mcp_server_config_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def test_detect_test_fastmcp_json(self, tmp_path):
class TestConfigWithClient:
"""Test fastmcp.json configuration with client connections."""

@pytest.mark.asyncio
async def test_config_server_with_client(self, server_with_config):
"""Test that a server loaded from config works with a client."""
# Load the config
Expand Down
4 changes: 0 additions & 4 deletions tests/cli/test_server_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
class TestServerArguments:
"""Test passing arguments to servers."""

@pytest.mark.asyncio
async def test_server_with_argparse(self, tmp_path):
"""Test a server that uses argparse with command line arguments."""
server_file = tmp_path / "argparse_server.py"
Expand Down Expand Up @@ -53,7 +52,6 @@ def get_config() -> dict:
tools = await server.get_tools()
assert "get_config" in tools

@pytest.mark.asyncio
async def test_server_with_no_args(self, tmp_path):
"""Test a server that uses argparse with no arguments (defaults)."""
server_file = tmp_path / "default_server.py"
Expand All @@ -79,7 +77,6 @@ async def test_server_with_no_args(self, tmp_path):

assert server.name == "DefaultName"

@pytest.mark.asyncio
async def test_server_with_sys_argv_access(self, tmp_path):
"""Test a server that directly accesses sys.argv."""
server_file = tmp_path / "sysargv_server.py"
Expand Down Expand Up @@ -112,7 +109,6 @@ async def test_server_with_sys_argv_access(self, tmp_path):

assert server.name == "DirectServer"

@pytest.mark.asyncio
async def test_config_server_example(self):
"""Test the actual config_server.py example."""
# Find the examples directory
Expand Down
29 changes: 13 additions & 16 deletions tests/client/auth/test_oauth_client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from collections.abc import Generator
from urllib.parse import urlparse

import httpx
import pytest
from anyio.abc import TaskGroup

from fastmcp.client import Client
from fastmcp.client.transports import StreamableHttpTransport
from fastmcp.server.auth.auth import ClientRegistrationOptions
from fastmcp.server.auth.providers.in_memory import InMemoryOAuthProvider
from fastmcp.server.server import FastMCP
from fastmcp.utilities.tests import HeadlessOAuth, run_server_in_process
from fastmcp.utilities.http import find_available_port
from fastmcp.utilities.tests import HeadlessOAuth, run_server_async


def fastmcp_server(issuer_url: str):
Expand All @@ -35,31 +36,27 @@ def get_test_resource() -> str:
return server


def run_server(host: str, port: int, **kwargs) -> None:
fastmcp_server(f"http://{host}:{port}").run(host=host, port=port, **kwargs)


@pytest.fixture
def streamable_http_server() -> Generator[str, None, None]:
with run_server_in_process(run_server, transport="http") as url:
yield f"{url}/mcp"
async def streamable_http_server(task_group: TaskGroup):
"""Start OAuth-enabled server."""
port = find_available_port()
server = fastmcp_server(f"http://127.0.0.1:{port}")
url = await run_server_async(task_group, server, port=port, transport="http")
return url


@pytest.fixture()
@pytest.fixture
def client_unauthorized(streamable_http_server: str) -> Client:
return Client(transport=StreamableHttpTransport(streamable_http_server))


@pytest.fixture()
def client_with_headless_oauth(
streamable_http_server: str,
) -> Generator[Client, None, None]:
@pytest.fixture
def client_with_headless_oauth(streamable_http_server: str) -> Client:
"""Client with headless OAuth that bypasses browser interaction."""
client = Client(
return Client(
transport=StreamableHttpTransport(streamable_http_server),
auth=HeadlessOAuth(mcp_url=streamable_http_server),
)
yield client


async def test_unauthorized(client_unauthorized: Client):
Expand Down
6 changes: 0 additions & 6 deletions tests/client/auth/test_oauth_token_expiry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pytest
from mcp.shared.auth import OAuthToken

from fastmcp.client.auth.oauth import FileTokenStorage


@pytest.mark.asyncio
async def test_token_storage_with_expiry(tmp_path: Path):
"""Test that tokens are stored with absolute expiry time and loaded correctly."""
storage = FileTokenStorage("http://test.example.com", cache_dir=tmp_path)
Expand Down Expand Up @@ -52,7 +50,6 @@ async def test_token_storage_with_expiry(tmp_path: Path):
assert 3595 <= loaded_token.expires_in <= 3600


@pytest.mark.asyncio
async def test_expired_token_returns_none(tmp_path: Path):
"""Test that expired tokens return None when loaded."""
storage = FileTokenStorage("http://test.example.com", cache_dir=tmp_path)
Expand All @@ -79,7 +76,6 @@ async def test_expired_token_returns_none(tmp_path: Path):
assert loaded_token is None


@pytest.mark.asyncio
async def test_token_without_expiry(tmp_path: Path):
"""Test that tokens without expires_in are handled correctly."""
storage = FileTokenStorage("http://test.example.com", cache_dir=tmp_path)
Expand Down Expand Up @@ -109,7 +105,6 @@ async def test_token_without_expiry(tmp_path: Path):
assert loaded_token.expires_in is None


@pytest.mark.asyncio
async def test_invalid_format_returns_none(tmp_path: Path):
"""Test that invalid token format returns None."""
storage = FileTokenStorage("http://test.example.com", cache_dir=tmp_path)
Expand All @@ -129,7 +124,6 @@ async def test_invalid_format_returns_none(tmp_path: Path):
assert loaded_token is None


@pytest.mark.asyncio
async def test_token_expiry_recalculated_on_load(tmp_path: Path):
"""Test that expires_in is correctly recalculated when loading tokens."""
storage = FileTokenStorage("http://test.example.com", cache_dir=tmp_path)
Expand Down
Loading
Loading