forked from pnc/less-lethal
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfilter.py
More file actions
204 lines (173 loc) · 6.97 KB
/
filter.py
File metadata and controls
204 lines (173 loc) · 6.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
mitmproxy allowlist filter — controls what the VM can access.
Traffic is filtered at two levels:
1. **Trusted domains** (below): infrastructure the VM needs to function —
package repos, CA cert endpoint. All HTTP methods and paths are allowed.
Edit these only when changing system-level dependencies.
2. **User rules** (allowlist.txt): per-method, per-URL patterns that grant
access to specific endpoints. Each non-blank, non-comment line must be:
METHOD https://hostname/path/pattern
Wildcards (*) are allowed only in the path, not in the hostname.
The filter reloads the file automatically when it changes.
"""
import json
import re
import time
from pathlib import Path
from urllib.parse import urlparse
from mitmproxy import http
# ── Trusted domains ─────────────────────────────────────────────────
# Full-domain allowlist for system infrastructure.  Every entry is a
# regular expression matched with re.fullmatch against the request
# hostname, so literal dots MUST be escaped: an unescaped "." matches any
# character and would also trust look-alike hosts such as
# "archive-ubuntu.com" or "files-pythonhosted.org".
TRUSTED_DOMAINS: list[str] = [
    # OS package repos — scoped to actual apt hostnames
    r".*\.debian\.org",
    r"archive\.ubuntu\.com",
    r"security\.ubuntu\.com",
    r"ports\.ubuntu\.com",
    r".*\.archive\.ubuntu\.com",
    # Python package repos
    r"pypi\.org",
    r".*\.pypi\.org",
    r"files\.pythonhosted\.org",
    # mitmproxy's magic domain that serves the CA cert
    r"mitm\.it",
]
# Compiled once at import time so the patterns are not rebuilt per request.
_trusted = [re.compile(p) for p in TRUSTED_DOMAINS]
# ── Paths ───────────────────────────────────────────────────────────
ALLOWLIST_PATH = Path(__file__).parent / "allowlist.txt"
BLOCKED_LOG = Path(__file__).parent / ".vm" / "blocked.jsonl"
# ── Cached state ────────────────────────────────────────────────────
_rules: list[tuple[str, str]] = []
_mtime: float = 0.0
def parse_allowlist(path: Path) -> list[tuple[str, str]]:
    """Parse *path* into a list of ``(METHOD, url_pattern)`` tuples.

    Blank lines and lines starting with ``#`` are skipped.  Every other
    line must be ``METHOD URL`` where the URL has a scheme and a hostname
    (e.g. ``GET https://host/path/*``).  Bare domains, unparseable URLs and
    wildcard hostnames are rejected with a warning printed to the
    mitmproxy log; the remaining valid lines are still returned.

    A missing file yields an empty list.  A file that exists but cannot be
    read or decoded as UTF-8 also yields an empty list (after a warning),
    so a broken allowlist grants nothing instead of raising.

    Args:
        path: Location of the allowlist file.

    Returns:
        The accepted rules in file order.  ``METHOD`` is upper-cased; the
        URL pattern is kept exactly as written.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return []
    except (OSError, UnicodeDecodeError) as exc:
        print(f"[filter] {path.name}: cannot read allowlist: {exc}")
        return []

    rules: list[tuple[str, str]] = []
    for lineno, raw in enumerate(text.splitlines(), 1):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        if len(parts) != 2:
            print(f"[filter] {path.name}:{lineno}: expected 'METHOD URL', got: {raw!r}")
            continue
        method, url_pattern = parts
        try:
            # urlparse raises ValueError for malformed netlocs such as an
            # unbalanced "[" — treat that as one bad line, not a bad file.
            hostname = urlparse(url_pattern).hostname
            scheme = urlparse(url_pattern).scheme
        except ValueError:
            scheme, hostname = "", None
        if not scheme or not hostname:
            print(f"[filter] {path.name}:{lineno}: invalid URL: {url_pattern!r}")
            continue
        if "*" in hostname:
            print(
                f"[filter] {path.name}:{lineno}: wildcards not allowed in domain:"
                f" {hostname!r}"
            )
            continue
        rules.append((method.upper(), url_pattern))
    return rules
def _maybe_reload() -> None:
    """Refresh the cached rules when allowlist.txt has a new mtime.

    If the file does not exist, any cached rules and the remembered mtime
    are cleared.  Otherwise the file is re-parsed only when its
    modification time differs from the one recorded at the last load.
    """
    global _rules, _mtime
    try:
        current = ALLOWLIST_PATH.stat().st_mtime
    except FileNotFoundError:
        # The file is gone: forget whatever was loaded from it earlier.
        has_cached_state = bool(_rules) or bool(_mtime)
        if has_cached_state:
            _rules = []
            _mtime = 0.0
        return
    if current == _mtime:
        return
    _rules = parse_allowlist(ALLOWLIST_PATH)
    _mtime = current
def is_allowed(
    rules: list[tuple[str, str]],
    method: str,
    host: str,
    url: str,
) -> bool:
    """Return True if the request is permitted.

    Trusted domains are checked first (all methods and paths allowed),
    then the user rules.  A rule matches when its method, hostname and
    path pattern all match; ``*`` in the rule path matches any run of
    characters.  A ``GET`` rule implicitly allows ``HEAD`` requests to the
    same URL pattern.  Only the path of *url* is compared — its query
    string is ignored.

    Args:
        rules: ``(METHOD, url_pattern)`` tuples, METHOD upper-cased.
        method: HTTP method of the request.
        host: Hostname of the request; compared case-insensitively.
        url: Full request URL.

    Returns:
        True when a trusted domain or a user rule covers the request.
        A request URL that cannot be parsed is denied.
    """
    # DNS names are case-insensitive and urlparse() lower-cases the
    # hostname of every rule, so normalise the request side as well.
    host = host.lower()
    if any(p.fullmatch(host) for p in _trusted):
        return True

    try:
        req_path = urlparse(url).path or "/"
    except ValueError:
        # Unparseable URL: deny rather than let the exception escape.
        return False

    # A HEAD request is satisfied by either a HEAD rule or a GET rule.
    accepted_methods = {method, "GET"} if method == "HEAD" else {method}

    for rule_method, url_pattern in rules:
        if rule_method not in accepted_methods:
            continue
        try:
            parsed = urlparse(url_pattern)
        except ValueError:
            # A malformed rule can never match; skip it.
            continue
        if parsed.hostname != host:
            continue
        pat_path = parsed.path or "/"
        # Escape everything, then turn the escaped "*" back into a wildcard.
        path_re = re.escape(pat_path).replace(r"\*", ".*")
        if re.fullmatch(path_re, req_path):
            return True
    return False
def _log_blocked(flow: http.HTTPFlow) -> None:
    """Append one JSON line describing the blocked request to ``BLOCKED_LOG``.

    The record holds the current timestamp plus the request's method,
    pretty host and pretty URL.  Logging is best-effort: if the log
    directory or file cannot be written, a warning is printed and the
    function returns normally.

    Args:
        flow: The flow whose request was blocked.
    """
    record = {
        "ts": time.time(),
        "method": flow.request.method,
        "host": flow.request.pretty_host,
        "url": flow.request.pretty_url,
    }
    try:
        BLOCKED_LOG.parent.mkdir(parents=True, exist_ok=True)
        with BLOCKED_LOG.open("a", encoding="utf-8") as f:
            # One write per record keeps each JSONL line in one piece.
            f.write(json.dumps(record) + "\n")
    except OSError as exc:
        # Never fail because of the audit log, but do not hide the problem.
        print(f"[filter] could not write {BLOCKED_LOG.name}: {exc}")
BLOCKED_STATUS = 418
def _blocked_body(method: str, url: str) -> str:
"""Build the response body for a blocked request."""
parsed = urlparse(url)
clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
return (
"================================================================\n"
"PROXY BLOCK — HTTP 418 (not a remote server error)\n"
"================================================================\n"
"\n"
f"Blocked: {method} {url}\n"
"\n"
"This VM's outbound traffic is filtered by an allowlist proxy\n"
'to prevent the "lethal trifecta" (code execution + internet\n'
"access + autonomy without human oversight). A 418 response\n"
"means the proxy blocked this request — it never left the VM.\n"
"\n"
"STOP. Do not retry or try to work around this block.\n"
"\n"
"Instead, ask the user to approve this request by adding a\n"
"rule to allowlist.txt on the host machine. Suggest a\n"
"narrowly-scoped rule and justify any wildcards:\n"
"\n"
f" {method} {clean_url}\n"
"\n"
"Changes to allowlist.txt take effect on the next request.\n"
"================================================================\n"
)
class AllowlistAddon:
    """mitmproxy addon that answers every non-allowlisted request with a 418."""

    def request(self, flow: http.HTTPFlow) -> None:
        """Let the request through if it is allowed, otherwise block it.

        The allowlist is reloaded if it changed, then the request is
        checked with ``is_allowed``.  A blocked request gets a
        ``BLOCKED_STATUS`` plain-text response and is recorded in the
        blocked-requests log.  If the check itself raises, the request is
        blocked: mitmproxy logs an exception that escapes an addon hook and
        keeps processing the flow, which would otherwise forward it.

        Args:
            flow: The flow carrying the client request.
        """
        method = flow.request.method
        # NOTE(review): pretty_host / pretty_url prefer the client-supplied
        # Host header over the actual connection target.  mitmproxy's docs
        # warn this can be spoofed in adversarial environments — confirm the
        # proxy mode in use makes this safe, or check request.host as well.
        host = flow.request.pretty_host
        url = flow.request.pretty_url

        try:
            _maybe_reload()
            allowed = is_allowed(_rules, method, host, url)
        except Exception as exc:
            # Fail closed: an error in the filter must never grant access.
            print(f"[filter] ERROR while checking {method} {url}: {exc!r}")
            allowed = False
        if allowed:
            return

        print(f"[filter] BLOCKED {method} {url}")
        # Install the block response before the best-effort logging so a
        # logging failure cannot leave the request unblocked.
        flow.response = http.Response.make(
            BLOCKED_STATUS,
            _blocked_body(method, url),
            {"Content-Type": "text/plain"},
        )
        _log_blocked(flow)


addons = [AllowlistAddon()]