Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/20260526_112822_paul.petit.ext_HEAD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Fixed

- AI hooks are debounced when an agent calls the same hook multiple time.
30 changes: 30 additions & 0 deletions ggshield/verticals/ai/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import re
from typing import Any, Dict, List, Sequence, Set

import filelock
from notifypy import Notify

from ggshield.core.dirs import get_cache_dir
from ggshield.core.filter import censor_match
from ggshield.core.scan import ScannerProtocol
from ggshield.core.scan import SecretProtocol as Secret
Expand Down Expand Up @@ -33,6 +35,32 @@
}


def has_already_been_seen(content: str) -> bool:
"""Return True if the payload is identical to the most recent hook call.

Some agents install hooks from multiple assistants, which can invoke ggshield
twice with the same payload.
"""
payload_hash = hashlib.sha256(content.strip().encode()).hexdigest()
debounce_path = get_cache_dir() / "latest_ai_hook.txt"
try:
debounce_path.parent.mkdir(parents=True, exist_ok=True)
except OSError:
return False

# Make sure only one process can read/write the debounce file at a time
# to avoid having the same payload being processed twice.
with filelock.FileLock(debounce_path.with_suffix(".lock")):
try:
stored = debounce_path.read_text()
except FileNotFoundError:
stored = ""
if payload_hash == stored:
return True
debounce_path.write_text(payload_hash)
return False


def lookup(data: Dict[str, Any], keys: Sequence[str], default: Any = None) -> Any:
"""Returns the value of the first key found in a dictionary."""
for key in keys:
Expand Down Expand Up @@ -231,6 +259,8 @@ def __init__(self, scanner: ScannerProtocol):

def scan(self, content: str) -> int:
"""Scan the content, print the result and return the exit code."""
if content.strip() and has_already_been_seen(content):
return 0

payloads = parse_hook_input(content)
result = self._scan_payloads(payloads)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies = [
"notify-py>=0.3.43",
"keyring>=24.0.0,<26",
"tomli>=2.4.0",
"filelock>=3.19.1",
]

[project.urls]
Expand Down
36 changes: 35 additions & 1 deletion tests/unit/verticals/ai/test_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

from ggshield.utils.git_shell import Filemode
from ggshield.verticals.ai.agents import Agent, Claude, Codex, Copilot, Cursor, VSCode
from ggshield.verticals.ai.hooks import AIHookScanner, find_filepaths, parse_hook_input
from ggshield.verticals.ai.hooks import (
AIHookScanner,
find_filepaths,
has_already_been_seen,
parse_hook_input,
)
from ggshield.verticals.ai.mcp import send_mcp_activity
from ggshield.verticals.ai.models import EventType, HookPayload, HookResult, Tool
from ggshield.verticals.secret import SecretScanner
Expand Down Expand Up @@ -121,6 +126,20 @@ def test_with_secrets_returns_block_and_message(self):
assert "remove the secrets from your prompt" in result.message


class TestHasAlreadyBeenSeen:
def test_first_call_is_not_duplicate(self):
assert has_already_been_seen('{"hook_event_name": "PreToolUse"}') is False

def test_second_identical_call_is_duplicate(self):
content = '{"hook_event_name": "PreToolUse"}'
assert has_already_been_seen(content) is False
assert has_already_been_seen(content) is True

def test_different_payload_is_not_duplicate(self):
assert has_already_been_seen('{"prompt": "a"}') is False
assert has_already_been_seen('{"prompt": "b"}') is False


class TestAIHookScannerScan:
"""Unit tests for the AIHookScanner.scan() method."""

Expand All @@ -144,6 +163,21 @@ def test_scan_no_secrets_returns_zero(self):
code = scanner.scan(json.dumps(data))
assert code == 0

def test_scan_duplicate_payload_skips_processing(self):
"""scan() with the same payload as the previous call returns early."""
mock_scanner = _mock_scanner([])
scanner = AIHookScanner(mock_scanner)
data = {
"hook_event_name": "UserPromptSubmit",
"prompt": "hello world",
"transcript_path": "/home/user/.claude/projects/foo/session.jsonl",
"cursor_version": "1.2.3",
}
content = json.dumps(data)
assert scanner.scan(content) == 0
assert scanner.scan(content) == 0
mock_scanner.scan.assert_called_once()

@patch("ggshield.verticals.ai.hooks.AIHookScanner._send_secret_notification")
def test_scan_post_tool_use_with_secrets_sends_notification(
self, mock_notify: MagicMock
Expand Down
3 changes: 3 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading