Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,30 @@ def _get_mode(event: SecurityEvent) -> str:
return ""


def _has_hardening_stats(event: SecurityEvent) -> bool:
"""Return whether a hardening event has parsed rule summary statistics."""
result = _get_result(event)
return isinstance(result.get("total"), int) and result.get("total", 0) > 0


def _has_actionable_hardening_failure(event: SecurityEvent) -> bool:
"""Return whether a hardening event has a parsed rule failure to fix."""
result = _get_result(event)
failures = result.get("failures", [])
if not isinstance(failures, list):
return False

for failure in failures:
if not isinstance(failure, dict):
continue
if failure.get("status") == "UNKNOWN":
continue
if not failure.get("rule_id"):
continue
return True
return False


def _format_timestamp(ts: str) -> str:
"""Render an ISO-8601 timestamp in local time for inline display."""
try:
Expand Down Expand Up @@ -192,8 +216,9 @@ def _summarize_hardening(events: list[SecurityEvent]) -> str:
f"(succeeded: {reinf_ok}, failed: {reinf_fail})"
)

# Latest scan result details (prefer succeeded, fall back to latest failed)
latest_scan = next((e for e in scans if e.result == "succeeded"), None)
# Latest scan result details. Loongshield returns non-zero for non-compliant
# scans, but the backend may still parse usable passed/total statistics.
latest_scan = next((e for e in scans if _has_hardening_stats(e)), None)
Comment thread
edonyzpc marked this conversation as resolved.
if latest_scan:
result = _get_result(latest_scan)
passed = result.get("passed", 0)
Expand Down Expand Up @@ -645,12 +670,10 @@ def _compute_suggestions(
# --- Hardening suggestions ---
if hardening_events:
latest = hardening_events[0] # newest-first after _group_by_category sort
if latest.result == "succeeded":
result = _get_result(latest)
if result.get("failures"):
suggestions.append(
"agent-sec-cli harden --reinforce Fix failed rules"
)
if _has_actionable_hardening_failure(latest) and (
latest.result == "succeeded" or _has_hardening_stats(latest)
):
suggestions.append("agent-sec-cli harden --reinforce Fix failed rules")

# --- Skill-ledger suggestions ---
if ledger_statuses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,55 @@

_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
_RULE_STATUS_RE = re.compile(
r"\[(?P<rule_id>[\w.]+)\]\s+"
r"(?P<status>FAIL|FAILED|FAILED-TO-FIX|ERROR|ENFORCE-ERROR|DRY-RUN|MANUAL|SKIP):\s*"
r"\[(?P<rule_id>[^\]\s]+)\]\s+"
r"(?P<status>FAIL|FAILED|FIXED|FAILED-TO-FIX|ERROR|ENFORCE-ERROR|DRY-RUN|MANUAL|SKIP):\s*"
r"(?P<message>.+?)\s*$"
)
_ENGINE_ERROR_RE = re.compile(r"Engine\s+Error:\s*(?P<message>.+?)\s*$")
_VERBOSE_RULE_STATUS_RE = re.compile(
r"^\s*(?P<status>PASS|FAIL)\s+\[(?P<rule_id>[^\]\s]+)\]\s+" r"(?P<message>.+?)\s*$"
)
_ENGINE_ERROR_RE = re.compile(
r"(?:\[(?P<rule_id>[^\]\s]+)\]\s+)?Engine\s+Error:\s*" r"(?P<message>.+?)\s*$"
)


def _strip_ansi(text: str) -> str:
"""Remove ANSI colour and style sequences from process output."""
return _ANSI_RE.sub("", text)


def _parse_rule_status_line(line: str) -> dict[str, str] | None:
"""Parse supported loongshield per-rule status line formats."""
match = _RULE_STATUS_RE.search(line)
if match:
return {
"rule_id": match.group("rule_id"),
"status": match.group("status"),
"message": match.group("message").strip(),
}

match = _VERBOSE_RULE_STATUS_RE.search(line)
Comment thread
edonyzpc marked this conversation as resolved.
if not match or match.group("status") == "PASS":
return None

return {
"rule_id": match.group("rule_id"),
"status": match.group("status"),
"message": match.group("message").strip(),
}


class HardeningBackend(BaseBackend):
"""Execute `loongshield seharden` and keep structured hardening results."""

_SUMMARY_RE = re.compile(
r"SEHarden\s+Finished\.\s*"
r"(?:SEHarden\s+Finished\.|Summary:)\s*"
r"(?P<passed>\d+)\s+passed,\s*"
r"(?P<fixed>\d+)\s+fixed,\s*"
r"(?P<failed>\d+)\s+failed,\s*"
r"(?P<manual>\d+)\s+manual,\s*"
r"(?P<dry_run_pending>\d+)\s+dry-run-pending\s*/\s*"
r"(?P<total>\d+)\s+total\."
r"(?P<total>\d+)\s+total\.?"
)

def execute(
Expand Down Expand Up @@ -255,36 +281,39 @@ def _parse_output(cls, clean_output: str, data: dict[str, Any]) -> None:

entries: list[dict[str, str]] = []
for line in clean_output.splitlines():
match = _RULE_STATUS_RE.search(line)
match = _parse_rule_status_line(line)
if match:
entries.append(
{
"rule_id": match.group("rule_id"),
"status": match.group("status"),
"message": match.group("message").strip(),
}
)
entries.append(match)
continue

engine_match = _ENGINE_ERROR_RE.search(line)
if engine_match:
entries.append(
{
"rule_id": "",
"rule_id": engine_match.group("rule_id") or "",
"status": "Engine Error",
"message": engine_match.group("message").strip(),
}
)

mode = data.get("mode")
fixed_statuses = frozenset({"FAIL", "FAILED"})
fixed_statuses = frozenset({"FIXED"})
legacy_fixed_statuses = frozenset({"FAIL", "FAILED"})
all_fixed_statuses = fixed_statuses | legacy_fixed_statuses
if mode == "reinforce":
data["failures"] = [
entry for entry in entries if entry["status"] not in fixed_statuses
entry for entry in entries if entry["status"] not in all_fixed_statuses
Comment thread
edonyzpc marked this conversation as resolved.
Outdated
]
data["fixed_items"] = [
fixed_items = [
entry for entry in entries if entry["status"] in fixed_statuses
]
if not fixed_items:
fixed_items = [
entry
for entry in entries
if entry["status"] in legacy_fixed_statuses
]
data["fixed_items"] = fixed_items
else:
data["failures"] = entries

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def post_action(
event = SecurityEvent(
event_type=ctx.action,
category=_category_for(ctx.action),
result="succeeded" if result.success else "failed",
details=details,
trace_id=ctx.trace_id,
session_id=ctx.session_id,
Expand Down
34 changes: 23 additions & 11 deletions src/agent-sec-core/tests/e2e/cli/test_events_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import json
import time

import pytest

# Import shared helpers from conftest.py
from .conftest import iso_now, require_loongshield, run_cli

Expand Down Expand Up @@ -428,8 +426,8 @@ def test_json_output_completeness(self):
- Whether harden ran in scan or reinforce mode
- The actual output from the harden command

We verify core fields that ALWAYS exist, and make statistical
fields (passed/failed/total) conditional.
We verify core fields that ALWAYS exist, and make seharden summary
statistics conditional.
"""
since = iso_now()
time.sleep(0.05)
Expand Down Expand Up @@ -467,9 +465,8 @@ def test_json_output_completeness(self):
result_data = event["details"]["result"]
assert "argv" in result_data or "mode" in result_data

# Statistical fields (passed/failed/total) only present if loongshield
# is installed and harden completed successfully
# When loongshield is missing, harden may exit 127 without stats
# Statistical fields are present when loongshield emits a parseable
# seharden summary. A non-compliant scan may still exit 1 with stats.
if "passed" in result_data:
# If one statistical field exists, all should exist
assert (
Expand All @@ -485,7 +482,7 @@ def test_harden_event_with_loongshield_stats(self):
"""TC-006 (extended): When loongshield is installed, verify full stats.

This test validates that when loongshield is available, the harden
event contains complete statistical data (passed/failed/total fields).
event contains complete parsed seharden summary statistics.
"""
require_loongshield()

Expand All @@ -502,18 +499,34 @@ def test_harden_event_with_loongshield_stats(self):
events = json.loads(result.stdout)
assert len(events) == 1

# With loongshield, statistical fields should be present
# With loongshield, statistical fields should be present when seharden
# emits a parseable summary.
result_data = events[0]["details"]["result"]
assert "passed" in result_data, "Expected 'passed' field with loongshield"
assert "failed" in result_data, "Expected 'failed' field with loongshield"
assert "fixed" in result_data, "Expected 'fixed' field with loongshield"
assert "manual" in result_data, "Expected 'manual' field with loongshield"
assert (
"dry_run_pending" in result_data
), "Expected 'dry_run_pending' field with loongshield"
assert "total" in result_data, "Expected 'total' field with loongshield"

# Validate data types and consistency
assert isinstance(result_data["passed"], int)
assert isinstance(result_data["failed"], int)
assert isinstance(result_data["fixed"], int)
assert isinstance(result_data["manual"], int)
assert isinstance(result_data["dry_run_pending"], int)
assert isinstance(result_data["total"], int)
assert result_data["total"] > 0, "Total rules should be > 0"
assert result_data["passed"] + result_data["failed"] == result_data["total"]
assert (
result_data["passed"]
+ result_data["fixed"]
+ result_data["failed"]
+ result_data["manual"]
+ result_data["dry_run_pending"]
== result_data["total"]
)


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -564,7 +577,6 @@ class TestEventsTimeRange:

def test_last_hours_decimal_precision(self):
"""TC-008: --last-hours works with decimal values."""
since = iso_now()
time.sleep(0.05)
run_cli("harden")
time.sleep(0.1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,47 @@ def test_failed_scan(self):
# Latest harden failed -> needs_attention (not critical)
assert "Needs attention" in output

def test_failed_scan_with_stats_still_shows_compliance(self):
events = [
_make_event(
event_type="harden",
category="hardening",
result="failed",
details={
"request": {
"args": ["--scan", "--config", "agentos_baseline", "--verbose"]
},
"result": {
"mode": "scan",
"config": "agentos_baseline",
"passed": 20,
"failed": 3,
"total": 23,
"failures": [
{
"rule_id": "SEC-001",
"status": "FAIL",
"message": "issue",
}
],
"fixed": 0,
"manual": 0,
"dry_run_pending": 0,
"fixed_items": [],
},
},
timestamp=_ts_minutes_ago(5),
)
]

output = format_summary(events, "last 24 hours")

assert "Scans performed: 1 (succeeded: 0, failed: 1)" in output
assert "Latest scan result:" in output
assert "Compliance: 20/23 rules passed (87.0%)" in output
assert "Latest scan failed" not in output
assert "agent-sec-cli harden --reinforce" in output


# ---------------------------------------------------------------------------
# Test: asset verify summary
Expand Down Expand Up @@ -914,6 +955,38 @@ def test_no_suggestion_when_latest_harden_failed(self):
output = format_summary(events, "last 24 hours")
assert "agent-sec-cli harden --reinforce" not in output

def test_no_reinforce_suggestion_when_failed_harden_has_parser_failure(self):
events = [
_make_event(
event_type="harden",
category="hardening",
result="failed",
details={
"request": {"args": ["--scan"]},
"result": {
"mode": "scan",
"passed": 22,
"failed": 1,
"total": 23,
"failures": [
{
"rule_id": "",
"status": "UNKNOWN",
"message": "Summary reports non-pass rules but details failed.",
}
],
},
},
timestamp=_ts_minutes_ago(5),
)
]

output = format_summary(events, "last 24 hours")

assert "Compliance: 22/23 rules passed" in output
assert "Latest scan failed" not in output
assert "agent-sec-cli harden --reinforce" not in output

def test_no_suggestion_when_no_hardening_events(self):
"""Only sandbox events — no suggestion at all."""
events = [
Expand Down
Loading
Loading