Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/agent-sec-core/cosh-extension/hooks/prompt_scanner_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ def _allow() -> str:
return json.dumps({"decision": "allow"})


def _ask(reason: str) -> str:
"""Return an 'ask' cosh HookOutput so the user sees the issue and decides."""
return json.dumps({"decision": "ask", "reason": reason}, ensure_ascii=False)


def _build_detail_reason(scan_result: dict) -> str:
"""Build a detailed reason string from scan result for security operations."""
threat_type = scan_result.get("threat_type", "")
Expand Down Expand Up @@ -71,9 +76,9 @@ def _format_cosh(scan_result: dict) -> str:
verdict == "pass" -> decision "allow"
verdict == "warn" -> decision "ask" (let user decide)
verdict == "deny" -> decision "ask" (let user decide)
otherwise -> fail-open "allow"
otherwise -> decision "ask" (fail-ask: surface scan failure to user)
"""
verdict = scan_result.get("verdict", "pass")
verdict = scan_result.get("verdict")

if verdict == "pass":
return json.dumps({"decision": "allow"})
Expand All @@ -92,8 +97,9 @@ def _format_cosh(scan_result: dict) -> str:
{"decision": "ask", "reason": reason},
ensure_ascii=False,
)
# other error or unknown verdict -> fail-open
return json.dumps({"decision": "allow"})
# missing, error, or unknown verdict -> fail-ask: let user decide
error_detail = scan_result.get("summary", verdict or "unknown")
return _ask(f"[prompt-scanner] 扫描异常 (verdict={verdict}): {error_detail}")


# -- main ------------------------------------------------------------------
Expand Down Expand Up @@ -142,11 +148,11 @@ def main() -> None:
f"[prompt-scanner] CLI timed out after {exc.timeout}s",
file=sys.stderr,
)
print(_allow())
print(_ask(f"[prompt-scanner] 安全扫描超时 ({exc.timeout}s),未完成扫描"))
return
except Exception as exc:
print(f"[prompt-scanner] CLI invocation failed: {exc}", file=sys.stderr)
print(_allow())
print(_ask(f"[prompt-scanner] 安全扫描调用失败: {exc}"))
return

if proc.returncode != 0:
Expand All @@ -156,7 +162,7 @@ def main() -> None:
f" {'; '.join(stderr_tail)}",
file=sys.stderr,
)
print(_allow())
print(_ask(f"[prompt-scanner] 安全扫描异常退出 (code={proc.returncode})"))
return

# 4. Parse ScanResult JSON from stdout
Expand All @@ -167,7 +173,7 @@ def main() -> None:
f"[prompt-scanner] failed to parse CLI output: {exc}",
file=sys.stderr,
)
print(_allow())
print(_ask("[prompt-scanner] 安全扫描结果解析失败,未完成扫描"))
return

# 5. Format and print cosh output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

Tests cover:
1. verdict → decision mapping (pass, warn, deny, error, unknown)
2. Error verdict fails open
2. Error/unknown verdicts fail-ask (not fail-open)
3. Subprocess integration: pipe JSON into the hook and verify stdout
"""

Expand Down Expand Up @@ -90,35 +90,39 @@ def test_deny_returns_ask(self):


class TestFormatCoshError:
"""verdict=error → fail-open allow."""
"""verdict=error → fail-ask: surface to user."""

def test_error_returns_allow(self):
def test_error_returns_ask(self):
result = json.loads(
_format_cosh(
{
"verdict": "error",
"summary": "internal scanner failure",
"summary": "agent-sec daemon is unavailable",
}
)
)
assert result["decision"] == "allow"
assert result["decision"] == "ask"
assert "扫描异常" in result["reason"]
assert "agent-sec daemon is unavailable" in result["reason"]

def test_error_with_empty_summary_returns_allow(self):
def test_error_with_no_detail_returns_ask(self):
result = json.loads(_format_cosh({"verdict": "error"}))
assert result["decision"] == "allow"
assert result["decision"] == "ask"
assert "error" in result["reason"]


class TestFormatCoshUnknown:
"""Unknown verdict → fail-open allow."""
"""Unknown verdict → fail-ask: surface to user."""

def test_unknown_verdict_returns_allow(self):
def test_unknown_verdict_returns_ask(self):
result = json.loads(_format_cosh({"verdict": "unknown"}))
assert result["decision"] == "allow"
assert result["decision"] == "ask"

def test_missing_verdict_defaults_to_allow(self):
"""When verdict key is missing, default is 'pass' → allow."""
def test_missing_verdict_returns_ask(self):
"""When verdict key is missing, fail-ask (not fail-open)."""
result = json.loads(_format_cosh({}))
assert result["decision"] == "allow"
assert result["decision"] == "ask"
assert "扫描异常" in result["reason"]


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -220,3 +224,63 @@ def fake_run(args, **kwargs):
"user_input",
]
assert captured["kwargs"]["check"] is False

def _mock_stdin(self, monkeypatch):
"""Set stdin to a valid prompt so main() reaches the subprocess call."""
monkeypatch.setattr(
prompt_scanner_hook.sys,
"stdin",
io.StringIO(json.dumps({"prompt": "test prompt"})),
)

def test_cli_timeout_returns_ask(self, monkeypatch, capsys):
self._mock_stdin(monkeypatch)

def fake_run(args, **kwargs):
raise subprocess.TimeoutExpired(args, kwargs.get("timeout", 10))

monkeypatch.setattr(prompt_scanner_hook.subprocess, "run", fake_run)
prompt_scanner_hook.main()
output = json.loads(capsys.readouterr().out)
assert output["decision"] == "ask"
assert "超时" in output["reason"]

def test_cli_exception_returns_ask(self, monkeypatch, capsys):
self._mock_stdin(monkeypatch)

def fake_run(args, **kwargs):
raise OSError("No such file")

monkeypatch.setattr(prompt_scanner_hook.subprocess, "run", fake_run)
prompt_scanner_hook.main()
output = json.loads(capsys.readouterr().out)
assert output["decision"] == "ask"
assert "调用失败" in output["reason"]

def test_cli_nonzero_exit_returns_ask(self, monkeypatch, capsys):
self._mock_stdin(monkeypatch)

def fake_run(args, **kwargs):
return subprocess.CompletedProcess(
args, returncode=1, stdout="", stderr="segfault"
)

monkeypatch.setattr(prompt_scanner_hook.subprocess, "run", fake_run)
prompt_scanner_hook.main()
output = json.loads(capsys.readouterr().out)
assert output["decision"] == "ask"
assert "异常退出" in output["reason"]

def test_cli_bad_json_returns_ask(self, monkeypatch, capsys):
self._mock_stdin(monkeypatch)

def fake_run(args, **kwargs):
return subprocess.CompletedProcess(
args, returncode=0, stdout="not-json", stderr=""
)

monkeypatch.setattr(prompt_scanner_hook.subprocess, "run", fake_run)
prompt_scanner_hook.main()
output = json.loads(capsys.readouterr().out)
assert output["decision"] == "ask"
assert "解析失败" in output["reason"]
Loading