# Evidence "surface" values whose findings are eligible for content-based dedupe.
_PROMPT_SURFACES = frozenset({"prompt", "instruction"})


def dedupe_prompt_findings(findings: list[Finding]) -> list[Finding]:
    """Merge prompt-injection findings that point at the same prompt text.

    Findings that produce the same dedupe key (see ``_dedupe_key``) are
    collapsed into the first one encountered; every location they were seen
    at is recorded under ``evidence["also_found_in"]`` of that first finding.
    Findings without a dedupe key pass through unchanged and relative order
    is preserved.

    Args:
        findings: Findings in report order.

    Returns:
        The input list itself when no finding is eligible for dedupe,
        otherwise a new list with duplicates merged.
    """
    # Compute each key exactly once; it drives both the fast path and the merge.
    keys = [_dedupe_key(finding) for finding in findings]
    if all(key is None for key in keys):
        return findings

    kept: list[Finding] = []
    index_by_key: dict[tuple[str, str, str, str, str], int] = {}
    for finding, key in zip(findings, keys):
        if key is None:
            kept.append(finding)
            continue
        existing_index = index_by_key.get(key)
        if existing_index is None:
            # First occurrence: remember its slot so later duplicates merge into it.
            index_by_key[key] = len(kept)
            kept.append(finding)
            continue
        kept[existing_index] = _merge_locations(kept[existing_index], finding)
    return kept


def _dedupe_key(finding: Finding) -> tuple[str, str, str, str, str] | None:
    """Return the identity used to detect duplicate prompt findings.

    A finding is eligible only when it comes from the ``prompt_injection``
    analyzer, its evidence ``surface`` is one of ``_PROMPT_SURFACES``, it has
    a location with a file, and its evidence carries a non-empty
    ``content_hash``. Otherwise ``None`` is returned.

    The key is ``(analyzer, type, field, content_hash, severity value)``.
    When evidence has no ``type``, the first two dash-separated segments of
    ``finding.id`` are used instead.
    """
    if finding.analyzer != "prompt_injection":
        return None
    evidence = finding.evidence or {}
    if evidence.get("surface") not in _PROMPT_SURFACES:
        return None
    if not finding.location or not finding.location.file:
        return None
    content_hash = str(evidence.get("content_hash") or "")
    if not content_hash:
        return None
    finding_type = str(evidence.get("type") or "-".join(finding.id.split("-", 2)[:2]))
    field = str(evidence.get("field") or "")
    return finding.analyzer, finding_type, field, content_hash, finding.severity.value


def _merge_locations(existing: Finding, duplicate: Finding) -> Finding:
    """Return a copy of *existing* whose evidence also lists *duplicate*'s location.

    The resulting ``also_found_in`` list holds, in order and without
    repeats: the location of *existing*, any rows already recorded, then the
    location of *duplicate*. The key is only written when more than one
    distinct location is known.
    """
    evidence = dict(existing.evidence or {})
    locations = _unique_locations(
        [
            _location_row(existing),
            *(evidence.get("also_found_in") or []),
            _location_row(duplicate),
        ]
    )
    if len(locations) > 1:
        evidence["also_found_in"] = locations
    return existing.model_copy(update={"evidence": evidence})


def _location_row(finding: Finding) -> dict[str, Any] | None:
    """Return ``{"file": ..., "line": ...}`` for *finding*, or ``None`` without a file.

    The ``line`` key is omitted when the location has no line.
    """
    if not finding.location or not finding.location.file:
        return None
    row: dict[str, Any] = {"file": finding.location.file}
    if finding.location.line is not None:
        row["line"] = finding.location.line
    return row


def _unique_locations(rows: list[Any]) -> list[dict[str, Any]]:
    """Normalize location rows and drop repeats, keeping first-seen order.

    Entries that are not dicts or have no ``file`` are skipped. A ``line``
    is kept only when it is a genuine integer; anything else is treated as
    "no line" and the key is omitted from the output row.
    """
    seen: set[tuple[str, int | None]] = set()
    unique: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        file = str(row.get("file") or "")
        if not file:
            continue
        line = row.get("line")
        # bool is a subclass of int: without this guard True/False would be
        # emitted as a line number and would collide with line 1/0 in `seen`.
        is_line_number = isinstance(line, int) and not isinstance(line, bool)
        key = (file, line if is_line_number else None)
        if key in seen:
            continue
        seen.add(key)
        item: dict[str, Any] = {"file": file}
        if key[1] is not None:
            item["line"] = key[1]
        unique.append(item)
    return unique
def _with_content_hash(findings: list[Finding], text: str) -> list[Finding]:
    """Return copies of *findings* whose evidence carries a hash of *text*.

    The hash is the SHA-256 hex digest of the normalized text (see
    ``_normalize_text``) and is stored under ``evidence["content_hash"]``.
    The input findings are not mutated; each one is copied with
    ``model_copy``. An empty input yields an empty list without hashing.
    """
    if not findings:
        return []
    content_hash = hashlib.sha256(_normalize_text(text).encode("utf-8")).hexdigest()
    rows: list[Finding] = []
    for finding in findings:
        # Tolerate findings without evidence instead of raising TypeError.
        evidence = dict(finding.evidence or {})
        evidence["content_hash"] = content_hash
        rows.append(finding.model_copy(update={"evidence": evidence}))
    return rows


def _normalize_text(text: str) -> str:
    """Return *text* with CRLF/CR converted to LF and outer whitespace stripped."""
    return text.replace("\r\n", "\n").replace("\r", "\n").strip()
def _scan_prompts(tmp_path: Path, prompts: list[MCPPrompt]):
    """Scan *prompts* on the prompt surface with only the prompt_injection analyzer."""
    settings = {
        "target": tmp_path,
        "surfaces": ["prompt"],
        "surface_scoped_analyzers": True,
        "analyzers": ["prompt_injection"],
        "scoring_mode": "legacy",
    }
    config = ScanConfig(**settings)
    server = MCPServerInfo(name="test", prompts=prompts)
    scanner = Scanner(config)
    return scanner.analyze_server(server)


def test_duplicate_prompt_content_merges_locations(tmp_path: Path) -> None:
    """Identical prompt text in two files yields one finding listing both locations."""
    shared_text = "Safe prompt text with hidden marker\u200b.\n"
    skill_path = tmp_path / "skills" / "deploy" / "SKILL.md"
    skill_path.parent.mkdir(parents=True)
    agent_path = tmp_path / "src" / "agent_instructions.py"
    agent_path.parent.mkdir(parents=True)

    skill_prompt = MCPPrompt(
        name="deploy",
        description=shared_text,
        source_file=str(skill_path),
        source_line=1,
        discovered_via="skill-md",
    )
    agent_prompt = MCPPrompt(
        name="agent_instructions",
        description=shared_text,
        source_file=str(agent_path),
        source_line=4,
        discovered_via="instruction-file",
    )

    report = _scan_prompts(tmp_path, [skill_prompt, agent_prompt])

    injection_hits = [hit for hit in report.findings if hit.analyzer == "prompt_injection"]
    assert len(injection_hits) == 1
    expected_locations = [
        {"file": str(skill_path), "line": 1},
        {"file": str(agent_path), "line": 4},
    ]
    assert injection_hits[0].evidence.get("also_found_in") == expected_locations


def test_distinct_prompts_in_same_file_are_not_deduped(tmp_path: Path) -> None:
    """Prompts with different text stay separate even when they share a source file."""
    shared_file = tmp_path / "prompts" / "agent_prompts.md"
    shared_file.parent.mkdir(parents=True)
    first_prompt = MCPPrompt(
        name="first",
        description="First prompt with hidden marker\u200b.",
        source_file=str(shared_file),
        source_line=1,
    )
    second_prompt = MCPPrompt(
        name="second",
        description="Second prompt with hidden marker\u200b.",
        source_file=str(shared_file),
        source_line=7,
    )

    report = _scan_prompts(tmp_path, [first_prompt, second_prompt])

    injection_hits = [hit for hit in report.findings if hit.analyzer == "prompt_injection"]
    assert len(injection_hits) == 2