diff --git a/docs/servers/auth/oauth-proxy.mdx b/docs/servers/auth/oauth-proxy.mdx
index a5109ea0a..c06276e26 100644
--- a/docs/servers/auth/oauth-proxy.mdx
+++ b/docs/servers/auth/oauth-proxy.mdx
@@ -10,9 +10,9 @@ import { VersionBadge } from "/snippets/version-badge.mdx";
-OAuth Proxy enables FastMCP servers to authenticate with OAuth providers that **don't support Dynamic Client Registration (DCR)**. This includes virtually all traditional OAuth providers: GitHub, Google, Azure, AWS, Discord, Facebook, and most enterprise identity systems. For providers that do support DCR (like Descope and WorkOS AuthKit), use [`RemoteAuthProvider`](/servers/auth/remote-oauth) instead.
+The OAuth proxy enables FastMCP servers to authenticate with OAuth providers that **don't support Dynamic Client Registration (DCR)**. This includes virtually all traditional OAuth providers: GitHub, Google, Azure, AWS, Discord, Facebook, and most enterprise identity systems. For providers that do support DCR (like Descope and WorkOS AuthKit), use [`RemoteAuthProvider`](/servers/auth/remote-oauth) instead.
-MCP clients expect to register automatically and obtain credentials on the fly, but traditional providers require manual app registration through their developer consoles. OAuth Proxy bridges this gap by presenting a DCR-compliant interface to MCP clients while using your pre-registered credentials with the upstream provider. When a client attempts to register, the proxy returns your fixed credentials. When a client initiates authorization, the proxy handles the complexity of callback forwarding—storing the client's dynamic callback URL, using its own fixed callback with the provider, then forwarding back to the client after token exchange.
+MCP clients expect to register automatically and obtain credentials on the fly, but traditional providers require manual app registration through their developer consoles. The OAuth proxy bridges this gap by presenting a DCR-compliant interface to MCP clients while using your pre-registered credentials with the upstream provider. When a client attempts to register, the proxy returns your fixed credentials. When a client initiates authorization, the proxy handles the complexity of callback forwarding—storing the client's dynamic callback URL, using its own fixed callback with the provider, then forwarding back to the client after token exchange.
This approach enables any MCP client (whether using random localhost ports or fixed URLs like Claude.ai) to authenticate with any traditional OAuth provider, all while maintaining full OAuth 2.1 and PKCE security.
@@ -20,7 +20,7 @@ This approach enables any MCP client (whether using random localhost ports or fi
For providers that support OIDC discovery (Auth0, Google with OIDC
configuration, Azure AD), consider using [`OIDC
Proxy`](/servers/auth/oidc-proxy) for automatic configuration. OIDC Proxy
- extends OAuth Proxy to automatically discover endpoints from the provider's
+ extends the OAuth proxy to automatically discover endpoints from the provider's
`/.well-known/openid-configuration` URL, simplifying setup.
@@ -28,7 +28,7 @@ This approach enables any MCP client (whether using random localhost ports or fi
### Provider Setup Requirements
-Before using OAuth Proxy, you need to register your application with your OAuth provider:
+Before using the OAuth proxy, you need to register your application with your OAuth provider:
1. **Register your application** in the provider's developer console (GitHub Settings, Google Cloud Console, Azure Portal, etc.)
2. **Configure the redirect URI** as your FastMCP server URL plus your chosen callback path:
@@ -41,12 +41,12 @@ Before using OAuth Proxy, you need to register your application with your OAuth
The redirect URI you configure with your provider must exactly match your
FastMCP server's URL plus the callback path. If you customize `redirect_path`
- in OAuth Proxy, update your provider's redirect URI accordingly.
+ in the OAuth proxy, update your provider's redirect URI accordingly.
### Basic Setup
-Here's how to implement OAuth Proxy with any provider:
+Here's how to implement the OAuth proxy with any provider:
```python
from fastmcp import FastMCP
@@ -203,48 +203,6 @@ auth = OAuthProxy(..., client_storage=InMemoryStorage())
-### Provider-Specific Parameters
-
-Some OAuth providers require additional parameters beyond the standard OAuth2 flow. Use `extra_authorize_params` and `extra_token_params` to handle these requirements:
-
-#### Auth0 Example
-
-Auth0 requires an `audience` parameter to issue JWT tokens instead of opaque tokens:
-
-```python
-auth = OAuthProxy(
- upstream_authorization_endpoint="https://your-domain.auth0.com/authorize",
- upstream_token_endpoint="https://your-domain.auth0.com/oauth/token",
- upstream_client_id="your-auth0-client-id",
- upstream_client_secret="your-auth0-client-secret",
-
- # Auth0 requires audience for JWT tokens
- extra_authorize_params={
- "audience": "https://your-api-identifier.com"
- },
- extra_token_params={
- "audience": "https://your-api-identifier.com"
- },
-
- token_verifier=JWTVerifier(
- jwks_uri="https://your-domain.auth0.com/.well-known/jwks.json",
- issuer="https://your-domain.auth0.com/",
- audience="https://your-api-identifier.com"
- ),
-
- base_url="https://your-server.com"
-)
-```
-
-#### RFC 8707 Resource Indicators
-
-MCP clients can specify target resources using the standard `resource` parameter (RFC 8707). This is automatically forwarded when present:
-
-```python
-# Client code (automatic - no server configuration needed)
-# The resource parameter is passed through from AuthorizationParams
-```
-
### Using Built-in Providers
FastMCP includes pre-configured providers for common services:
@@ -263,21 +221,57 @@ mcp = FastMCP(name="My Server", auth=auth)
Available providers include `GitHubProvider`, `GoogleProvider`, and others. These handle token verification automatically.
+### Token Verification
+
+The OAuth proxy requires a compatible `TokenVerifier` to validate tokens from your provider. Different providers use different token formats:
+
+- **JWT tokens** (Google, Azure): Use `JWTVerifier` with the provider's JWKS endpoint
+- **Opaque tokens** (GitHub, Discord): Use provider-specific verifiers or implement custom validation
+
+See the [Token Verification guide](/servers/auth/token-verification) for detailed setup instructions for your provider.
+
### Scope Configuration
-OAuth scopes are configured through your `TokenVerifier`. Set `required_scopes` to automatically request the permissions your application needs:
+OAuth scopes control what permissions your application requests from users. They're configured through your `TokenVerifier` (required for the OAuth proxy to validate tokens from your provider). Set `required_scopes` to automatically request the permissions your application needs:
```python
JWTVerifier(..., required_scopes = ["read:user", "write:data"])
```
-Dynamic clients created by the proxy will automatically include these scopes in their authorization requests.
+Dynamic clients created by the proxy will automatically include these scopes in their authorization requests. See the [Token Verification](#token-verification) section below for detailed setup.
+
+### Custom Parameters
+
+Some OAuth providers require additional parameters beyond the standard OAuth2 flow. Use `extra_authorize_params` and `extra_token_params` to pass provider-specific requirements. For example, Auth0 requires an `audience` parameter to issue JWT tokens instead of opaque tokens:
+
+```python
+auth = OAuthProxy(
+ upstream_authorization_endpoint="https://your-domain.auth0.com/authorize",
+ upstream_token_endpoint="https://your-domain.auth0.com/oauth/token",
+ upstream_client_id="your-auth0-client-id",
+ upstream_client_secret="your-auth0-client-secret",
+
+ # Auth0-specific audience parameter
+ extra_authorize_params={"audience": "https://your-api-identifier.com"},
+ extra_token_params={"audience": "https://your-api-identifier.com"},
+
+ token_verifier=JWTVerifier(
+ jwks_uri="https://your-domain.auth0.com/.well-known/jwks.json",
+ issuer="https://your-domain.auth0.com/",
+ audience="https://your-api-identifier.com"
+ ),
+ base_url="https://your-server.com"
+)
+```
+
+The proxy also automatically forwards RFC 8707 `resource` parameters from MCP clients to upstream providers that support them.
-## How It Works
+## OAuth Flow
```mermaid
sequenceDiagram
participant Client as MCP Client (localhost:random)
+ participant User as User
participant Proxy as FastMCP OAuth Proxy (server:8000)
participant Provider as OAuth Provider (GitHub, etc.)
@@ -285,25 +279,27 @@ sequenceDiagram
Client->>Proxy: 1. POST /register redirect_uri: localhost:54321/callback
Proxy-->>Client: 2. Returns fixed upstream credentials
- Note over Client, Proxy: Authorization with PKCE & Callback Forwarding
+ Note over Client, User: Authorization with User Consent
Client->>Proxy: 3. GET /authorize redirect_uri=localhost:54321/callback code_challenge=CLIENT_CHALLENGE
Note over Proxy: Store transaction with client PKCE Generate proxy PKCE pair
- Proxy->>Provider: 4. Redirect to provider redirect_uri=server:8000/auth/callback code_challenge=PROXY_CHALLENGE
+ Proxy->>User: 4. Show consent page (client details, redirect URI, scopes)
+ User->>Proxy: 5. Approve/deny consent
+ Proxy->>Provider: 6. Redirect to provider redirect_uri=server:8000/auth/callback code_challenge=PROXY_CHALLENGE
Note over Provider, Proxy: Provider Callback
- Provider->>Proxy: 5. GET /auth/callback with authorization code
- Proxy->>Provider: 6. Exchange code for tokens code_verifier=PROXY_VERIFIER
- Provider-->>Proxy: 7. Access & refresh tokens
+ Provider->>Proxy: 7. GET /auth/callback with authorization code
+ Proxy->>Provider: 8. Exchange code for tokens code_verifier=PROXY_VERIFIER
+ Provider-->>Proxy: 9. Access & refresh tokens
Note over Proxy, Client: Client Callback Forwarding
- Proxy->>Client: 8. Redirect to localhost:54321/callback with new authorization code
+ Proxy->>Client: 10. Redirect to localhost:54321/callback with new authorization code
Note over Client, Proxy: Token Exchange
- Client->>Proxy: 9. POST /token with code code_verifier=CLIENT_VERIFIER
- Proxy-->>Client: 10. Returns stored provider tokens
+ Client->>Proxy: 11. POST /token with code code_verifier=CLIENT_VERIFIER
+ Proxy-->>Client: 12. Returns stored provider tokens
```
-The flow diagram above illustrates the complete OAuth Proxy pattern. Let's understand each phase:
+The flow diagram above illustrates the complete OAuth proxy pattern. Let's understand each phase:
### Registration Phase
@@ -315,9 +311,10 @@ The client initiates OAuth by redirecting to the proxy's `/authorize` endpoint.
1. Stores the client's transaction with its PKCE challenge
2. Generates its own PKCE parameters for upstream security
-3. Redirects to the upstream provider using the fixed callback URL
+3. Shows the user a consent page with the client's details, redirect URI, and requested scopes
+4. If the user approves (or the client was previously approved), redirects to the upstream provider using the fixed callback URL
-This dual-PKCE approach maintains end-to-end security at both the client-to-proxy and proxy-to-provider layers.
+This dual-PKCE approach maintains end-to-end security at both the client-to-proxy and proxy-to-provider layers. The consent step protects against confused deputy attacks by ensuring you explicitly approve each client before it can complete authorization.
### Callback Phase
@@ -336,7 +333,7 @@ This entire flow is transparent to the MCP client—it experiences a standard OA
### PKCE Forwarding
-OAuth Proxy automatically handles PKCE (Proof Key for Code Exchange) when working with providers that support or require it. The proxy generates its own PKCE parameters to send upstream while separately validating the client's PKCE, ensuring end-to-end security at both layers.
+The OAuth proxy automatically handles PKCE (Proof Key for Code Exchange) when working with providers that support or require it. The proxy generates its own PKCE parameters to send upstream while separately validating the client's PKCE, ensuring end-to-end security at both layers.
This is enabled by default via the `forward_pkce` parameter and works seamlessly with providers like Google, Azure AD, and GitHub. Only disable it for legacy providers that don't support PKCE:
@@ -350,7 +347,7 @@ auth = OAuthProxy(
### Redirect URI Validation
-While OAuth Proxy accepts all redirect URIs by default (for DCR compatibility), you can restrict which clients can connect by specifying allowed patterns:
+While the OAuth proxy accepts all redirect URIs by default (for DCR compatibility), you can restrict which clients can connect by specifying allowed patterns:
```python
# Allow only localhost clients (common for development)
@@ -375,20 +372,29 @@ auth = OAuthProxy(
Check your server logs for "Client registered with redirect_uri" messages to identify what URLs your clients use.
-## Token Verification
+## Security
-OAuth Proxy requires a compatible `TokenVerifier` to validate tokens from your provider. Different providers use different token formats:
+### Confused Deputy Attacks
-- **JWT tokens** (Google, Azure): Use `JWTVerifier` with the provider's JWKS endpoint
-- **Opaque tokens** (GitHub, Discord): Use provider-specific verifiers or implement custom validation
+
-See the [Token Verification guide](/servers/auth/token-verification) for detailed setup instructions for your provider.
+A confused deputy attack allows a malicious client to steal your authorization by tricking you into granting it access under your identity.
+
+The OAuth proxy works by bridging DCR clients to traditional auth providers, which means that multiple MCP clients connect through a single upstream OAuth application. An attacker can exploit this shared application by registering a malicious client with their own redirect URI, then sending you an authorization link. When you click it, your browser goes through the OAuth flow—but since you may have already authorized this OAuth app before, the provider might auto-approve the request. The authorization code then gets sent to the attacker's redirect URI instead of a legitimate client, giving them access under your credentials.
+
+#### Mitigation
+
+FastMCP's OAuth proxy requires you to explicitly consent whenever any new or unrecognized client attempts to connect to your server. Before any authorization happens, you see a consent page showing the client's details, redirect URI, and requested scopes. This gives you the opportunity to review and deny suspicious requests. Once you approve a client, it's remembered so you don't see the consent page again for that client. The consent mechanism is implemented with CSRF tokens and cryptographically signed cookies to prevent tampering.
+
+**Learn more:**
+- [MCP Security Best Practices](https://modelcontextprotocol.io/specification/2025-06-18/basic/security_best_practices#confused-deputy-problem) - Official specification guidance
+- [Confused Deputy Attacks Explained](https://den.dev/blog/mcp-confused-deputy-api-management/) - Detailed walkthrough by Den Delimarsky
## Environment Configuration
-For production deployments, configure OAuth Proxy through environment variables instead of hardcoding credentials:
+For production deployments, configure the OAuth proxy through environment variables instead of hardcoding credentials:
```bash
# Specify the provider implementation
diff --git a/docs/servers/auth/oidc-proxy.mdx b/docs/servers/auth/oidc-proxy.mdx
index e08d0c358..22f5e2ee8 100644
--- a/docs/servers/auth/oidc-proxy.mdx
+++ b/docs/servers/auth/oidc-proxy.mdx
@@ -10,15 +10,15 @@ import { VersionBadge } from "/snippets/version-badge.mdx";
-OIDC Proxy enables FastMCP servers to authenticate with OIDC providers that **don't support Dynamic Client Registration (DCR)** out of the box. This includes OAuth providers like: Auth0, Google, Azure, AWS, etc. For providers that do support DCR (like WorkOS AuthKit), use [`RemoteAuthProvider`](/servers/auth/remote-oauth) instead.
+The OIDC proxy enables FastMCP servers to authenticate with OIDC providers that **don't support Dynamic Client Registration (DCR)** out of the box. This includes OAuth providers like: Auth0, Google, Azure, AWS, etc. For providers that do support DCR (like WorkOS AuthKit), use [`RemoteAuthProvider`](/servers/auth/remote-oauth) instead.
-The OIDC Proxy is built upon [`OAuthProxy`](/servers/auth/oauth-proxy) so it has all the same functionality under the covers.
+The OIDC proxy is built upon [`OAuthProxy`](/servers/auth/oauth-proxy) so it has all the same functionality under the covers.
## Implementation
### Provider Setup Requirements
-Before using OIDC Proxy, you need to register your application with your OAuth provider:
+Before using the OIDC proxy, you need to register your application with your OAuth provider:
1. **Register your application** in the provider's developer console (Auth0 Applications, Google Cloud Console, Azure Portal, etc.)
2. **Configure the redirect URI** as your FastMCP server URL plus your chosen callback path:
@@ -30,12 +30,12 @@ Before using OIDC Proxy, you need to register your application with your OAuth p
The redirect URI you configure with your provider must exactly match your
FastMCP server's URL plus the callback path. If you customize `redirect_path`
- in OAuth Proxy, update your provider's redirect URI accordingly.
+ in the OIDC proxy, update your provider's redirect URI accordingly.
### Basic Setup
-Here's how to implement OIDC Proxy with any provider:
+Here's how to implement the OIDC proxy with any provider:
```python
from fastmcp import FastMCP
@@ -172,7 +172,7 @@ Dynamic clients created by the proxy will automatically include these scopes in
-For production deployments, configure OIDC Proxy through environment variables instead of hardcoding credentials:
+For production deployments, configure the OIDC proxy through environment variables instead of hardcoding credentials:
```bash
# Specify the provider implementation
diff --git a/src/fastmcp/client/oauth_callback.py b/src/fastmcp/client/oauth_callback.py
index c6da794e8..cf1166a80 100644
--- a/src/fastmcp/client/oauth_callback.py
+++ b/src/fastmcp/client/oauth_callback.py
@@ -12,12 +12,21 @@
from starlette.applications import Starlette
from starlette.requests import Request
-from starlette.responses import HTMLResponse
from starlette.routing import Route
from uvicorn import Config, Server
from fastmcp.utilities.http import find_available_port
from fastmcp.utilities.logging import get_logger
+from fastmcp.utilities.ui import (
+ HELPER_TEXT_STYLES,
+ INFO_BOX_STYLES,
+ STATUS_MESSAGE_STYLES,
+ create_info_box,
+ create_logo,
+ create_page,
+ create_secure_html_response,
+ create_status_message,
+)
logger = get_logger(__name__)
@@ -29,155 +38,41 @@ def create_callback_html(
server_url: str | None = None,
) -> str:
"""Create a styled HTML response for OAuth callbacks."""
- logo_url = "https://gofastmcp.com/assets/brand/blue-logo.png"
-
# Build the main status message
- if is_success:
- status_title = "Authentication successful"
- status_icon = "✓"
- icon_bg = "#10b98120"
- else:
- status_title = "Authentication failed"
- status_icon = "✕"
- icon_bg = "#ef444420"
+ status_title = (
+ "Authentication successful" if is_success else "Authentication failed"
+ )
# Add detail info box for both success and error cases
detail_info = ""
if is_success and server_url:
- detail_info = f"""
-
-
-
"""
+ # Additional styles needed for this page
+ additional_styles = STATUS_MESSAGE_STYLES + INFO_BOX_STYLES + HELPER_TEXT_STYLES
+
+ return create_page(
+ content=content,
+ title=title,
+ additional_styles=additional_styles,
+ )
+
@dataclass
class CallbackResponse:
@@ -221,32 +116,34 @@ async def callback_handler(request: Request):
if callback_response.error:
error_desc = callback_response.error_description or "Unknown error"
+ # Create user-friendly error messages
+ if callback_response.error == "access_denied":
+ user_message = "Access was denied by the authorization server."
+ else:
+ user_message = f"Authorization failed: {error_desc}"
+
# Resolve future with exception if provided
if response_future and not response_future.done():
- response_future.set_exception(
- RuntimeError(
- f"OAuth error: {callback_response.error} - {error_desc}"
- )
- )
+ response_future.set_exception(RuntimeError(user_message))
- return HTMLResponse(
+ return create_secure_html_response(
create_callback_html(
- f"FastMCP OAuth Error: {callback_response.error} {error_desc}",
+ user_message,
is_success=False,
),
status_code=400,
)
if not callback_response.code:
+ user_message = "No authorization code was received from the server."
+
# Resolve future with exception if provided
if response_future and not response_future.done():
- response_future.set_exception(
- RuntimeError("OAuth callback missing authorization code")
- )
+ response_future.set_exception(RuntimeError(user_message))
- return HTMLResponse(
+ return create_secure_html_response(
create_callback_html(
- "FastMCP OAuth Error: No authorization code received",
+ user_message,
is_success=False,
),
status_code=400,
@@ -254,17 +151,17 @@ async def callback_handler(request: Request):
# Check for missing state parameter (indicates OAuth flow issue)
if callback_response.state is None:
+ user_message = (
+ "The OAuth server did not return the expected state parameter."
+ )
+
# Resolve future with exception if provided
if response_future and not response_future.done():
- response_future.set_exception(
- RuntimeError(
- "OAuth server did not return state parameter - authentication failed"
- )
- )
+ response_future.set_exception(RuntimeError(user_message))
- return HTMLResponse(
+ return create_secure_html_response(
create_callback_html(
- "FastMCP OAuth Error: Authentication failed The OAuth server did not return the expected state parameter",
+ user_message,
is_success=False,
),
status_code=400,
@@ -276,7 +173,7 @@ async def callback_handler(request: Request):
(callback_response.code, callback_response.state)
)
- return HTMLResponse(
+ return create_secure_html_response(
create_callback_html("", is_success=True, server_url=server_url)
)
diff --git a/src/fastmcp/server/auth/oauth_proxy.py b/src/fastmcp/server/auth/oauth_proxy.py
index 998aa05a6..2781f3516 100644
--- a/src/fastmcp/server/auth/oauth_proxy.py
+++ b/src/fastmcp/server/auth/oauth_proxy.py
@@ -18,12 +18,15 @@
from __future__ import annotations
+import base64
import hashlib
+import hmac
+import json
import secrets
import time
from base64 import urlsafe_b64encode
from typing import TYPE_CHECKING, Any, Final
-from urllib.parse import urlencode
+from urllib.parse import urlencode, urlparse
import httpx
from authlib.common.security import generate_token
@@ -48,14 +51,26 @@
RevocationOptions,
)
from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
-from pydantic import AnyHttpUrl, AnyUrl, Field, SecretStr
+from pydantic import AnyHttpUrl, AnyUrl, BaseModel, Field, SecretStr
from starlette.requests import Request
-from starlette.responses import RedirectResponse
+from starlette.responses import HTMLResponse, RedirectResponse
from starlette.routing import Route
from fastmcp.server.auth.auth import OAuthProvider, TokenVerifier
-from fastmcp.server.auth.redirect_validation import validate_redirect_uri
+from fastmcp.server.auth.redirect_validation import (
+ validate_redirect_uri,
+)
from fastmcp.utilities.logging import get_logger
+from fastmcp.utilities.ui import (
+ BUTTON_STYLES,
+ DETAIL_BOX_STYLES,
+ INFO_BOX_STYLES,
+ TOOLTIP_STYLES,
+ create_detail_box,
+ create_logo,
+ create_page,
+ create_secure_html_response,
+)
if TYPE_CHECKING:
pass
@@ -63,6 +78,62 @@
logger = get_logger(__name__)
+# -------------------------------------------------------------------------
+# Constants
+# -------------------------------------------------------------------------
+
+# Default token expiration times
+DEFAULT_ACCESS_TOKEN_EXPIRY_SECONDS: Final[int] = 60 * 60 # 1 hour
+DEFAULT_AUTH_CODE_EXPIRY_SECONDS: Final[int] = 5 * 60 # 5 minutes
+
+# HTTP client timeout
+HTTP_TIMEOUT_SECONDS: Final[int] = 30
+
+
+# -------------------------------------------------------------------------
+# Pydantic Models
+# -------------------------------------------------------------------------
+
+
+class OAuthTransaction(BaseModel):
+ """OAuth transaction state for consent flow.
+
+ Stored server-side to track active authorization flows with client context.
+ Includes CSRF tokens for consent protection per MCP security best practices.
+ """
+
+ txn_id: str
+ client_id: str
+ client_redirect_uri: str
+ client_state: str
+ code_challenge: str | None
+ code_challenge_method: str
+ scopes: list[str]
+ created_at: float
+ resource: str | None = None
+ proxy_code_verifier: str | None = None
+ csrf_token: str | None = None
+ csrf_expires_at: float | None = None
+
+
+class ClientCode(BaseModel):
+ """Client authorization code with PKCE and upstream tokens.
+
+ Stored server-side after upstream IdP callback. Contains the upstream
+ tokens bound to the client's PKCE challenge for secure token exchange.
+ """
+
+ code: str
+ client_id: str
+ redirect_uri: str
+ code_challenge: str | None
+ code_challenge_method: str
+ scopes: list[str]
+ idp_tokens: dict[str, Any]
+ expires_at: float
+ created_at: float
+
+
class ProxyDCRClient(OAuthClientInformationFull):
"""Client for DCR proxy with configurable redirect URI validation.
@@ -90,6 +161,7 @@ class ProxyDCRClient(OAuthClientInformationFull):
"""
allowed_redirect_uri_patterns: list[str] | None = Field(default=None)
+ client_name: str | None = Field(default=None)
def validate_redirect_uri(self, redirect_uri: AnyUrl | None) -> AnyUrl:
"""Validate redirect URI against allowed patterns.
@@ -112,12 +184,110 @@ def validate_redirect_uri(self, redirect_uri: AnyUrl | None) -> AnyUrl:
return super().validate_redirect_uri(redirect_uri)
-# Default token expiration times
-DEFAULT_ACCESS_TOKEN_EXPIRY_SECONDS: Final[int] = 60 * 60 # 1 hour
-DEFAULT_AUTH_CODE_EXPIRY_SECONDS: Final[int] = 5 * 60 # 5 minutes
+# -------------------------------------------------------------------------
+# Helper Functions
+# -------------------------------------------------------------------------
+
+
+def create_consent_html(
+ client_id: str,
+ redirect_uri: str,
+ scopes: list[str],
+ txn_id: str,
+ csrf_token: str,
+ client_name: str | None = None,
+ title: str = "Authorization Consent",
+) -> str:
+ """Create a styled HTML consent page for OAuth authorization requests."""
+ # Format scopes for display
+ scopes_display = ", ".join(scopes) if scopes else "None"
+
+ # Build warning box with client name if available
+ client_display = client_name or client_id
+ warning_box = f"""
+
+
{client_display} is requesting access to this FastMCP server.
+
Review the details below before approving.
+
+ """
-# HTTP client timeout
-HTTP_TIMEOUT_SECONDS: Final[int] = 30
+ # Build detail box with client information
+ detail_rows = []
+ if client_name:
+ detail_rows.append(("Client Name", client_name))
+ detail_rows.extend(
+ [
+ ("Client ID", client_id),
+ ("Redirect URI", redirect_uri),
+ ("Requested Scopes", scopes_display),
+ ]
+ )
+ detail_box = create_detail_box(detail_rows)
+
+ # Build form with buttons
+ form = f"""
+
+ """
+
+ # Build help link with tooltip
+ help_link = """
+
+
+ Why am I seeing this?
+
+ This FastMCP server requires your consent to allow a new client
+ to connect. This protects you from confused deputy
+ attacks, where malicious clients could impersonate you
+ and steal access.
+ """
+
+
+def create_info_box(
+ content: str, is_error: bool = False, centered: bool = False
+) -> str:
+ """
+ Create an info box.
+
+ Args:
+ content: HTML content for the info box
+ is_error: True for error styling, False for normal
+ centered: True to center the text, False for left-aligned
+
+ Returns:
+ HTML for info box
+ """
+ classes = ["info-box"]
+ if is_error:
+ classes.append("error")
+ if centered:
+ classes.append("centered")
+ class_str = " ".join(classes)
+ return f'
{content}
'
+
+
+def create_detail_box(rows: list[tuple[str, str]]) -> str:
+ """
+ Create a detail box with key-value pairs.
+
+ Args:
+ rows: List of (label, value) tuples
+
+ Returns:
+ HTML for detail box
+ """
+ rows_html = "\n".join(
+ f"""
+
+
{label}:
+
{value}
+
+ """
+ for label, value in rows
+ )
+
+ return f'
{rows_html}
'
+
+
+def create_button_group(buttons: list[tuple[str, str, str]]) -> str:
+ """
+ Create a group of buttons.
+
+ Args:
+ buttons: List of (text, value, css_class) tuples
+
+ Returns:
+ HTML for button group
+ """
+ buttons_html = "\n".join(
+ f''
+ for text, value, css_class in buttons
+ )
+
+ return f'
{buttons_html}
'
+
+
+def create_secure_html_response(html: str, status_code: int = 200) -> HTMLResponse:
+ """
+ Create an HTMLResponse with security headers.
+
+ Adds X-Frame-Options: DENY to prevent clickjacking attacks per MCP security best practices.
+
+ Args:
+ html: HTML content to return
+ status_code: HTTP status code
+
+ Returns:
+ HTMLResponse with security headers
+ """
+ return HTMLResponse(
+ content=html,
+ status_code=status_code,
+ headers={"X-Frame-Options": "DENY"},
+ )
diff --git a/tests/integration_tests/auth/test_github_provider_integration.py b/tests/integration_tests/auth/test_github_provider_integration.py
index 1fbb14b8c..54d159fbd 100644
--- a/tests/integration_tests/auth/test_github_provider_integration.py
+++ b/tests/integration_tests/auth/test_github_provider_integration.py
@@ -11,6 +11,7 @@
"""
import os
+import re
from collections.abc import Generator
from urllib.parse import parse_qs, urlparse
@@ -84,6 +85,8 @@ async def mock_authorize(client, params):
import secrets
import time
+ from fastmcp.server.auth.oauth_proxy import ClientCode
+
# Generate a fake authorization code
fake_code = secrets.token_urlsafe(32)
@@ -94,17 +97,21 @@ async def mock_authorize(client, params):
"expires_in": 3600,
}
- # Store the mock tokens in the proxy's client codes
- auth._client_codes[fake_code] = {
- "client_id": client.client_id,
- "redirect_uri": str(params.redirect_uri),
- "code_challenge": params.code_challenge,
- "code_challenge_method": getattr(params, "code_challenge_method", "S256"),
- "scopes": params.scopes or [],
- "idp_tokens": mock_tokens,
- "expires_at": int(time.time() + 300), # 5 minutes
- "created_at": time.time(),
- }
+ # Store the mock tokens in the proxy's code storage
+ await auth._code_store.put(
+ key=fake_code,
+ value=ClientCode(
+ code=fake_code,
+ client_id=client.client_id,
+ redirect_uri=str(params.redirect_uri),
+ code_challenge=params.code_challenge,
+ code_challenge_method=getattr(params, "code_challenge_method", "S256"),
+ scopes=params.scopes or [],
+ idp_tokens=mock_tokens,
+ expires_at=int(time.time() + 300), # 5 minutes
+ created_at=time.time(),
+ ),
+ )
# Return the redirect to the client's callback with the fake code
callback_params = {
@@ -204,11 +211,12 @@ async def test_github_oauth_credentials_available():
async def test_github_oauth_authorization_redirect(github_server: str):
- """Test that GitHub OAuth authorization redirects to GitHub correctly.
+ """Test that GitHub OAuth authorization redirects to GitHub correctly through consent flow.
Since HeadlessOAuth can't handle real GitHub redirects, we test that:
1. DCR client registration works
- 2. Authorization endpoint redirects to GitHub with correct parameters
+ 2. Authorization endpoint redirects to consent page
+ 3. Consent approval redirects to GitHub with correct parameters
"""
# Extract base URL
parsed = urlparse(github_server)
@@ -235,7 +243,7 @@ async def test_github_oauth_authorization_redirect(github_server: str):
client_id = client_info["client_id"]
assert client_id is not None
- # Step 2: Test authorization endpoint redirects to GitHub
+ # Step 2: Test authorization endpoint redirects to consent page
auth_url = f"{base_url}/authorize"
auth_params = {
"response_type": "code",
@@ -250,9 +258,44 @@ async def test_github_oauth_authorization_redirect(github_server: str):
auth_url, params=auth_params, follow_redirects=False
)
- # Should redirect to GitHub
+ # Should redirect to consent page (confused deputy protection)
assert auth_response.status_code == 302
- redirect_location = auth_response.headers["location"]
+ consent_location = auth_response.headers["location"]
+ assert "/consent" in consent_location
+
+ # Step 3: Visit consent page to get CSRF token
+ consent_response = await http_client.get(
+ consent_location, follow_redirects=False
+ )
+ assert consent_response.status_code == 200
+
+ # Extract CSRF token from consent page HTML
+ csrf_match = re.search(
+ r'name="csrf_token"\s+value="([^"]+)"', consent_response.text
+ )
+ assert csrf_match, "CSRF token not found in consent page"
+ csrf_token = csrf_match.group(1)
+
+ # Extract txn_id from consent URL
+ txn_id_match = re.search(r"txn_id=([^&]+)", consent_location)
+ assert txn_id_match, "txn_id not found in consent URL"
+ txn_id = txn_id_match.group(1)
+
+ # Step 4: Approve consent
+ approve_response = await http_client.post(
+ f"{base_url}/consent/submit",
+ data={
+ "action": "approve",
+ "txn_id": txn_id,
+ "csrf_token": csrf_token,
+ },
+ cookies=consent_response.cookies,
+ follow_redirects=False,
+ )
+
+ # Should redirect to GitHub
+ assert approve_response.status_code in (302, 303)
+ redirect_location = approve_response.headers["location"]
# Parse redirect URL - should be GitHub
redirect_parsed = urlparse(redirect_location)
diff --git a/tests/server/auth/providers/test_azure.py b/tests/server/auth/providers/test_azure.py
index ec360403a..970a5c3b3 100644
--- a/tests/server/auth/providers/test_azure.py
+++ b/tests/server/auth/providers/test_azure.py
@@ -213,6 +213,14 @@ async def test_authorize_filters_resource_and_prefixes_scopes_with_audience(self
base_url="https://srv.example",
)
+ await provider.register_client(
+ OAuthClientInformationFull(
+ client_id="dummy",
+ client_secret="secret",
+ redirect_uris=[AnyUrl("http://localhost:12345/callback")],
+ )
+ )
+
client = OAuthClientInformationFull(
client_id="dummy",
client_secret="secret",
@@ -230,13 +238,19 @@ async def test_authorize_filters_resource_and_prefixes_scopes_with_audience(self
url = await provider.authorize(client, params)
+ # Extract transaction ID from consent redirect
parsed = urlparse(url)
qs = parse_qs(parsed.query)
- assert "resource" not in qs
- scope_value = qs.get("scope", [""])[0]
- scope_parts = scope_value.split(" ") if scope_value else []
- assert "api://my-api/read" in scope_parts
- assert "api://my-api/profile" in scope_parts
+ assert "txn_id" in qs, "Should redirect to consent page with transaction ID"
+ txn_id = qs["txn_id"][0]
+
+ # Verify transaction contains correct parameters (resource filtered, scopes prefixed)
+ transaction = await provider._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert "api://my-api/read" in transaction.scopes
+ assert "api://my-api/profile" in transaction.scopes
+ # Azure provider filters resource parameter (not stored in transaction)
+ assert transaction.resource is None
@pytest.mark.asyncio
async def test_authorize_appends_unprefixed_additional_scopes(self):
@@ -251,6 +265,14 @@ async def test_authorize_appends_unprefixed_additional_scopes(self):
additional_authorize_scopes=["Mail.Read", "User.Read"],
)
+ await provider.register_client(
+ OAuthClientInformationFull(
+ client_id="dummy",
+ client_secret="secret",
+ redirect_uris=[AnyUrl("http://localhost:12345/callback")],
+ )
+ )
+
client = OAuthClientInformationFull(
client_id="dummy",
client_secret="secret",
@@ -267,10 +289,15 @@ async def test_authorize_appends_unprefixed_additional_scopes(self):
url = await provider.authorize(client, params)
+ # Extract transaction ID from consent redirect
parsed = urlparse(url)
qs = parse_qs(parsed.query)
- scope_value = qs.get("scope", [""])[0]
- scope_parts = scope_value.split(" ") if scope_value else []
- assert "api://my-api/read" in scope_parts
- assert "Mail.Read" in scope_parts
- assert "User.Read" in scope_parts
+ assert "txn_id" in qs, "Should redirect to consent page with transaction ID"
+ txn_id = qs["txn_id"][0]
+
+ # Verify transaction contains correct scopes (prefixed + unprefixed additional)
+ transaction = await provider._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert "api://my-api/read" in transaction.scopes
+ assert "Mail.Read" in transaction.scopes
+ assert "User.Read" in transaction.scopes
diff --git a/tests/server/auth/test_oauth_consent_flow.py b/tests/server/auth/test_oauth_consent_flow.py
new file mode 100644
index 000000000..41eff87ca
--- /dev/null
+++ b/tests/server/auth/test_oauth_consent_flow.py
@@ -0,0 +1,657 @@
+"""Tests for OAuth Proxy consent flow with server-side storage.
+
+This test suite verifies:
+1. OAuth transactions are stored in server-side storage (not in-memory)
+2. Authorization codes are stored in server-side storage
+3. Consent flow redirects correctly through /consent endpoint
+4. CSRF protection works with cookies
+5. State persists across storage backends
+6. Security headers (X-Frame-Options) are set correctly
+7. Cookie signing and tampering detection
+8. Auto-approve behavior with valid cookies
+"""
+
+import re
+import secrets
+import time
+from urllib.parse import parse_qs, urlparse
+
+import pytest
+from key_value.aio.stores.memory import MemoryStore
+from mcp.server.auth.provider import AuthorizationParams
+from mcp.shared.auth import OAuthClientInformationFull
+from pydantic import AnyUrl
+from starlette.applications import Starlette
+from starlette.testclient import TestClient
+
+from fastmcp.server.auth.auth import TokenVerifier
+from fastmcp.server.auth.oauth_proxy import OAuthProxy
+
+
+class MockTokenVerifier(TokenVerifier):
+ """Mock token verifier for testing."""
+
+ def __init__(self):
+ self.required_scopes = ["read", "write"]
+
+ async def verify_token(self, token: str):
+ """Mock token verification."""
+ from fastmcp.server.auth.auth import AccessToken
+
+ return AccessToken(
+ token=token,
+ client_id="mock-client",
+ scopes=self.required_scopes,
+ expires_at=int(time.time() + 3600),
+ )
+
+
+class _Verifier(TokenVerifier):
+ """Minimal token verifier for security tests."""
+
+ def __init__(self):
+ self.required_scopes = ["read"]
+
+ async def verify_token(self, token: str):
+ from fastmcp.server.auth.auth import AccessToken
+
+ return AccessToken(
+ token=token, client_id="c", scopes=self.required_scopes, expires_at=None
+ )
+
+
+@pytest.fixture
+def storage():
+ """Create a fresh in-memory storage for each test."""
+ return MemoryStore()
+
+
+@pytest.fixture
+def oauth_proxy_with_storage(storage):
+ """Create OAuth proxy with explicit storage backend."""
+ return OAuthProxy(
+ upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
+ upstream_token_endpoint="https://github.com/login/oauth/access_token",
+ upstream_client_id="test-upstream-client",
+ upstream_client_secret="test-upstream-secret",
+ token_verifier=MockTokenVerifier(),
+ base_url="https://myserver.com",
+ redirect_path="/auth/callback",
+ client_storage=storage, # Use our test storage
+ )
+
+
+@pytest.fixture
+def oauth_proxy_https():
+ """OAuthProxy configured with HTTPS base_url for __Host- cookies."""
+ return OAuthProxy(
+ upstream_authorization_endpoint="https://github.com/login/oauth/authorize",
+ upstream_token_endpoint="https://github.com/login/oauth/access_token",
+ upstream_client_id="client-id",
+ upstream_client_secret="client-secret",
+ token_verifier=_Verifier(),
+ base_url="https://myserver.example",
+ client_storage=MemoryStore(),
+ )
+
+
+async def _start_flow(
+ proxy: OAuthProxy, client_id: str, redirect: str
+) -> tuple[str, str]:
+ """Register client and start auth; returns (txn_id, consent_url)."""
+ await proxy.register_client(
+ OAuthClientInformationFull(
+ client_id=client_id,
+ client_secret="s",
+ redirect_uris=[AnyUrl(redirect)],
+ )
+ )
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl(redirect),
+ redirect_uri_provided_explicitly=True,
+ state="client-state-xyz",
+ code_challenge="challenge",
+ code_challenge_method="S256",
+ scopes=["read"],
+ )
+ consent_url = await proxy.authorize(
+ OAuthClientInformationFull(
+ client_id=client_id,
+ client_secret="s",
+ redirect_uris=[AnyUrl(redirect)],
+ ),
+ params,
+ )
+ qs = parse_qs(urlparse(consent_url).query)
+ return qs["txn_id"][0], consent_url
+
+
+def _extract_csrf(html: str) -> str | None:
+ """Extract CSRF token from HTML form."""
+ m = re.search(r"name=\"csrf_token\"\s+value=\"([^\"]+)\"", html)
+ return m.group(1) if m else None
+
+
+class TestServerSideStorage:
+ """Tests verifying OAuth state is stored in AsyncKeyValue storage."""
+
+ async def test_transaction_stored_in_storage_not_memory(
+ self, oauth_proxy_with_storage, storage
+ ):
+ """Verify OAuth transactions are stored in AsyncKeyValue, not in-memory dict."""
+ # Register client
+ client = OAuthClientInformationFull(
+ client_id="test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:54321/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ # Start authorization flow
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:54321/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="client-state-123",
+ code_challenge="challenge-abc",
+ code_challenge_method="S256",
+ scopes=["read", "write"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+
+ # Extract transaction ID from consent redirect
+ parsed = urlparse(redirect_url)
+ assert "/consent" in parsed.path, "Should redirect to consent page"
+
+ query_params = parse_qs(parsed.query)
+ txn_id = query_params["txn_id"][0]
+
+ # Verify transaction is NOT in the old in-memory dict
+ # (the attribute should not exist or should be empty)
+ assert (
+ not hasattr(oauth_proxy_with_storage, "_oauth_transactions")
+ or len(getattr(oauth_proxy_with_storage, "_oauth_transactions", {})) == 0
+ )
+
+ # Verify transaction IS in storage backend
+ transaction = await storage.get(collection="mcp-oauth-transactions", key=txn_id)
+ assert transaction is not None, "Transaction should be in storage"
+
+ # Verify transaction has expected structure
+ assert transaction["client_id"] == "test-client"
+ assert transaction["client_redirect_uri"] == "http://localhost:54321/callback"
+ assert transaction["client_state"] == "client-state-123"
+ assert transaction["code_challenge"] == "challenge-abc"
+ assert transaction["scopes"] == ["read", "write"]
+
+ async def test_authorization_code_stored_in_storage(
+ self, oauth_proxy_with_storage, storage
+ ):
+ """Verify authorization codes are stored in AsyncKeyValue storage."""
+ # Register client
+ client = OAuthClientInformationFull(
+ client_id="test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:54321/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ # Create a test app with OAuth routes
+ app = Starlette(routes=oauth_proxy_with_storage.get_routes())
+
+ with TestClient(app) as test_client:
+ # Start authorization flow
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:54321/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="client-state",
+ code_challenge="challenge-xyz",
+ code_challenge_method="S256",
+ scopes=["read"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+
+ # Extract txn_id from consent redirect
+ parsed = urlparse(redirect_url)
+ query_params = parse_qs(parsed.query)
+ txn_id = query_params["txn_id"][0]
+
+ # Simulate consent approval
+ # First, get the consent page to establish CSRF cookie
+ consent_response = test_client.get(
+ f"/consent?txn_id={txn_id}", follow_redirects=False
+ )
+
+ # Extract CSRF token from response (it's in the HTML form)
+ csrf_token = None
+ if consent_response.status_code == 200:
+ # For this test, we'll generate a CSRF token manually
+ # In production, this comes from the consent page HTML
+ csrf_token = secrets.token_urlsafe(32)
+
+ # Approve consent with CSRF token
+ approval_response = test_client.post(
+ "/consent",
+ data={"action": "approve", "txn": txn_id, "csrf_token": csrf_token},
+ cookies=consent_response.cookies,
+ follow_redirects=False,
+ )
+
+ # After approval, authorization code should be in storage
+ # The code is returned in the redirect URL
+ if approval_response.status_code in (302, 303):
+ location = approval_response.headers.get("location", "")
+ callback_params = parse_qs(urlparse(location).query)
+
+ if "code" in callback_params:
+ auth_code = callback_params["code"][0]
+
+ # Verify code is NOT in old in-memory dict
+ assert (
+ not hasattr(oauth_proxy_with_storage, "_client_codes")
+ or len(getattr(oauth_proxy_with_storage, "_client_codes", {}))
+ == 0
+ )
+
+ # Verify code IS in storage
+ code_data = await storage.get(
+ collection="mcp-authorization-codes", key=auth_code
+ )
+ assert code_data is not None, (
+ "Authorization code should be in storage"
+ )
+ assert code_data["client_id"] == "test-client"
+ assert code_data["scopes"] == ["read"]
+
+ async def test_storage_collections_are_isolated(self, oauth_proxy_with_storage):
+ """Verify that transactions, codes, and clients use separate collections."""
+ # Register a client
+ client = OAuthClientInformationFull(
+ client_id="isolation-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:12345/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ # Start authorization to create transaction
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:12345/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="test-state",
+ code_challenge="test-challenge",
+ code_challenge_method="S256",
+ scopes=["read"],
+ )
+
+ await oauth_proxy_with_storage.authorize(client, params)
+
+ # Get all collections from storage
+ storage = oauth_proxy_with_storage._client_storage
+
+ # Verify client is in client collection
+ client_data = await storage.get(
+ collection="mcp-oauth-proxy-clients", key="isolation-test-client"
+ )
+ assert client_data is not None
+
+ # Verify we can list transactions separately
+ # (This tests that collections are properly namespaced)
+ transactions = await storage.keys(collection="mcp-oauth-transactions")
+
+ assert len(transactions) > 0, "Should have at least one transaction"
+
+ # Verify transaction keys don't collide with client keys
+ for txn_key in transactions:
+ assert txn_key != "isolation-test-client"
+
+
+class TestConsentFlowRedirects:
+ """Tests for consent flow redirect behavior."""
+
+ async def test_authorize_redirects_to_consent_page(self, oauth_proxy_with_storage):
+ """Verify authorize() redirects to /consent instead of upstream."""
+ client = OAuthClientInformationFull(
+ client_id="consent-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:8080/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:8080/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="test-state",
+ code_challenge="",
+ scopes=["read"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+
+ # Should redirect to consent page, not upstream
+ assert "/consent" in redirect_url
+ assert "github.com" not in redirect_url
+ assert "?txn_id=" in redirect_url
+
+ async def test_consent_page_contains_transaction_id(self, oauth_proxy_with_storage):
+ """Verify consent page receives and displays transaction ID."""
+ client = OAuthClientInformationFull(
+ client_id="txn-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:9090/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:9090/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="test-state",
+ code_challenge="test-challenge",
+ scopes=["read", "write"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+
+ # Extract txn_id parameter
+ parsed = urlparse(redirect_url)
+ query = parse_qs(parsed.query)
+
+ assert "txn_id" in query
+ txn_id = query["txn_id"][0]
+ assert len(txn_id) > 0
+
+ # Create test client
+ app = Starlette(routes=oauth_proxy_with_storage.get_routes())
+
+ with TestClient(app) as test_client:
+ # Request consent page
+ response = test_client.get(
+ f"/consent?txn_id={txn_id}", follow_redirects=False
+ )
+
+ assert response.status_code == 200
+ # Consent page should contain transaction reference
+ assert txn_id.encode() in response.content or b"consent" in response.content
+
+
+class TestCSRFProtection:
+ """Tests for CSRF protection in consent flow."""
+
+ async def test_consent_requires_csrf_token(self, oauth_proxy_with_storage):
+ """Verify consent submission requires valid CSRF token."""
+ client = OAuthClientInformationFull(
+ client_id="csrf-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:7070/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:7070/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="test-state",
+ code_challenge="",
+ scopes=["read"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+ parsed = urlparse(redirect_url)
+ query = parse_qs(parsed.query)
+ txn_id = query["txn_id"][0]
+
+ app = Starlette(routes=oauth_proxy_with_storage.get_routes())
+
+ with TestClient(app) as test_client:
+ # Try to submit consent WITHOUT CSRF token
+ response = test_client.post(
+ "/consent/submit",
+ data={"action": "approve", "txn_id": txn_id},
+ # No CSRF token!
+ follow_redirects=False,
+ )
+
+ # Should reject or require CSRF
+ # (Implementation may vary - checking for error response)
+ assert response.status_code in (
+ 400,
+ 403,
+ 302,
+ ) # Error or redirect to error
+
+ async def test_consent_cookie_established_on_page_visit(
+ self, oauth_proxy_with_storage
+ ):
+ """Verify consent page establishes CSRF cookie."""
+ client = OAuthClientInformationFull(
+ client_id="cookie-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:6060/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:6060/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="test-state",
+ code_challenge="",
+ scopes=["read"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+ parsed = urlparse(redirect_url)
+ query = parse_qs(parsed.query)
+ txn_id = query["txn_id"][0]
+
+ app = Starlette(routes=oauth_proxy_with_storage.get_routes())
+
+ with TestClient(app) as test_client:
+ # Visit consent page
+ response = test_client.get(
+ f"/consent?txn_id={txn_id}", follow_redirects=False
+ )
+
+ # Should set cookies for CSRF protection
+ assert response.status_code == 200
+ # Cookie may be set via Set-Cookie header
+ cookies = response.cookies
+ # Look for any CSRF-related cookie (implementation dependent)
+ assert len(cookies) > 0 or "csrf" in response.text.lower(), (
+ "Consent page should establish CSRF protection"
+ )
+
+
+class TestStoragePersistence:
+ """Tests for state persistence across storage backends."""
+
+ async def test_transaction_persists_after_retrieval(self, oauth_proxy_with_storage):
+ """Verify transaction can be retrieved multiple times (until deleted)."""
+ client = OAuthClientInformationFull(
+ client_id="persist-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:5050/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:5050/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="persist-state",
+ code_challenge="persist-challenge",
+ code_challenge_method="S256",
+ scopes=["read"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+ parsed = urlparse(redirect_url)
+ query = parse_qs(parsed.query)
+ txn_id = query["txn_id"][0]
+
+ storage = oauth_proxy_with_storage._client_storage
+
+ # Retrieve transaction multiple times
+ txn1 = await storage.get(collection="mcp-oauth-transactions", key=txn_id)
+ assert txn1 is not None
+
+ txn2 = await storage.get(collection="mcp-oauth-transactions", key=txn_id)
+ assert txn2 is not None
+
+ # Should be the same data
+ assert txn1["client_id"] == txn2["client_id"]
+ assert txn1["client_state"] == txn2["client_state"]
+
+ async def test_storage_uses_pydantic_adapter(self, oauth_proxy_with_storage):
+ """Verify that PydanticAdapter serializes/deserializes correctly."""
+ from fastmcp.server.auth.oauth_proxy import OAuthTransaction
+
+ client = OAuthClientInformationFull(
+ client_id="pydantic-test-client",
+ client_secret="test-secret",
+ redirect_uris=[AnyUrl("http://localhost:4040/callback")],
+ )
+ await oauth_proxy_with_storage.register_client(client)
+
+ params = AuthorizationParams(
+ redirect_uri=AnyUrl("http://localhost:4040/callback"),
+ redirect_uri_provided_explicitly=True,
+ state="pydantic-state",
+ code_challenge="pydantic-challenge",
+ code_challenge_method="S256",
+ scopes=["read", "write"],
+ )
+
+ redirect_url = await oauth_proxy_with_storage.authorize(client, params)
+ parsed = urlparse(redirect_url)
+ query = parse_qs(parsed.query)
+ txn_id = query["txn_id"][0]
+
+ # Retrieve using PydanticAdapter (which is what the proxy uses)
+ transaction_store = oauth_proxy_with_storage._transaction_store
+ txn_model = await transaction_store.get(key=txn_id)
+
+ # Should be a Pydantic model instance
+ assert isinstance(txn_model, OAuthTransaction)
+ assert txn_model.client_id == "pydantic-test-client"
+ assert txn_model.client_state == "pydantic-state"
+ assert txn_model.code_challenge == "pydantic-challenge"
+ assert txn_model.scopes == ["read", "write"]
+
+
+class TestConsentSecurity:
+ """Tests for consent page security features."""
+
+ async def test_consent_sets_xfo_header(self, oauth_proxy_https):
+ """Verify consent page sets X-Frame-Options header to prevent clickjacking."""
+ txn_id, _ = await _start_flow(
+ oauth_proxy_https, "client-a", "http://localhost:5001/callback"
+ )
+ app = Starlette(routes=oauth_proxy_https.get_routes())
+ with TestClient(app) as c:
+ r = c.get(f"/consent?txn_id={txn_id}")
+ assert r.status_code == 200
+ assert r.headers.get("X-Frame-Options") == "DENY"
+
+ async def test_deny_sets_cookie_and_redirects_with_error(self, oauth_proxy_https):
+ """Verify denying consent sets signed cookie and redirects with error."""
+ client_redirect = "http://localhost:5002/callback"
+ txn_id, _ = await _start_flow(oauth_proxy_https, "client-b", client_redirect)
+ app = Starlette(routes=oauth_proxy_https.get_routes())
+ with TestClient(app) as c:
+ consent = c.get(f"/consent?txn_id={txn_id}")
+ csrf = _extract_csrf(consent.text)
+ assert csrf
+ # Persist consent page cookies on client instance to avoid per-request deprecation
+ for k, v in consent.cookies.items():
+ c.cookies.set(k, v)
+ r = c.post(
+ "/consent/submit",
+ data={"action": "deny", "txn_id": txn_id, "csrf_token": csrf},
+ follow_redirects=False,
+ )
+ assert r.status_code in (302, 303)
+ loc = r.headers.get("location", "")
+ parsed = urlparse(loc)
+ assert parsed.scheme == "http" and parsed.netloc.startswith("localhost")
+ q = parse_qs(parsed.query)
+ assert q.get("error") == ["access_denied"]
+ assert q.get("state") == ["client-state-xyz"]
+ # Signed denied cookie should be set
+ assert "MCP_DENIED_CLIENTS" in ";\n".join(
+ r.headers.get("set-cookie", "").splitlines()
+ )
+
+ async def test_approve_sets_cookie_and_redirects_to_upstream(
+ self, oauth_proxy_https
+ ):
+ """Verify approving consent sets signed cookie and redirects to upstream."""
+ txn_id, _ = await _start_flow(
+ oauth_proxy_https, "client-c", "http://localhost:5003/callback"
+ )
+ app = Starlette(routes=oauth_proxy_https.get_routes())
+ with TestClient(app) as c:
+ consent = c.get(f"/consent?txn_id={txn_id}")
+ csrf = _extract_csrf(consent.text)
+ assert csrf
+ for k, v in consent.cookies.items():
+ c.cookies.set(k, v)
+ r = c.post(
+ "/consent/submit",
+ data={"action": "approve", "txn_id": txn_id, "csrf_token": csrf},
+ follow_redirects=False,
+ )
+ assert r.status_code in (302, 303)
+ loc = r.headers.get("location", "")
+ assert loc.startswith("https://github.com/login/oauth/authorize")
+ assert f"state={txn_id}" in loc
+ # Signed approved cookie should be set with __Host- prefix for HTTPS
+ set_cookie = ";\n".join(r.headers.get("set-cookie", "").splitlines())
+ assert "__Host-MCP_APPROVED_CLIENTS" in set_cookie
+
+ async def test_tampered_cookie_is_ignored(self, oauth_proxy_https):
+ """Verify tampered approval cookie is ignored and consent page shown."""
+ txn_id, _ = await _start_flow(
+ oauth_proxy_https, "client-d", "http://localhost:5004/callback"
+ )
+ app = Starlette(routes=oauth_proxy_https.get_routes())
+ with TestClient(app) as c:
+ # Create a tampered cookie (invalid signature)
+ # Value format: payload.signature; using wrong signature to force failure
+ tampered_value = "W10=.invalidsig"
+ c.cookies.set("__Host-MCP_APPROVED_CLIENTS", tampered_value)
+ r = c.get(f"/consent?txn_id={txn_id}", follow_redirects=False)
+ # Should not auto-redirect to upstream; should show consent page
+ assert r.status_code == 200
+ # httpx returns a URL object; compare path or stringify
+ assert urlparse(str(r.request.url)).path == "/consent"
+
+ async def test_autoapprove_cookie_skips_consent(self, oauth_proxy_https):
+ """Verify valid approval cookie auto-approves and redirects to upstream."""
+ client_id = "client-e"
+ redirect = "http://localhost:5005/callback"
+ txn_id, _ = await _start_flow(oauth_proxy_https, client_id, redirect)
+ app = Starlette(routes=oauth_proxy_https.get_routes())
+ with TestClient(app) as c:
+ # Approve once to set approved cookie
+ consent = c.get(f"/consent?txn_id={txn_id}")
+ csrf = _extract_csrf(consent.text)
+ for k, v in consent.cookies.items():
+ c.cookies.set(k, v)
+ r = c.post(
+ "/consent/submit",
+ data={"action": "approve", "txn_id": txn_id, "csrf_token": csrf},
+ follow_redirects=False,
+ )
+ # Extract approved cookie value
+ set_cookie = ";\n".join(r.headers.get("set-cookie", "").splitlines())
+ m = re.search(r"__Host-MCP_APPROVED_CLIENTS=([^;]+)", set_cookie)
+ assert m, "approved cookie should be set"
+ approved_cookie = m.group(1)
+
+ # Start a new flow for the same client and redirect
+ new_txn, _ = await _start_flow(oauth_proxy_https, client_id, redirect)
+ # Should auto-redirect to upstream when visiting consent due to cookie
+ c.cookies.set("__Host-MCP_APPROVED_CLIENTS", approved_cookie)
+ r2 = c.get(f"/consent?txn_id={new_txn}", follow_redirects=False)
+ assert r2.status_code in (302, 303)
+ assert r2.headers.get("location", "").startswith(
+ "https://github.com/login/oauth/authorize"
+ )
diff --git a/tests/server/auth/test_oauth_proxy.py b/tests/server/auth/test_oauth_proxy.py
index c171547eb..13ceb5185 100644
--- a/tests/server/auth/test_oauth_proxy.py
+++ b/tests/server/auth/test_oauth_proxy.py
@@ -441,13 +441,16 @@ class TestOAuthProxyAuthorization:
"""Tests for OAuth proxy authorization flow."""
async def test_authorize_creates_transaction(self, oauth_proxy):
- """Test that authorize creates transaction and returns upstream URL."""
+ """Test that authorize creates transaction and redirects to consent."""
client = OAuthClientInformationFull(
client_id="test-client",
client_secret="test-secret",
redirect_uris=[AnyUrl("http://localhost:54321/callback")],
)
+ # Register client first (required for consent flow)
+ await oauth_proxy.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:54321/callback"),
redirect_uri_provided_explicitly=True,
@@ -463,18 +466,18 @@ async def test_authorize_creates_transaction(self, oauth_proxy):
parsed = urlparse(redirect_url)
query_params = parse_qs(parsed.query)
- # Verify upstream URL structure
- assert "github.com/login/oauth/authorize" in redirect_url
- assert query_params["client_id"][0] == "test-client-id"
- assert query_params["response_type"][0] == "code"
- assert "state" in query_params # Transaction ID
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
- # Verify transaction was stored
- txn_id = query_params["state"][0]
- assert txn_id in oauth_proxy._oauth_transactions
- transaction = oauth_proxy._oauth_transactions[txn_id]
- assert transaction["client_id"] == "test-client"
- assert transaction["code_challenge"] == "challenge-abc"
+ # Verify transaction was stored with correct data
+ txn_id = query_params["txn_id"][0]
+ transaction = await oauth_proxy._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert transaction.client_id == "test-client"
+ assert transaction.code_challenge == "challenge-abc"
+ assert transaction.client_state == "client-state-123"
+ assert transaction.scopes == ["read", "write"]
class TestOAuthProxyPKCE:
@@ -512,6 +515,9 @@ async def test_pkce_forwarding_enabled(self, proxy_with_pkce):
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy_with_pkce.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -523,16 +529,19 @@ async def test_pkce_forwarding_enabled(self, proxy_with_pkce):
redirect_url = await proxy_with_pkce.authorize(client, params)
query_params = parse_qs(urlparse(redirect_url).query)
- # Proxy should forward its own PKCE
- assert "code_challenge" in query_params
- assert query_params["code_challenge"][0] != "client_challenge"
- assert query_params["code_challenge_method"] == ["S256"]
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
# Transaction should store both challenges
- txn_id = query_params["state"][0]
- transaction = proxy_with_pkce._oauth_transactions[txn_id]
- assert transaction["code_challenge"] == "client_challenge" # Client's
- assert "proxy_code_verifier" in transaction # Proxy's verifier
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy_with_pkce._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert transaction.code_challenge == "client_challenge" # Client's
+ assert transaction.proxy_code_verifier is not None # Proxy's verifier
+ # Proxy code challenge is computed from verifier when building upstream URL
+ # Just verify the verifier exists and is different from client's challenge
+ assert len(transaction.proxy_code_verifier) > 0
async def test_pkce_forwarding_disabled(self, proxy_without_pkce):
"""Test that PKCE is not forwarded when disabled."""
@@ -542,6 +551,9 @@ async def test_pkce_forwarding_disabled(self, proxy_without_pkce):
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy_without_pkce.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -553,15 +565,16 @@ async def test_pkce_forwarding_disabled(self, proxy_without_pkce):
redirect_url = await proxy_without_pkce.authorize(client, params)
query_params = parse_qs(urlparse(redirect_url).query)
- # No PKCE forwarded to upstream
- assert "code_challenge" not in query_params
- assert "code_challenge_method" not in query_params
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
- # Client's challenge still stored
- txn_id = query_params["state"][0]
- transaction = proxy_without_pkce._oauth_transactions[txn_id]
- assert transaction["code_challenge"] == "client_challenge"
- assert "proxy_code_verifier" not in transaction
+ # Client's challenge still stored, but no proxy PKCE
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy_without_pkce._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert transaction.code_challenge == "client_challenge"
+ assert transaction.proxy_code_verifier is None # No proxy PKCE when disabled
class TestOAuthProxyTokenEndpointAuth:
@@ -682,6 +695,9 @@ def protected_tool() -> str:
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy.register_client(client_info)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -690,28 +706,22 @@ def protected_tool() -> str:
scopes=["read"],
)
- # Get authorization URL
+ # Get authorization URL (now returns consent redirect)
auth_url = await proxy.authorize(client_info, params)
- # Verify mock provider was called
- assert mock_oauth_provider.authorize_endpoint in auth_url
-
- # Verify state is present (transaction ID)
+ # Should redirect to consent page
+ assert "/consent" in auth_url
query_params = parse_qs(urlparse(auth_url).query)
- assert "state" in query_params
-
- # Simulate authorization callback
- async with httpx.AsyncClient() as http_client:
- # This would normally redirect, but our mock returns the code
- response = await http_client.get(auth_url, follow_redirects=False)
+ assert "txn_id" in query_params
- # Extract code from redirect location
- location = response.headers.get("location", "")
- callback_params = parse_qs(urlparse(location).query)
- auth_code = callback_params.get("code", [None])[0]
-
- assert auth_code is not None
- assert mock_oauth_provider.authorize_called
+ # Verify transaction was created with correct configuration
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert transaction.client_id == "test-client"
+ assert transaction.scopes == ["read"]
+ # Transaction ID itself is used as upstream state parameter
+ assert transaction.txn_id == txn_id
@pytest.mark.asyncio
async def test_token_refresh_with_mock_provider(self, mock_oauth_provider):
@@ -790,6 +800,9 @@ async def test_pkce_validation_with_mock_provider(self, mock_oauth_provider):
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -803,14 +816,20 @@ async def test_pkce_validation_with_mock_provider(self, mock_oauth_provider):
auth_url = await proxy.authorize(client, params)
query_params = parse_qs(urlparse(auth_url).query)
- # Verify PKCE was forwarded (proxy's challenge, not client's)
- assert "code_challenge" in query_params
- assert query_params["code_challenge"][0] != "client_challenge_value"
+ # Should redirect to consent page
+ assert "/consent" in auth_url
+ assert "txn_id" in query_params
- # Transaction should have proxy's verifier
- txn_id = query_params["state"][0]
- transaction = proxy._oauth_transactions[txn_id]
- assert "proxy_code_verifier" in transaction
+ # Transaction should have proxy's PKCE verifier (different from client's)
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert (
+ transaction.code_challenge == "client_challenge_value"
+ ) # Client's challenge
+ assert transaction.proxy_code_verifier is not None # Proxy generated its own
+ # Proxy code challenge is computed from verifier when needed
+ assert len(transaction.proxy_code_verifier) > 0
class TestParameterForwarding:
@@ -850,6 +869,9 @@ async def test_resource_parameter_forwarding(self, proxy_without_extra_params):
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy_without_extra_params.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -862,9 +884,17 @@ async def test_resource_parameter_forwarding(self, proxy_without_extra_params):
redirect_url = await proxy_without_extra_params.authorize(client, params)
query_params = parse_qs(urlparse(redirect_url).query)
- # Resource parameter should be forwarded to upstream
- assert "resource" in query_params
- assert query_params["resource"][0] == "https://api.example.com/v1"
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
+
+ # Resource parameter should be stored in transaction for upstream forwarding
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy_without_extra_params._transaction_store.get(
+ key=txn_id
+ )
+ assert transaction is not None
+ assert transaction.resource == "https://api.example.com/v1"
async def test_extra_authorize_params(self, proxy_with_extra_params):
"""Test that extra authorization parameters are included."""
@@ -874,6 +904,9 @@ async def test_extra_authorize_params(self, proxy_with_extra_params):
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy_with_extra_params.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -885,9 +918,19 @@ async def test_extra_authorize_params(self, proxy_with_extra_params):
redirect_url = await proxy_with_extra_params.authorize(client, params)
query_params = parse_qs(urlparse(redirect_url).query)
- # Extra audience parameter should be included
- assert "audience" in query_params
- assert query_params["audience"][0] == "https://api.example.com"
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
+
+ # Extra audience parameter is configured at proxy level (not per-transaction)
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy_with_extra_params._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ # Verify proxy has extra params configured
+ assert (
+ proxy_with_extra_params._extra_authorize_params.get("audience")
+ == "https://api.example.com"
+ )
async def test_resource_and_extra_params_together(self, proxy_with_extra_params):
"""Test that both resource and extra params can be used together."""
@@ -897,6 +940,9 @@ async def test_resource_and_extra_params_together(self, proxy_with_extra_params)
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy_with_extra_params.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -909,11 +955,19 @@ async def test_resource_and_extra_params_together(self, proxy_with_extra_params)
redirect_url = await proxy_with_extra_params.authorize(client, params)
query_params = parse_qs(urlparse(redirect_url).query)
- # Both resource and audience should be present
- assert "resource" in query_params
- assert query_params["resource"][0] == "https://resource.example.com"
- assert "audience" in query_params
- assert query_params["audience"][0] == "https://api.example.com"
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
+
+ # Resource stored in transaction, extra params configured at proxy level
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy_with_extra_params._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ assert transaction.resource == "https://resource.example.com"
+ assert (
+ proxy_with_extra_params._extra_authorize_params.get("audience")
+ == "https://api.example.com"
+ )
async def test_no_extra_params_when_not_configured(
self, proxy_without_extra_params
@@ -964,6 +1018,9 @@ async def test_multiple_extra_params(self, jwt_verifier):
redirect_uris=[AnyUrl("http://localhost:12345/callback")],
)
+ # Register client first
+ await proxy.register_client(client)
+
params = AuthorizationParams(
redirect_uri=AnyUrl("http://localhost:12345/callback"),
redirect_uri_provided_explicitly=True,
@@ -975,10 +1032,20 @@ async def test_multiple_extra_params(self, jwt_verifier):
redirect_url = await proxy.authorize(client, params)
query_params = parse_qs(urlparse(redirect_url).query)
- # All extra parameters should be included
- assert query_params["audience"][0] == "https://api.example.com"
- assert query_params["prompt"][0] == "consent"
- assert query_params["max_age"][0] == "3600"
+ # Should redirect to consent page
+ assert "/consent" in redirect_url
+ assert "txn_id" in query_params
+
+ # All extra parameters configured at proxy level
+ txn_id = query_params["txn_id"][0]
+ transaction = await proxy._transaction_store.get(key=txn_id)
+ assert transaction is not None
+ # Verify proxy has all extra params configured
+ assert (
+ proxy._extra_authorize_params.get("audience") == "https://api.example.com"
+ )
+ assert proxy._extra_authorize_params.get("prompt") == "consent"
+ assert proxy._extra_authorize_params.get("max_age") == "3600"
@pytest.mark.asyncio
async def test_token_endpoint_invalid_client_error(self, jwt_verifier):