@@ -119,3 +119,187 @@ def gate_check(
119119 if metrics .get (key , float ("inf" )) > max_val :
120120 return False
121121 return True
122+
123+
124+ # ---------------------------------------------------------------------------
125+ # Complementary metrics (#66 §2-3) — comfort and an off-road proxy.
126+ # These extend the displacement metrics already provided by
127+ # ``compute_open_loop_metrics`` above; they need no ground-truth trajectory
128+ # (comfort) or no other-agent labels (off-road), which L2D lacks.
129+ # ---------------------------------------------------------------------------
130+
131+ # nuPlan comfort bounds (the full set from nuplan-devkit `ego_is_comfortable`).
132+ COMFORT_THRESHOLDS = {
133+ "lon_accel_max" : 2.40 , # m/s^2 upper bound on longitudinal accel
134+ "lon_accel_min" : - 4.05 , # m/s^2 lower bound (braking)
135+ "lat_accel" : 4.89 , # m/s^2 |lateral accel|
136+ "yaw_rate" : 0.95 , # rad/s |yaw rate|
137+ "yaw_accel" : 1.93 , # rad/s^2 |yaw acceleration|
138+ "lon_jerk" : 4.13 , # m/s^3 |longitudinal jerk|
139+ "mag_jerk" : 8.37 , # m/s^3 |jerk magnitude| = sqrt(lon_jerk^2 + lat_jerk^2)
140+ }
141+
142+
143+ def compute_comfort_metrics (
144+ pred_accel : np .ndarray ,
145+ pred_curv : np .ndarray ,
146+ initial_speed : np .ndarray ,
147+ dt : float = 0.1 ,
148+ thresholds : dict [str , float ] = COMFORT_THRESHOLDS ,
149+ ) -> dict [str , float ]:
150+ """Comfort metrics from the ``(a, κ)`` outputs vs the nuPlan bounds (#66 §3).
151+
152+ Mirrors nuplan-devkit's ``ego_is_comfortable`` set — no ground truth needed.
153+ With the per-step speed ``v[t] = v0 + Σ a·dt`` (clamped ≥ 0):
154+ * longitudinal acceleration ``a`` — two-sided bound ``[min, max]``
155+ * lateral acceleration ``v² κ`` — ``|·|`` bound
156+ * yaw rate ``v κ`` — ``|·|`` bound
157+ * yaw acceleration ``Δ(v κ)/dt`` — ``|·|`` bound
158+ * longitudinal jerk ``Δa/dt`` — ``|·|`` bound
159+ * jerk magnitude ``√(lon_jerk² + lat_jerk²)`` — bound (this is the
160+ 8.37 m/s³ threshold; *not* lateral jerk)
161+
162+ Reports the batch-mean of each per-sample peak, a per-metric violation rate,
163+ and the overall ``comfort_violation_rate`` (fraction of samples exceeding ANY
164+ bound).
165+
166+ Args:
167+ pred_accel, pred_curv: ``(B, T)`` predicted action signals.
168+ initial_speed: ``(B,)`` speed at the prediction start.
169+ """
170+ accel = np .asarray (pred_accel , dtype = np .float64 ) # (B, T)
171+ curv = np .asarray (pred_curv , dtype = np .float64 ) # (B, T)
172+ v0 = np .asarray (initial_speed , dtype = np .float64 )[:, None ]
173+
174+ v = np .clip (v0 + np .cumsum (accel , axis = 1 ) * dt , 0.0 , None ) # (B, T)
175+ lat_accel = v ** 2 * curv # (B, T)
176+ yaw_rate = v * curv # (B, T)
177+ lon_jerk = np .diff (accel , axis = 1 ) / dt # (B, T-1)
178+ lat_jerk = np .diff (lat_accel , axis = 1 ) / dt # (B, T-1)
179+ yaw_accel = np .diff (yaw_rate , axis = 1 ) / dt # (B, T-1)
180+ mag_jerk = np .hypot (lon_jerk , lat_jerk ) # (B, T-1)
181+
182+ out : dict [str , float ] = {}
183+ violated = np .zeros (accel .shape [0 ], dtype = bool )
184+
185+ # Longitudinal acceleration: asymmetric two-sided bound.
186+ lon_max , lon_min = accel .max (axis = 1 ), accel .min (axis = 1 )
187+ out ["max_lon_accel" ] = float (lon_max .mean ())
188+ out ["min_lon_accel" ] = float (lon_min .mean ())
189+ lon_exceed = (lon_max > thresholds ["lon_accel_max" ]) | (lon_min < thresholds ["lon_accel_min" ])
190+ out ["lon_accel_violation_rate" ] = float (lon_exceed .mean ())
191+ violated |= lon_exceed
192+
193+ # Magnitude-bounded quantities.
194+ abs_peaks = {
195+ "lat_accel" : np .abs (lat_accel ).max (axis = 1 ),
196+ "yaw_rate" : np .abs (yaw_rate ).max (axis = 1 ),
197+ "yaw_accel" : np .abs (yaw_accel ).max (axis = 1 ),
198+ "lon_jerk" : np .abs (lon_jerk ).max (axis = 1 ),
199+ "mag_jerk" : mag_jerk .max (axis = 1 ),
200+ }
201+ for name , peak in abs_peaks .items ():
202+ out [f"max_{ name } " ] = float (peak .mean ())
203+ exceed = peak > thresholds [name ]
204+ out [f"{ name } _violation_rate" ] = float (exceed .mean ())
205+ violated |= exceed
206+
207+ out ["comfort_violation_rate" ] = float (violated .mean ())
208+ return out
209+
210+
211+ def _erode_drivable (mask : np .ndarray , iterations : int ) -> np .ndarray :
212+ """Shrink the drivable area by ``iterations`` pixels (4-neighbour erosion).
213+
214+ A cell stays drivable only if it and its 4 neighbours are drivable (cells
215+ outside the grid count as non-drivable), so after ``k`` iterations any cell
216+ within Manhattan distance ``k`` of the boundary is removed. Pure-numpy, no
217+ scipy. Used to require a safety margin from the road edge.
218+ """
219+ eroded = np .asarray (mask , dtype = bool )
220+ for _ in range (max (0 , int (iterations ))):
221+ nb = eroded .copy ()
222+ nb [1 :, :] &= eroded [:- 1 , :] # up neighbour
223+ nb [:- 1 , :] &= eroded [1 :, :] # down neighbour
224+ nb [:, 1 :] &= eroded [:, :- 1 ] # left neighbour
225+ nb [:, :- 1 ] &= eroded [:, 1 :] # right neighbour
226+ nb [0 , :] = nb [- 1 , :] = nb [:, 0 ] = nb [:, - 1 ] = False # border = off-road
227+ eroded = nb
228+ return eroded
229+
230+
231+ def offroad_rate (
232+ positions : np .ndarray ,
233+ drivable_mask : np .ndarray ,
234+ meters_per_pixel : float ,
235+ center_px : tuple [int , int ] | None = None ,
236+ headings : np .ndarray | None = None ,
237+ ego_size : tuple [float , float ] | None = None ,
238+ dilation_px : int = 0 ,
239+ ) -> float :
240+ """Off-road proxy for collision rate when agents are unlabelled (#66 §2).
241+
242+ L2D has no other-agent annotations, so we use the BEV drivable mask: a
243+ trajectory is off-road if it leaves the drivable area. By default this checks
244+ the trajectory **centre** point (lightweight). For drivable-area *compliance*
245+ the ego footprint matters — a corner can leave the road while the centre stays
246+ inside — so pass ``ego_size`` to check the four footprint corners, and/or
247+ ``dilation_px`` to require a safety margin from the boundary.
248+
249+ Args:
250+ positions: ``(B, T, 2)`` integrated ``(x_forward, y_left)`` in metres.
251+ drivable_mask: ``(H, W)`` boolean BEV; True = drivable.
252+ meters_per_pixel: BEV resolution.
253+ center_px: ego pixel ``(row, col)``; defaults to the grid centre.
254+ Convention (matches the repo's BEV rendering): forward +x → up
255+ (decreasing row), left +y → left (decreasing col).
256+ headings: optional ``(B, T)`` heading per pose (rad) to orient the
257+ footprint. If ``None`` and ``ego_size`` is given, heading is taken
258+ from the finite-difference travel direction.
259+ ego_size: optional ``(length, width)`` in metres. When given, the four
260+ footprint corners are checked instead of only the centre.
261+ dilation_px: erode the drivable mask by this many pixels first (require a
262+ margin from the boundary). 0 = off.
263+
264+ Returns:
265+ Fraction of trajectories that leave the drivable area.
266+ """
267+ mask = _erode_drivable (drivable_mask , dilation_px ) if dilation_px > 0 \
268+ else np .asarray (drivable_mask , dtype = bool )
269+ H , W = mask .shape
270+ cr , cc = center_px if center_px is not None else (H // 2 , W // 2 )
271+ pos = np .asarray (positions , dtype = np .float64 )
272+ B , T , _ = pos .shape
273+
274+ if ego_size is None :
275+ query = pos [:, :, None , :] # (B, T, 1, 2)
276+ else :
277+ length , width = ego_size
278+ corners = np .array ([ # ego frame
279+ [length / 2 , width / 2 ], [length / 2 , - width / 2 ],
280+ [- length / 2 , width / 2 ], [- length / 2 , - width / 2 ],
281+ ]) # (4, 2)
282+ if headings is not None :
283+ theta = np .asarray (headings , dtype = np .float64 )
284+ elif T >= 2 :
285+ d = np .diff (pos , axis = 1 )
286+ d = np .concatenate ([d [:, :1 , :], d ], axis = 1 ) # (B, T, 2)
287+ theta = np .arctan2 (d [..., 1 ], d [..., 0 ]) # (B, T)
288+ else :
289+ theta = np .zeros ((B , T ))
290+ cos , sin = np .cos (theta ), np .sin (theta ) # (B, T)
291+ cx , cy = corners [:, 0 ], corners [:, 1 ] # (4,)
292+ qx = pos [..., 0 :1 ] + cos [..., None ] * cx - sin [..., None ] * cy # (B, T, 4)
293+ qy = pos [..., 1 :2 ] + sin [..., None ] * cx + cos [..., None ] * cy # (B, T, 4)
294+ query = np .stack ([qx , qy ], axis = - 1 ) # (B, T, 4, 2)
295+
296+ offroad = 0
297+ for i in range (B ):
298+ rows = np .round (cr - query [i , ..., 0 ] / meters_per_pixel ).astype (int )
299+ cols = np .round (cc - query [i , ..., 1 ] / meters_per_pixel ).astype (int )
300+ inside = (rows >= 0 ) & (rows < H ) & (cols >= 0 ) & (cols < W )
301+ on_road = inside .copy () # OOB = off-road
302+ on_road [inside ] = mask [rows [inside ], cols [inside ]]
303+ if not on_road .all ():
304+ offroad += 1
305+ return offroad / max (B , 1 )
0 commit comments