33from __future__ import annotations
44
55import json
6+ import os
67from pathlib import Path
78from typing import Any
89
@@ -17,6 +18,17 @@ def _read_json(path: Path) -> dict[str, Any] | None:
1718 return payload if isinstance (payload , dict ) else None
1819
1920
21+ def _is_safe_subpath (base : Path , subpath : str ) -> bool :
22+ """Validate that the given subpath resolves strictly within the base directory."""
23+ if not subpath :
24+ return False
25+ base_str = os .path .abspath (str (base ))
26+ if not base_str .endswith (os .sep ):
27+ base_str += os .sep
28+ target_str = os .path .abspath (os .path .join (base_str , str (subpath )))
29+ return target_str .startswith (base_str )
30+
31+
2032def discover_spec8_pack_candidates (
2133 data_root : str | Path = "data/scenarios" ,
2234 output_root : str | Path = "output" ,
@@ -56,9 +68,13 @@ def load_spec8_flow_artifacts(
5668 data_root : str | Path = "data/scenarios" ,
5769 output_root : str | Path = "output" ,
5870 output_pack_id : str | None = None ,
59- ) -> dict [str , Any ]:
71+ ) -> dict [str , Any ] | None :
6072 data_base = Path (data_root )
6173 output_base = Path (output_root )
74+
75+ if not _is_safe_subpath (data_base , pack_id ):
76+ return None
77+
6278 data_pack = data_base / pack_id
6379
6480 situation = _read_json (data_pack / "situation.json" )
@@ -69,6 +85,8 @@ def load_spec8_flow_artifacts(
6985 generation_trace = _read_json (data_pack / "generation" / "log.json" )
7086
7187 resolved_output_pack = output_pack_id or pack_id
88+ if not _is_safe_subpath (output_base , resolved_output_pack ):
89+ return None
7290 result = _read_json (output_base / resolved_output_pack / "result.json" )
7391 explanation = _read_json (output_base / resolved_output_pack / "explanation.json" )
7492
0 commit comments