Implements a LiteLLM guardrail plugin that integrates with Rubrik's security service. Supports both OpenAI and Anthropic formats in streaming and non-streaming modes with fail-open error handling. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from fa8f57c to 36e738f.
Greptile Summary
This PR adds the Rubrik LiteLLM plugin. Key design points:
Issues found:
Confidence Score: 4/5
| Filename | Overview |
|---|---|
| litellm/integrations/rubrik.py | Core plugin implementation (1139 lines). Well-structured with fail-open behavior throughout; all issues from previous review (#22935) addressed. Two new minor issues: _detect_llm_response_format is unguarded against None proxy_server_request in the streaming hook (unlike the non-streaming hook), and buffered_choice becomes a stale reference after deep copy in _replay_filtered_tool_chunks. |
| litellm/proxy/guardrails/guardrail_hooks/rubrik/init.py | Guardrail hook initializer; registers RubrikLogger in guardrail_initializer_registry and guardrail_class_registry. Minimal and correct. |
| litellm/types/guardrails.py | Adds RUBRIK = "rubrik" to SupportedGuardrailIntegrations enum. Single-line, correct change. |
| tests/test_litellm/integrations/test_rubrik_openai.py | 28 tests covering initialization, URL/sampling config, batch logging, OpenAI non-streaming blocking, streaming blocking (all/partial/GPT-5 style), and fail-open recovery. All network calls are mocked; real ModelResponse fixtures used for the model_validate path. No real network calls detected. |
| tests/test_litellm/integrations/test_rubrik_anthropic.py | 14 tests for Anthropic non-streaming and streaming tool blocking, format detection, and pass-through for text-only responses. All mocked; SSE fixture files exercised via real file reads. |
Sequence Diagram
sequenceDiagram
participant Client
participant LiteLLM Proxy
participant RubrikLogger
participant LLM API
participant Rubrik Blocking Service
participant Rubrik Logging Service
Client->>LiteLLM Proxy: POST /v1/chat/completions (or /v1/messages)
LiteLLM Proxy->>LLM API: Forward request
LLM API-->>LiteLLM Proxy: Response (streaming or non-streaming)
alt Non-streaming (async_post_call_success_hook)
LiteLLM Proxy->>RubrikLogger: async_post_call_success_hook(response)
RubrikLogger->>RubrikLogger: _detect_llm_response_format()
alt OpenAI format with tool_calls
RubrikLogger->>Rubrik Blocking Service: POST /v1/after_completion/openai/v1
Rubrik Blocking Service-->>RubrikLogger: Filtered response (allowed tools only)
RubrikLogger->>RubrikLogger: ModelResponse.model_validate(modified_dict)
else Anthropic format with tool_use blocks
RubrikLogger->>RubrikLogger: _anthropic_response_to_openai_dict()
RubrikLogger->>Rubrik Blocking Service: POST /v1/after_completion/openai/v1
Rubrik Blocking Service-->>RubrikLogger: Filtered OpenAI dict
RubrikLogger->>RubrikLogger: _openai_dict_to_anthropic_response()
else Text-only or unknown format
RubrikLogger-->>LiteLLM Proxy: Return original response unchanged
end
RubrikLogger-->>LiteLLM Proxy: Modified response
else Streaming (async_post_call_streaming_iterator_hook)
LiteLLM Proxy->>RubrikLogger: async_post_call_streaming_iterator_hook(stream)
loop For each chunk in stream
alt Pre-tool chunks (no tool_calls yet)
RubrikLogger->>Client: Yield chunk immediately
else Tool-call chunk
RubrikLogger->>RubrikLogger: Buffer + accumulate tool call deltas
else Finish chunk (finish_reason set)
RubrikLogger->>RubrikLogger: Append to buffer
RubrikLogger->>Rubrik Blocking Service: POST /v1/after_completion/openai/v1
Rubrik Blocking Service-->>RubrikLogger: Allowed tool list + explanation
alt All tools allowed
RubrikLogger->>Client: Replay buffered chunks unchanged
else Some/all tools blocked
RubrikLogger->>Client: Replay filtered chunks + explanation + finish
end
end
end
end
Note over RubrikLogger,Rubrik Logging Service: Async batch logging (sampled)
RubrikLogger->>RubrikLogger: async_log_success_event → log_queue
RubrikLogger->>Rubrik Logging Service: POST /v1/litellm/batch (when queue full or periodic flush)
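The sampled batch-logging path at the bottom of the diagram can be sketched as follows. This is a minimal illustration only — the class name, method names, and in-memory "sent batches" list are hypothetical stand-ins; the real plugin inherits queueing and periodic flushing from CustomBatchLogger and POSTs to /v1/litellm/batch.

```python
import random

class BatchLogSketch:
    """Hypothetical sketch of sampled, size-triggered batch logging."""

    def __init__(self, batch_size: int = 512, sampling_rate: float = 1.0):
        self.batch_size = batch_size
        self.sampling_rate = sampling_rate
        self.log_queue: list[dict] = []
        self.sent_batches: list[list[dict]] = []  # stands in for POST /v1/litellm/batch

    def log_event(self, payload: dict) -> None:
        # Sampling: keep only the configured fraction of events
        if random.random() >= self.sampling_rate:
            return
        self.log_queue.append(payload)
        # Flush when the queue reaches the batch size (periodic flush omitted here)
        if len(self.log_queue) >= self.batch_size:
            self.sent_batches.append(self.log_queue)
            self.log_queue = []

logger = BatchLogSketch(batch_size=2, sampling_rate=1.0)
for i in range(5):
    logger.log_event({"id": i})
print(len(logger.sent_batches), len(logger.log_queue))  # 2 batches sent, 1 event pending
```

With sampling_rate=1.0 every event is kept, so five events with batch_size=2 produce two full batches and one pending event.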
Comments Outside Diff (2)
- litellm/integrations/rubrik.py, line 360 (link)
Unguarded AttributeError in streaming hook if proxy_server_request is None

_detect_llm_response_format performs:

proxy_request = request_data.get("proxy_server_request", {})
url = proxy_request.get("url", "")

dict.get(key, default) only returns the default when the key is absent. If request_data is {"proxy_server_request": None} (key present, value None), proxy_request is None and the chained .get("url", "") raises AttributeError.

In async_post_call_success_hook this is harmless — the outer try/except Exception catches it and returns the original response unchanged (fail-open). However, async_post_call_streaming_iterator_hook calls _detect_llm_response_format with no surrounding exception handler, so the AttributeError would propagate out of the generator and abort the streaming response entirely rather than failing open.

A minimal fix is to guard against None. Or wrap the call in the streaming hook:

try:
    response_format = self._detect_llm_response_format(request_data)
except Exception:
    verbose_logger.warning("Could not detect LLM response format — passing stream through")
    async for chunk in response:
        yield chunk
    return
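The None-guard variant can be sketched in isolation; this is an illustrative snippet, not the plugin's actual code, demonstrating why `dict.get`'s default does not protect against an explicit None value:

```python
# dict.get's default only applies when the key is absent, not when its value is None
request_data = {"proxy_server_request": None}

# "or {}" coerces an explicit None back to an empty dict before chaining .get()
proxy_request = request_data.get("proxy_server_request") or {}
url = proxy_request.get("url", "")
print(repr(url))  # → ''
```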
- litellm/integrations/rubrik.py, line 499-530 (link)
Stale buffered_choice reference after deep copy

buffered_choice is assigned once at line 500 from the original buffered_chunk.choices[0]. When the code takes the GPT-5 all-blocked path at lines 513-514:

buffered_chunk = buffered_chunk.model_copy(deep=True)
buffered_chunk.choices[0].delta.tool_calls = None

buffered_chunk is now a new object, but buffered_choice still refers to the old choices[0]. The subsequent check at line 528 reads buffered_choice.finish_reason from the stale reference.

In practice this produces the correct result today — a deep copy preserves finish_reason unchanged, so both the stale and fresh references agree — but it creates a maintenance hazard. If a future change sets finish_reason on the deep copy before line 528, the stale reference would silently disagree.

Consider refreshing the alias after the copy, or using buffered_chunk.choices[0].finish_reason consistently throughout the function:

if not filtered_calls:
    if not buffered_choice.finish_reason:
        continue
    buffered_chunk = buffered_chunk.model_copy(deep=True)
    buffered_choice = buffered_chunk.choices[0]  # refresh after copy
    buffered_choice.delta.tool_calls = None
Last reviewed commit: "Apply Black formatti..."
…remove alias - Add early-return guard for Anthropic text-only responses (skip blocking service) - Remove dead _send_batch method (parent calls async_send_batch directly) - Remove unnecessary _REPLACED_TYPES local alias Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
)

self.log_queue = []
asyncio.create_task(self.periodic_flush())
asyncio.create_task in synchronous __init__ requires a running event loop
asyncio.create_task(self.periodic_flush()) is called inside __init__, but __init__ is a synchronous method. In Python 3.10+, asyncio.create_task() raises RuntimeError: no running event loop if there is no currently-running event loop.
The initialize_guardrail function in litellm/proxy/guardrails/guardrail_hooks/rubrik/__init__.py is a plain synchronous function — it has no async def signature. Depending on when LiteLLM calls it during the proxy startup lifecycle, there may be no running event loop at that point. The tests work around this by patching asyncio.create_task with a Mock(), which confirms the fragility.
A safer pattern used by other LiteLLM integrations is to start the periodic flush lazily on first use (e.g., from inside an async callback), or to ensure construction only happens inside an already-running event loop (e.g., via an async factory method or a __post_init__ async hook).
# Instead of calling in __init__:
asyncio.create_task(self.periodic_flush())

# Consider lazily initialising in the first async call, e.g.:
async def _ensure_flush_task_started(self):
    if not hasattr(self, "_flush_task") or self._flush_task.done():
        self._flush_task = asyncio.create_task(self.periodic_flush())
This is the established pattern across LiteLLM integrations — at least 15 other CustomBatchLogger subclasses call asyncio.create_task(self.periodic_flush()) in __init__ the same way (langsmith, datadog, s3_v2, gcs_bucket, slack_alerting, sqs, argilla, opik, azure_storage, azure_sentinel, posthog, generic_api_callback, etc.).
In practice, the proxy constructs these loggers during startup after the event loop is already running (FastAPI's lifespan), so the create_task call succeeds. Diverging from the established pattern for this one integration would be inconsistent without a broader refactor of all batch loggers.
Strategy:
1. Pass through chunks until the first tool call appears
2. Buffer all subsequent chunks once a tool call is detected
3. On message_delta (stream end), validate tools with blocking service
4. Emit buffered chunks excluding blocked tools (with sequential reindexing)
5. Append explanation text block if any tools were blocked
6. Emit final message_delta and message_stop

Returns:
    SSE-encoded bytes ready to be sent to the client.
"""
parsed_stream = self._parse_anthropic_sse_stream(response)
accumulated_tools: dict[str, AnthropicToolCallData] = {}
index_to_tool: dict[int, AnthropicToolCallData] = {}
buffered_chunks: list[dict[str, Any]] = []
is_buffering = False
replay_index_base = -1

async for chunk in parsed_stream:
    event_type = chunk.get("type")
    is_tool_chunk = self._is_tool_related_anthropic_chunk(chunk)

    if not is_buffering and not is_tool_chunk and event_type in _CONTENT_BLOCK_EVENTS:
        replay_index_base = chunk.get("index", 0)

    if is_tool_chunk:
        is_buffering = True
        self._accumulate_anthropic_tool_call(chunk, accumulated_tools, index_to_tool)

    # Pass through non-tool chunks before any tools appear
    if not is_buffering:
        yield self._encode_anthropic_chunk_to_sse(chunk)
        continue

    buffered_chunks.append(chunk)

    # Continue buffering until stream ends
    if event_type != _EVENT_MESSAGE_DELTA:
        continue

    # Stream complete - validate tools and emit filtered results
    blocked_indices, explanation = await self._get_blocked_anthropic_tool_calls(accumulated_tools)

    # Emit allowed chunks with sequential content block reindexing
    for buffered_chunk in buffered_chunks:
        if not self._should_yield_anthropic_chunk(buffered_chunk, blocked_indices):
            continue

        if buffered_chunk.get("type") in _CONTENT_BLOCK_EVENTS:
            if buffered_chunk.get("type") == _EVENT_CONTENT_BLOCK_START:
                replay_index_base += 1
            buffered_chunk["index"] = replay_index_base
        yield self._encode_anthropic_chunk_to_sse(buffered_chunk)

    # Add explanation text block if any tools were blocked
    if explanation:
        for explanation_chunk in self._generate_anthropic_text_block(explanation, replay_index_base + 1):
            yield self._encode_anthropic_chunk_to_sse(explanation_chunk)

    # Adjust stop_reason if all tools were blocked
    if accumulated_tools and len(blocked_indices) == len(accumulated_tools):
        chunk["delta"]["stop_reason"] = "end_turn"
    yield self._encode_anthropic_chunk_to_sse(chunk)

    # Successfully processed — clear buffer so fail-open guard doesn't re-emit
    buffered_chunks.clear()
    is_buffering = False

    # Drain remaining stream events (e.g., message_stop)
    async for remaining_chunk in parsed_stream:
        yield self._encode_anthropic_chunk_to_sse(remaining_chunk)

# Post-loop fail-open: if we were still buffering when the stream ended
# (e.g., no message_delta was received), emit buffered non-terminal chunks as-is
if is_buffering and buffered_chunks:
    verbose_logger.warning("Anthropic stream ended while still buffering — emitting buffered chunks (fail-open)")
    for buffered_chunk in buffered_chunks:
        event_type = buffered_chunk.get("type")
        if event_type not in _TERMINAL_MESSAGE_EVENTS:
            yield self._encode_anthropic_chunk_to_sse(buffered_chunk)

@staticmethod
def _accumulate_openai_tool_calls(
    choice: StreamingChoices,
Provider-specific Anthropic SSE code lives outside llms/ directory
The custom rule for this repository says to avoid writing provider-specific code outside the llms/ directory. _handle_anthropic_streaming, _parse_anthropic_sse_stream, _encode_anthropic_chunk_to_sse, _decode_all_anthropic_sse_events, _is_tool_related_anthropic_chunk, _accumulate_anthropic_tool_call, and the related constants (_EVENT_CONTENT_BLOCK_START, etc.) all contain Anthropic-specific SSE protocol handling directly in litellm/integrations/rubrik.py. While the plugin does delegate the format conversion work to helpers imported from litellm.llms.anthropic.*, the streaming state machine and raw SSE encode/decode logic is self-contained here.
Ideally the SSE parsing/encoding helpers should live under litellm/llms/anthropic/ and be imported here, keeping the integration file focused on orchestration rather than protocol implementation.
Rule Used: What: Avoid writing provider-specific code outside... (source)
This was raised on the previous PR (#22935) as well. The Anthropic SSE handling here is specific to the plugin's buffering/replay architecture for tool blocking — it's not general-purpose Anthropic SSE parsing that other code would reuse. The llms/anthropic/ directory contains the canonical format transformation utilities (which we import and use), while the streaming state machine here is tightly coupled to the tool-blocking orchestration logic. Extracting it would create a module in llms/anthropic/ that has exactly one consumer.
# Create a dedicated httpx client for tool blocking to avoid connection pooling issues
# with LiteLLM's shared client
self.tool_blocking_client = httpx.AsyncClient(
    timeout=httpx.Timeout(5.0, connect=2.0),  # 2s connect timeout, 5s total timeout
    limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
tool_blocking_client has no registered shutdown/cleanup hook
A bare httpx.AsyncClient is created and stored at self.tool_blocking_client, and an aclose() method is defined (line 166-168). However, aclose() is never wired up to any lifecycle manager, shutdown handler, or atexit hook. If the plugin is swapped out (e.g., during a config reload) or the proxy exits unexpectedly, the underlying TCP connection pool will be leaked.
Other LiteLLM integrations that create dedicated httpx clients typically register cleanup in a proxy shutdown event or expose the client through a context manager. Consider registering aclose() with FastAPI's shutdown lifecycle or using LiteLLM's shared client via get_async_httpx_client (which handles pooling and cleanup centrally) for the tool-blocking calls as well.
This was also raised on #22935. We added the `aclose()` method (line 167) for this purpose. We use a dedicated client rather than `get_async_httpx_client` because the tool-blocking service is on the critical path with different timeout requirements (2s connect, 5s total) than the shared logging client.
Wiring `aclose()` into FastAPI's shutdown lifecycle would require changes to the proxy's guardrail management layer, which is outside the scope of this plugin PR. The connection pool will be cleaned up by the OS on process exit, and in the config-reload case, the guardrail manager would need to call `aclose()` on the old instance — that's a framework-level concern rather than something this plugin should self-manage.
Adds CustomGuardrail as a parent class (multiple inheritance with CustomBatchLogger) so RubrikLogger has get_config_model() and other guardrail infrastructure methods required by the guardrail registry. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
def _decode_all_anthropic_sse_events(raw_chunk: bytes) -> list[dict[str, Any]]:
    """
    Decode all Anthropic SSE events from a raw chunk.

    A single raw chunk may contain multiple SSE events separated by blank lines.
    """
    data_prefix = "data:"
    events: list[dict[str, Any]] = []

    for line in raw_chunk.decode("utf-8").split("\n"):
        stripped_line = line.strip()
        if stripped_line.startswith(data_prefix):
            json_payload = stripped_line[len(data_prefix) :].strip()
            events.append(json.loads(json_payload))

    return events
SSE event: line is discarded during decoding, making encode/decode asymmetric
_decode_all_anthropic_sse_events silently drops the event: <type> line from each SSE event and relies exclusively on the "type" field inside the data JSON to carry the event type. While this works for the current Anthropic SSE format (the type is always present in the payload), the encode path in _encode_anthropic_chunk_to_sse re-attaches an event: line derived from the same "type" field, making the pipeline asymmetric with the upstream bytes.
A more robust decoder would parse the event: line alongside data:, letting the SSE envelope be the authoritative source of the event type (as the spec intends) and validating it matches the JSON payload:
current_event_type: str | None = None
for line in raw_chunk.decode("utf-8").split("\n"):
    stripped = line.strip()
    if stripped.startswith("event:"):
        current_event_type = stripped[len("event:"):].strip()
    elif stripped.startswith("data:"):
        json_payload = stripped[len("data:"):].strip()
        event = json.loads(json_payload)
        if current_event_type and "type" not in event:
            event["type"] = current_event_type
        events.append(event)
        current_event_type = None
The asymmetry is intentional. Anthropic's API contract guarantees that every SSE data payload includes a type field — it's how their SDK identifies event types too (see the Anthropic streaming docs). The type field in the JSON is the authoritative source, not the SSE event: envelope line.
Parsing the event: line would add complexity to handle a scenario that doesn't exist with the Anthropic API. The encode side re-derives event: from type to produce well-formed SSE, which is correct — the client receives properly formatted SSE with matching event: and data.type values.
We do sanitize the type field during encoding (stripping \r/\n) as a defense-in-depth measure, which was a fix from the previous PR review.
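For illustration, the encode side described above — re-deriving the event: line from the payload's type field, with CR/LF sanitization — can be sketched as a standalone function. The name and signature here are hypothetical; the plugin's actual _encode_anthropic_chunk_to_sse is a method and may differ in detail.

```python
import json
from typing import Any

def encode_anthropic_chunk_to_sse(chunk: dict[str, Any]) -> bytes:
    # Re-derive the SSE "event:" line from the JSON "type" field,
    # stripping CR/LF as a defense-in-depth sanitization step
    event_type = str(chunk.get("type", "")).replace("\r", "").replace("\n", "")
    return f"event: {event_type}\ndata: {json.dumps(chunk)}\n\n".encode("utf-8")

encoded = encode_anthropic_chunk_to_sse({"type": "message_stop"})
print(encoded)  # b'event: message_stop\ndata: {"type": "message_stop"}\n\n'
```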
@greptile review |
…ix comment - Remove redundant self.log_queue assignment (parent already initializes) - Add explicit return after Anthropic stream drain loop - Fix misleading comment about httpx error behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…uard, dedup system msg - Use model_validate instead of setattr to preserve Pydantic types - Convert Anthropic usage (input_tokens/output_tokens) to OpenAI format - Add per-event JSONDecodeError handling in SSE decoder - Skip system message insertion if one already exists Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…fer layer The outer buffering_generator/chunks_forwarded mechanism had a bug: chunks_forwarded counted handler yields while buffered_chunks tracked input consumption — different numbers, causing incorrect slicing in the fail-open path. Each handler now owns its own try/except around the blocking service call and emits buffered chunks directly on failure, eliminating the dual-buffer complexity entirely. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…st helper Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nthropic round-trip Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rgument integrity Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… during reindex Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# Conflicts: # litellm/types/guardrails.py
…ue immutability Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Please use the apply_guardrail function instead of async_post_call_success_hook. This will ensure it works across all endpoints, where we normalize the inputs.
Thanks for the review. I'll update the implementation. When I looked before, apply_guardrail didn't fit our use case (e.g. the docs here, which show that guardrails can only operate on text, not the tool calls we need). The current version in the codebase is perfect for our use case though.
@krrishdholakia OK, so I tried updating this plugin to use apply_guardrail and ran into the following limitation. It looks like the guardrail code only allows two approaches to tool moderation: modifying the tool call (e.g. redacting PII from arguments) or throwing an exception to abort the response entirely.
Neither of these fit our intent: we want to remove tool calls entirely from the response while not making it an error. We add some additional text to the response stating that the tool call has been removed to allow agents to respond to the block.
So it appears that we don't fit LiteLLM's intended model here. I'll think about this on our side; we may be able to adapt our implementation to account for this different model, but it would take some doing. In the meantime, do you have any other suggestions (other than reverting to my bespoke async_post_call_success_hook strategy)?
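The behavior described above — dropping blocked tool calls from the response without raising, and appending explanatory text so agents can react to the block — can be sketched on an OpenAI-style response dict. All names and the dict shape here are illustrative only, not the plugin's actual code:

```python
from typing import Any

def strip_blocked_tool_calls(response: dict[str, Any], allowed: set[str], explanation: str) -> dict[str, Any]:
    """Remove disallowed tool calls from a response dict without erroring (sketch)."""
    message = response["choices"][0]["message"]
    original = message.get("tool_calls") or []
    kept = [tc for tc in original if tc["function"]["name"] in allowed]
    if len(kept) < len(original):
        # Substitute explanatory text for the blocked calls instead of raising
        message["content"] = (message.get("content") or "") + explanation
    message["tool_calls"] = kept or None
    if not kept:
        response["choices"][0]["finish_reason"] = "stop"  # no longer a tool-call turn
    return response

resp = {"choices": [{"finish_reason": "tool_calls",
                     "message": {"content": None,
                                 "tool_calls": [{"function": {"name": "delete_files"}}]}}]}
out = strip_blocked_tool_calls(resp, allowed=set(), explanation="[Tool call removed by policy]")
print(out["choices"][0]["finish_reason"], out["choices"][0]["message"]["tool_calls"])
```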
I posted #24262 that adds the guardrail functionality we need (ability to delete tool calls from responses) - iterating with greptile now.
Adds the Rubrik LiteLLM plugin (litellm/integrations/rubrik.py) — a callback logger that provides two capabilities:
Tool call blocking: Intercepts LLM responses (both streaming and non-streaming), validates tool calls against a Rubrik blocking service, and filters disallowed tools from the response before it reaches the client.
Request/response logging: Batches and logs LLM interactions to a Rubrik backend with configurable sampling.
The plugin supports both OpenAI and Anthropic response formats, handles streaming correctly (including GPT-5 style finish chunks), and uses a fail-open design — if the blocking service is unavailable, responses are returned unchanged.
This plugin has been running in production at Rubrik for some time and is well tested against real-world traffic patterns.
Configuration
Environment variables:
RUBRIK_WEBHOOK_URL (required) — Base URL for the Rubrik service
RUBRIK_API_KEY (optional) — Bearer token for authentication
RUBRIK_BATCH_SIZE (optional) — Batch size for logging (default 512)
RUBRIK_SAMPLING_RATE (optional) — Fraction of requests to log, e.g. 0.5 (default 1.0)
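A minimal sketch of how these variables might be parsed, using the defaults listed above. The helper name and returned dict shape are hypothetical; only the variable names and defaults come from this PR description:

```python
def load_rubrik_config(env: dict[str, str]) -> dict:
    """Parse Rubrik plugin settings from an environment-style mapping (sketch)."""
    url = env.get("RUBRIK_WEBHOOK_URL")
    if not url:
        raise ValueError("RUBRIK_WEBHOOK_URL is required")
    return {
        "webhook_url": url.rstrip("/"),        # normalize trailing slash
        "api_key": env.get("RUBRIK_API_KEY"),  # optional bearer token
        "batch_size": int(env.get("RUBRIK_BATCH_SIZE", "512")),
        "sampling_rate": float(env.get("RUBRIK_SAMPLING_RATE", "1.0")),
    }

cfg = load_rubrik_config({"RUBRIK_WEBHOOK_URL": "https://rubrik.example.com/"})
print(cfg["batch_size"], cfg["sampling_rate"], cfg["webhook_url"])
# 512 1.0 https://rubrik.example.com
```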
Tests
Includes 42 unit tests across two test files with a comprehensive test framework:
test_rubrik_openai.py (28 tests) — Initialization, URL stripping, sampling rate parsing, non-streaming tool blocking (allow/block/partial), streaming tool blocking with accumulated deltas, GPT-5 duplicate-args regression, fail-open error recovery, real SSE fixture data
test_rubrik_anthropic.py (14 tests) — Anthropic non-streaming tool blocking (allow/block/partial/text-only), OpenAI↔Anthropic format round-trip with and without usage key, streaming with tool blocking/allowing/text-only pass-through, format detection, real SSE fixture data
Test fixtures in rubrik_test_sample_data/ include captured OpenAI and Anthropic responses (JSON and SSE streams) for realistic end-to-end testing.
Files
litellm/integrations/rubrik.py — Plugin implementation
tests/test_litellm/integrations/test_rubrik_openai.py — OpenAI format tests
tests/test_litellm/integrations/test_rubrik_anthropic.py — Anthropic format tests
tests/test_litellm/integrations/rubrik_test_sample_data/ — 13 test fixture files
Note that this is the second PR introducing this plugin. The previous one, #22935, was inadvertently based off an old version of the plugin. This PR incorporates all of the actionable feedback that greptile gave on that PR.
Relevant issues
Pre-Submission checklist
Please complete all items before asking a LiteLLM maintainer to review your PR
I have added a test in the tests/test_litellm/ directory (adding at least 1 test is a hard requirement - see details)
I have run make test-unit
I have asked @greptileai for a review and received a Confidence Score of at least 4/5 before requesting a maintainer review
If you're seeing a delay in your PR being merged, ping the LiteLLM Team on Slack (#pr-review).
CI (LiteLLM team)
Branch creation CI run
Link:
CI run for the last commit
Link:
Merge / cherry-pick CI run
Links:
Type
🆕 New Feature
🐛 Bug Fix
🧹 Refactoring
📖 Documentation
🚄 Infrastructure
✅ Test
Changes