-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrewriter.py
More file actions
137 lines (123 loc) · 5.63 KB
/
Copy pathrewriter.py
File metadata and controls
137 lines (123 loc) · 5.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
"""
rewriter.py — Emit unified-diff patches by inserting guard blocks near risky sites.
Improvements:
- Groups candidates per file and applies them in-order on a single in-memory buffer (stable diffs).
- Supports insertion by exact source location if available: cand["loc"] = {"line": int, "col": int}.
Falls back to substring marker match cand["marker"].
- Idempotency (heuristic): skips insertion when a guard marker ("/* guard */" or "CAGE-generated") already appears within the 8 lines above the target site.
- Consistent indentation, newline normalization (LF), and optional guard banner.
- Git-compatible unified diffs with file paths and context lines.
- Returns an index with per-file patch path, plus stats.
Expected candidate shape (from synthesizer):
{
"file": "/abs/path/to/foo.c",
"kind": "BO"|"IO"|"MEM"|"NPD",
"cond": "if (idx >= bound)",
"body": ["/* guard */", "return -EINVAL;"],
"marker": "memcpy(buf, src, n)", # optional if loc given
"loc": {"line": 123, "col": 8} # optional; takes precedence when present
}
"""
from __future__ import annotations
from pathlib import Path
from typing import Dict, Any, List, Tuple
import difflib, json, os, datetime
# Line separator used when splitting file text into lines and joining it back.
LF = "\n"
def _normalize_newlines(s: str) -> str:
    """Return *s* with every CRLF pair and every lone CR rewritten as LF."""
    # CRLF pairs are collapsed first; converting the bare CRs first would
    # turn each CRLF into two LFs.
    without_crlf = s.replace("\r\n", "\n")
    return without_crlf.replace("\r", "\n")
def _within_guard_block(lines: List[str], i: int) -> bool:
    """Report whether a guard marker appears shortly before index *i*.

    Only the (up to) 8 entries of *lines* that precede index *i* are
    inspected; the entry at *i* itself is not. The result is True when any
    of them contains the text "/* guard */" or "CAGE-generated". This is a
    plain substring heuristic used to avoid stacking a guard directly on
    top of one that is already there; it does not compare guard contents.
    """
    lookback = lines[max(0, i - 8):i]
    # Neither marker contains a newline, so testing each line on its own is
    # equivalent to searching the newline-joined window.
    return any(
        "/* guard */" in text or "CAGE-generated" in text
        for text in lookback
    )
def _render_guard(cond: str, body: List[str], indent: str = " ", banner: str | None = None) -> List[str]:
    """Build the text lines of one brace-delimited guard block.

    The result is, in order: an optional ``/* banner */`` comment line
    (only when *banner* is truthy), the line ``<cond> {``, every entry of
    *body* prefixed with *indent*, and a closing ``}`` line. No trailing
    newlines are added to the lines.
    """
    header = [f"/* {banner} */"] if banner else []
    opening = [f"{cond} {{"]
    statements = [f"{indent}{stmt}" for stmt in body]
    return header + opening + statements + ["}"]
def _insert_by_loc(src_lines: List[str], loc: Dict[str, int], cond: str, body: List[str], banner: str | None) -> Tuple[List[str], bool]:
    """Insert a rendered guard directly above the line named by ``loc["line"]``.

    Args:
        src_lines: Current file content, one entry per line.
        loc: Location mapping. Only ``loc["line"]`` (1-based) is read; a
            missing value defaults to 1. Any ``"col"`` entry is ignored.
        cond: Guard condition text, passed to ``_render_guard``.
        body: Guard body lines, passed to ``_render_guard``.
        banner: Optional banner text, passed to ``_render_guard``.

    Returns:
        ``(lines, changed)``. When the lines just above the target already
        contain a guard marker, the input list is returned unchanged with
        ``changed == False``; otherwise a new list with the guard inserted
        and ``changed == True``.

    Raises:
        TypeError, ValueError: if ``loc["line"]`` cannot be converted by
            ``int()``.
    """
    line_no = max(1, int(loc.get("line", 1)))
    if not src_lines:
        # There is no line to anchor to, so the guard becomes the whole
        # buffer. Indexing an empty list here used to raise IndexError.
        return _render_guard(cond, body, banner=banner), True
    # Line numbers past the end are clamped to the last line.
    i = min(len(src_lines), line_no) - 1
    if _within_guard_block(src_lines, i):
        return src_lines, False
    guard = _render_guard(cond, body, banner=banner)
    return src_lines[:i] + guard + src_lines[i:], True
def _insert_by_marker(src_lines: List[str], marker: str, cond: str, body: List[str], banner: str | None) -> Tuple[List[str], bool]:
    """Insert a rendered guard above the first line containing *marker*.

    Only the first line in which *marker* occurs as a substring is
    considered. If the lines just above it already contain a guard marker,
    the input list is returned unchanged with ``False``. When *marker* is
    falsy or occurs nowhere, the guard is appended after a blank line at
    the end of the buffer as a last resort; that fallback performs no
    existing-guard check.

    Returns:
        ``(lines, changed)`` where ``changed`` tells whether a guard was
        added.
    """
    hit = next(
        (idx for idx, text in enumerate(src_lines) if marker and marker in text),
        None,
    )
    if hit is None:
        # Last resort: nothing to anchor to, so append at the end.
        return src_lines + [""] + _render_guard(cond, body, banner=banner), True
    if _within_guard_block(src_lines, hit):
        return src_lines, False
    rendered = _render_guard(cond, body, banner=banner)
    return src_lines[:hit] + rendered + src_lines[hit:], True
class Rewriter:
    """Insert guard blocks into source files and write the result as patches.

    ``emit`` groups the plan's candidates by file, builds a guarded copy of
    each file in memory, and writes one unified diff per changed file into
    the output directory. The source files themselves are never modified.
    """

    def __init__(self, context_lines: int = 3, banner: str | None = "CAGE-generated guard"):
        # Number of unchanged context lines difflib places around each hunk.
        self.context_lines = context_lines
        # Text of the comment line rendered above each guard; a falsy value
        # omits that line.
        self.banner = banner

    @staticmethod
    def _sort_key(cand: Dict[str, Any]) -> Tuple[int, str, str]:
        """Return a sort key of (loc line, marker, kind) that never raises.

        Candidates whose ``loc`` is missing, not a dict, or has no usable
        ``line`` sort after all located ones (line key ``10**9``).
        """
        line = 10**9
        loc = cand.get("loc")
        if isinstance(loc, dict):
            try:
                line = int(loc.get("line", 10**9))
            except (TypeError, ValueError):
                line = 10**9
        return (line, str(cand.get("marker") or ""), str(cand.get("kind") or ""))

    @staticmethod
    def _unique_patch_name(base: str, taken: set) -> str:
        """Return ``<base>.patch``, or ``<base>.<k>.patch`` if already taken.

        The chosen name is added to *taken* so later calls avoid it.
        """
        name = f"{base}.patch"
        k = 1
        while name in taken:
            name = f"{base}.{k}.patch"
            k += 1
        taken.add(name)
        return name

    def _patch_for_file(self, path: Path, cand_list: List[Dict[str, Any]]) -> str:
        """Apply *cand_list* to the file at *path* and return a unified diff.

        Candidates are applied in the given order to one in-memory buffer.
        A candidate with a dict ``loc`` containing ``"line"`` is placed by
        line number; any other candidate is placed by its ``marker`` text.
        Location line numbers are interpreted against the ORIGINAL file and
        translated to the current buffer, so guards inserted earlier do not
        push later ones onto the wrong line.

        Returns:
            The diff text, or an empty string when nothing changed.

        Raises:
            KeyError: if a candidate has no ``"cond"`` entry.
            OSError: if the file cannot be read.
        """
        # NOTE: errors="ignore" drops undecodable bytes, so for a file that
        # is not valid UTF-8 the diff is computed against altered text.
        orig = _normalize_newlines(path.read_text(encoding="utf-8", errors="ignore"))
        src_lines = orig.split(LF)
        original_count = len(src_lines)
        # (buffer index at insertion time, number of lines added), in the
        # order the insertions happened.
        shifts: List[Tuple[int, int]] = []
        for cand in cand_list:
            cond = cand["cond"]
            body = cand.get("body", ["/* guard */"])
            loc = cand.get("loc")
            before = len(src_lines)
            if loc and isinstance(loc, dict) and "line" in loc:
                # Clamp in original coordinates, then replay every earlier
                # insertion to find where that line lives now.
                at = min(original_count, max(1, int(loc["line"]))) - 1
                for start, count in shifts:
                    if start <= at:
                        at += count
                src_lines, changed = _insert_by_loc(
                    src_lines, {**loc, "line": at + 1}, cond, body, self.banner
                )
            else:
                marker = cand.get("marker", "")
                # Mirrors the marker search: first matching line, otherwise
                # the end of the buffer (append fallback).
                at = next(
                    (k for k, text in enumerate(src_lines) if marker and marker in text),
                    before,
                )
                src_lines, changed = _insert_by_marker(
                    src_lines, marker, cond, body, self.banner
                )
            if changed:
                shifts.append((at, len(src_lines) - before))
        patched = LF.join(src_lines)
        ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        diff = difflib.unified_diff(
            orig.splitlines(True),
            patched.splitlines(True),
            fromfile=str(path),
            tofile=str(path) + ".guarded",
            fromfiledate=ts,
            tofiledate=ts,
            n=self.context_lines,
        )
        pieces: List[str] = []
        for piece in diff:
            if piece.endswith("\n"):
                pieces.append(piece)
            else:
                # A final line without a newline would otherwise be glued to
                # the next diff line; emit the conventional marker instead.
                pieces.append(piece + "\n\\ No newline at end of file\n")
        return "".join(pieces)

    def emit(self, ctx: Dict[str, Any], plan: Dict[str, Any], out_dir: Path) -> Dict[str, Any]:
        """Write one patch file per changed source file and return an index.

        Args:
            ctx: Not read by this method.
            plan: Mapping whose ``"candidates"`` list holds candidate dicts;
                each must have a ``"file"`` entry.
            out_dir: Directory for the ``*.patch`` files; created if needed.

        Returns:
            A dict with ``"patches"`` (one ``{"file", "patch",
            "candidates"}`` entry per written patch), ``"files"`` (number of
            patches written) and ``"total_candidates"`` (length of the
            plan's candidate list). Paths that are not regular files and
            files whose diff is empty are skipped.
        """
        out_dir.mkdir(parents=True, exist_ok=True)
        all_candidates = plan.get("candidates", [])
        # Group by file, keeping first-seen file order.
        per_file: Dict[str, List[Dict[str, Any]]] = {}
        for c in all_candidates:
            per_file.setdefault(c["file"], []).append(c)
        patches_index: List[Dict[str, Any]] = []
        taken_names: set = set()
        for fpath, cands in per_file.items():
            fp = Path(fpath)
            if not fp.is_file():
                continue
            # Stable order: by location line when available, else by marker
            # text, then kind.
            cands.sort(key=self._sort_key)
            diff_text = self._patch_for_file(fp, cands)
            if not diff_text.strip():
                continue
            # Files sharing a basename get distinct patch names instead of
            # overwriting each other.
            outp = out_dir / self._unique_patch_name(fp.name, taken_names)
            outp.write_text(diff_text, encoding="utf-8")
            patches_index.append({
                "file": fpath,
                "patch": str(outp),
                "candidates": len(cands),
            })
        return {
            "patches": patches_index,
            "files": len(patches_index),
            "total_candidates": len(all_candidates),
        }