Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Deduplicate prompt-injection findings for identical prompt content across multiple source locations.

## [0.1.4] - 2026-06-12

### Security
Expand Down
90 changes: 90 additions & 0 deletions src/mcts/analyzers/prompt_dedupe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Remove duplicate prompt findings reported for identical prompt content."""

from __future__ import annotations

from typing import Any

from mcts.reporting.models import Finding

_PROMPT_SURFACES = frozenset({"prompt", "instruction"})


def dedupe_prompt_findings(findings: list[Finding]) -> list[Finding]:
"""Merge prompt-injection findings that point at the same prompt text."""
if not any(_dedupe_key(finding) for finding in findings):
return findings

kept: list[Finding] = []
index_by_key: dict[tuple[str, str, str, str, str], int] = {}
for finding in findings:
key = _dedupe_key(finding)
if key is None:
kept.append(finding)
continue
existing_index = index_by_key.get(key)
if existing_index is None:
index_by_key[key] = len(kept)
kept.append(finding)
continue
kept[existing_index] = _merge_locations(kept[existing_index], finding)
return kept


def _dedupe_key(finding: Finding) -> tuple[str, str, str, str, str] | None:
if finding.analyzer != "prompt_injection":
return None
evidence = finding.evidence or {}
if evidence.get("surface") not in _PROMPT_SURFACES:
return None
if not finding.location or not finding.location.file:
return None
content_hash = str(evidence.get("content_hash") or "")
if not content_hash:
return None
finding_type = str(evidence.get("type") or "-".join(finding.id.split("-", 2)[:2]))
field = str(evidence.get("field") or "")
return finding.analyzer, finding_type, field, content_hash, finding.severity.value


def _merge_locations(existing: Finding, duplicate: Finding) -> Finding:
evidence = dict(existing.evidence or {})
locations = _unique_locations(
[
_location_row(existing),
*(evidence.get("also_found_in") or []),
_location_row(duplicate),
]
)
if len(locations) > 1:
evidence["also_found_in"] = locations
return existing.model_copy(update={"evidence": evidence})


def _location_row(finding: Finding) -> dict[str, Any] | None:
if not finding.location or not finding.location.file:
return None
row: dict[str, Any] = {"file": finding.location.file}
if finding.location.line is not None:
row["line"] = finding.location.line
return row


def _unique_locations(rows: list[Any]) -> list[dict[str, Any]]:
seen: set[tuple[str, int | None]] = set()
unique: list[dict[str, Any]] = []
for row in rows:
if not isinstance(row, dict):
continue
file = str(row.get("file") or "")
if not file:
continue
line = row.get("line")
key = (file, line if isinstance(line, int) else None)
if key in seen:
continue
seen.add(key)
item: dict[str, Any] = {"file": file}
if key[1] is not None:
item["line"] = key[1]
unique.append(item)
return unique
26 changes: 23 additions & 3 deletions src/mcts/analyzers/prompt_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import hashlib
import re

from mcts.analyzers.base import BaseAnalyzer
Expand All @@ -12,7 +13,7 @@
surface_text_fields,
tool_for_surface,
)
from mcts.analyzers.surfaces import ScanSurface
from mcts.analyzers.surfaces import ScanSurface, ScanSurfaceKind
from mcts.analyzers.tpa_patterns import (
find_homoglyphs,
has_ansi_smuggling,
Expand Down Expand Up @@ -47,9 +48,12 @@ def _analyze_surface(self, server: MCPServerInfo, surface: ScanSurface) -> list[

intentional_context = is_intentional_context_surface(surface)
for field, text in surface_text_fields(surface):
findings.extend(self._unicode_findings(surface, text, field, loc, tool_name))
field_findings = self._unicode_findings(surface, text, field, loc, tool_name)
if field == "description" and not intentional_context:
findings.extend(self._description_only_findings(surface, text, loc, tool, tool_name))
field_findings.extend(self._description_only_findings(surface, text, loc, tool, tool_name))
if surface.kind in {ScanSurfaceKind.PROMPT, ScanSurfaceKind.INSTRUCTION}:
field_findings = _with_content_hash(field_findings, text)
findings.extend(field_findings)

return findings

Expand Down Expand Up @@ -215,3 +219,19 @@ def _description_handler_mismatch(self, tool: MCPTool) -> bool:
w in snippet for w in ("subprocess", "os.system", "eval", "delete", "shell=true")
)
return claims_safe and handler_dangerous


def _with_content_hash(findings: list[Finding], text: str) -> list[Finding]:
if not findings:
return []
content_hash = hashlib.sha256(_normalize_text(text).encode("utf-8")).hexdigest()
rows: list[Finding] = []
for finding in findings:
evidence = dict(finding.evidence)
evidence["content_hash"] = content_hash
rows.append(finding.model_copy(update={"evidence": evidence}))
return rows


def _normalize_text(text: str) -> str:
return text.replace("\r\n", "\n").replace("\r", "\n").strip()
2 changes: 2 additions & 0 deletions src/mcts/core/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from mcts.analyzers.oauth_config import OAuthConfigAnalyzer
from mcts.analyzers.path_validation import PathValidationAnalyzer
from mcts.analyzers.permissions import PermissionAnalyzer
from mcts.analyzers.prompt_dedupe import dedupe_prompt_findings
from mcts.analyzers.prompt_defense import PromptDefenseAnalyzer
from mcts.analyzers.prompt_injection import PromptInjectionAnalyzer
from mcts.analyzers.runtime_events import RuntimeEventsAnalyzer
Expand Down Expand Up @@ -205,6 +206,7 @@ def analyze_server(self, server_info: MCPServerInfo) -> ScanReport:

findings = self._apply_filters(findings)
findings = dedupe_metadata_findings(findings)
findings = dedupe_prompt_findings(findings)
findings = dedupe_sigma_findings(findings)
findings = enrich_findings(findings)
findings.extend(self.compliance.check(findings, tools_discovered=len(server_info.tools)))
Expand Down
81 changes: 81 additions & 0 deletions tests/test_prompt_dedupe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Prompt finding dedupe tests."""

from __future__ import annotations

from pathlib import Path

from mcts.core.config import ScanConfig
from mcts.core.scanner import Scanner
from mcts.mcp.models import MCPPrompt, MCPServerInfo


def _scan_prompts(tmp_path: Path, prompts: list[MCPPrompt]):
config = ScanConfig(
target=tmp_path,
surfaces=["prompt"],
surface_scoped_analyzers=True,
analyzers=["prompt_injection"],
scoring_mode="legacy",
)
server = MCPServerInfo(name="test", prompts=prompts)
return Scanner(config).analyze_server(server)


def test_duplicate_prompt_content_merges_locations(tmp_path: Path) -> None:
text = "Safe prompt text with hidden marker\u200b.\n"
skill = tmp_path / "skills" / "deploy" / "SKILL.md"
skill.parent.mkdir(parents=True)
agent = tmp_path / "src" / "agent_instructions.py"
agent.parent.mkdir(parents=True)

report = _scan_prompts(
tmp_path,
[
MCPPrompt(
name="deploy",
description=text,
source_file=str(skill),
source_line=1,
discovered_via="skill-md",
),
MCPPrompt(
name="agent_instructions",
description=text,
source_file=str(agent),
source_line=4,
discovered_via="instruction-file",
),
],
)

prompt_findings = [finding for finding in report.findings if finding.analyzer == "prompt_injection"]
assert len(prompt_findings) == 1
also_found_in = prompt_findings[0].evidence.get("also_found_in")
assert also_found_in == [
{"file": str(skill), "line": 1},
{"file": str(agent), "line": 4},
]


def test_distinct_prompts_in_same_file_are_not_deduped(tmp_path: Path) -> None:
source = tmp_path / "prompts" / "agent_prompts.md"
source.parent.mkdir(parents=True)
prompts = [
MCPPrompt(
name="first",
description="First prompt with hidden marker\u200b.",
source_file=str(source),
source_line=1,
),
MCPPrompt(
name="second",
description="Second prompt with hidden marker\u200b.",
source_file=str(source),
source_line=7,
),
]

report = _scan_prompts(tmp_path, prompts)

prompt_findings = [finding for finding in report.findings if finding.analyzer == "prompt_injection"]
assert len(prompt_findings) == 2