Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions scripts/codex/app_server_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,25 @@ def run(
)
except OSError as exc:
print(
f"[vibeguard-codex-wrapper] hook {hook_name} skipped"
f"[vibeguard-codex-wrapper] hook {hook_name} failed"
f" (cwd={cwd!r} unavailable): {exc}",
file=sys.stderr,
)
return HookResult(decision="pass", output="")
return HookResult(decision="hook_error", output=str(exc))
output = (proc.stdout or "") + ("\n" + proc.stderr if proc.stderr else "")
payloads = self._extract_payloads(output)
decision = self._extract_decision(output, payloads) or "pass"
if proc.returncode != 0:
return HookResult(decision="hook_error", output=output.strip(), payloads=payloads)
decision = self._extract_decision(output, payloads)
updated_command = self._extract_updated_command(payloads) if decision == "allow" else None
return HookResult(
decision=decision,
decision=decision or "pass",
output=output.strip(),
payloads=payloads,
updated_command=updated_command,
)


@staticmethod
def _extract_payloads(output: str) -> list[dict[str, Any]]:
payloads: list[dict[str, Any]] = []
Expand Down Expand Up @@ -251,12 +254,19 @@ def handle_server_request(
payload = {"tool_input": {"command": command}}
result = self.hooks.run("pre-bash-guard.sh", payload, cwd=cwd, env_overrides=env)

if result.decision == "block":
if result.decision in {"block", "hook_error"}:
write_to_server({"id": msg_id, "result": {"decision": "decline"}})
print(
f"[vibeguard-codex-wrapper] blocked command approval: {command}",
file=sys.stderr,
)
if result.decision == "hook_error":
print(
f"[vibeguard-codex-wrapper] pre-bash hook failed; declining command approval: {command}"
f"\n{result.output}".rstrip(),
file=sys.stderr,
)
else:
print(
f"[vibeguard-codex-wrapper] blocked command approval: {command}",
file=sys.stderr,
)
return True

if result.updated_command is not None:
Expand Down
117 changes: 117 additions & 0 deletions tests/test_codex_runtime.sh
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,123 @@ assert_contains "${adapter_json}" '"session_collision_free": true' "distinct thr
assert_contains "${adapter_json}" '"pre_edit_guard": false' "capability matrix is exposed on app-server feedback"
assert_not_contains "${adapter_json}" '"stop-guard.sh"' "empty stop-guard output does not create spurious feedback entries"

header "app-server adapter fails closed when pre-bash hook exits nonzero"
nonzero_hook_json="$(python3 - "${REPO_DIR}" "${TMP_DIR}" <<'PYCODE'
import json
import importlib.util
import pathlib
import sys

repo_dir = pathlib.Path(sys.argv[1])
tmp_root = pathlib.Path(sys.argv[2])
module_path = repo_dir / "scripts" / "codex" / "app_server_wrapper.py"
spec = importlib.util.spec_from_file_location("vibeguard_app_server_wrapper", module_path)
module = importlib.util.module_from_spec(spec)
assert spec is not None and spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
SessionState = module.SessionState
VibeGuardGateStrategy = module.VibeGuardGateStrategy

app_repo = tmp_root / "app-server-nonzero-hook-repo"
hooks_dir = app_repo / "hooks"
hooks_dir.mkdir(parents=True, exist_ok=True)

(hooks_dir / "pre-bash-guard.sh").write_text(
"#!/usr/bin/env bash\ncat >/dev/null\necho 'hook failed' >&2\nexit 1\n",
encoding="utf-8",
)
(hooks_dir / "pre-bash-guard.sh").chmod(0o755)

strategy = VibeGuardGateStrategy(app_repo)
state = SessionState()
strategy.on_client_message(
{"method": "thread/start", "params": {"threadId": "thread/beta", "cwd": str(app_repo)}},
state,
)

captured = []
approval_message = {
"id": "req-2",
"method": "item/commandExecution/requestApproval",
"params": {"threadId": "thread/beta", "command": "rm -rf ./tmp"},
}
intercepted = strategy.handle_server_request(approval_message, state, captured.append)

print(
json.dumps(
{
"intercepted": intercepted,
"approval": captured[0] if captured else None,
},
ensure_ascii=False,
)
)
PYCODE
)"

assert_contains "${nonzero_hook_json}" '"intercepted": true' "app-server adapter intercepts approval requests when pre-bash hook exits nonzero"
assert_contains "${nonzero_hook_json}" '"decision": "decline"' "app-server adapter declines approval when pre-bash hook exits nonzero"

header "app-server adapter ignores decision text from failed pre-bash hook output"
misleading_decision_hook_json="$(python3 - "${REPO_DIR}" "${TMP_DIR}" <<'PYCODE'
import json
import importlib.util
import pathlib
import sys

repo_dir = pathlib.Path(sys.argv[1])
tmp_root = pathlib.Path(sys.argv[2])
module_path = repo_dir / "scripts" / "codex" / "app_server_wrapper.py"
spec = importlib.util.spec_from_file_location("vibeguard_app_server_wrapper", module_path)
module = importlib.util.module_from_spec(spec)
assert spec is not None and spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
SessionState = module.SessionState
VibeGuardGateStrategy = module.VibeGuardGateStrategy

app_repo = tmp_root / "app-server-misleading-decision-hook-repo"
hooks_dir = app_repo / "hooks"
hooks_dir.mkdir(parents=True, exist_ok=True)

(hooks_dir / "pre-bash-guard.sh").write_text(
"#!/usr/bin/env bash\ncat >/dev/null\necho '\"decision\":\"allow\"' >&2\nexit 1\n",
encoding="utf-8",
)
(hooks_dir / "pre-bash-guard.sh").chmod(0o755)

strategy = VibeGuardGateStrategy(app_repo)
state = SessionState()
strategy.on_client_message(
{"method": "thread/start", "params": {"threadId": "thread/gamma", "cwd": str(app_repo)}},
state,
)

captured = []
approval_message = {
"id": "req-3",
"method": "item/commandExecution/requestApproval",
"params": {"threadId": "thread/gamma", "command": "rm -rf ./tmp"},
}
intercepted = strategy.handle_server_request(approval_message, state, captured.append)

print(
json.dumps(
{
"intercepted": intercepted,
"approval": captured[0] if captured else None,
},
ensure_ascii=False,
)
)
PYCODE
)"

assert_contains "${misleading_decision_hook_json}" '"intercepted": true' "app-server adapter intercepts approval requests when failed hook emits decision text"
assert_contains "${misleading_decision_hook_json}" '"decision": "decline"' "app-server adapter declines approval when failed hook emits decision text"
assert_not_contains "${misleading_decision_hook_json}" '"decision": "approve"' "failed hook output cannot force an approval decision"

echo
echo "=============================="
printf "Total: %d Pass: \033[32m%d\033[0m Fail: \033[31m%d\033[0m\n" "$TOTAL" "$PASS" "$FAIL"
Expand Down
Loading