From f962c5940ca1dca4ea12650a2bb8de12977e33be Mon Sep 17 00:00:00 2001 From: Sudip Sinha Date: Tue, 16 Jun 2026 12:38:25 +0100 Subject: [PATCH 1/4] feat(scheduler): register drift metric calculators with MetricsDirectory The Prometheus scheduler found scheduled drift metric requests but had no calculator to compute them, logging "No calculator found for metric CompareMeans" every 5 seconds indefinitely. Add calculator functions for CompareMeans, KSTest, and JensenShannon that fetch reference data via get_dataframe_by_tag and return MetricValueCarrier with per-feature values. Make _calculate_metric async to support async calculators. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sudip Sinha --- src/endpoints/metrics/drift/compare_means.py | 46 ++++++ src/endpoints/metrics/drift/jensen_shannon.py | 50 +++++++ .../metrics/drift/kolmogorov_smirnov.py | 40 ++++++ .../prometheus/prometheus_scheduler.py | 12 +- .../metrics/drift/test_drift_calculators.py | 131 ++++++++++++++++++ 5 files changed, 275 insertions(+), 4 deletions(-) create mode 100644 tests/endpoints/metrics/drift/test_drift_calculators.py diff --git a/src/endpoints/metrics/drift/compare_means.py b/src/endpoints/metrics/drift/compare_means.py index 2b9a7e9a..ab888914 100644 --- a/src/endpoints/metrics/drift/compare_means.py +++ b/src/endpoints/metrics/drift/compare_means.py @@ -5,6 +5,7 @@ from http import HTTPStatus from typing import Any +import pandas as pd from fastapi import APIRouter, HTTPException from pydantic import BaseModel, ConfigDict, Field, model_validator @@ -18,6 +19,7 @@ from src.service.data.datasources.data_source import DataSource from src.service.data.shared_data_source import get_shared_data_source from src.service.payloads.metrics.base_metric_request import BaseMetricRequest +from src.service.prometheus.metric_value_carrier import MetricValueCarrier from src.service.prometheus.prometheus_scheduler import PrometheusScheduler from src.service.prometheus.shared_prometheus_scheduler import ( get_shared_prometheus_scheduler, @@ -483,3 +485,47 @@ async def list_meanshift_requests() -> dict[str, list[dict[str, Any]]]: """ log_deprecated_endpoint(logger, DEPRECATED_METRIC_NAME, METRIC_NAME) return await list_compare_means_requests() + + +async def calculate_compare_means_metric( + batch: pd.DataFrame, + request: BaseMetricRequest, +) -> MetricValueCarrier: + """Calculate CompareMeans metric for the Prometheus scheduler.""" + data_source = get_data_source() + reference_df = await data_source.get_dataframe_by_tag( + request.model_id, request.reference_tag + ) + fit_columns = request.fit_columns or list(batch.columns) + alpha = getattr(request, "alpha", DEFAULT_ALPHA) + equal_var = getattr(request, "equal_var", DEFAULT_EQUAL_VAR) + nan_policy = getattr(request, "nan_policy", DEFAULT_NAN_POLICY) + + named_values = {} + for feature_name in fit_columns: + if feature_name in reference_df.columns and feature_name in batch.columns: + result = CompareMeans.ttest_ind( + reference_data=reference_df[feature_name].to_numpy(), + current_data=batch[feature_name].to_numpy(), + alpha=alpha, + equal_var=equal_var, + nan_policy=nan_policy, + ) + named_values[feature_name] = result["statistic"] + return MetricValueCarrier(named_values or 0.0) + + +def _register_compare_means_calculator() -> None: + """Register the CompareMeans calculator with the metrics directory.""" + scheduler = get_prometheus_scheduler() + if scheduler and scheduler.metrics_directory: + scheduler.metrics_directory.register( + METRIC_NAME, calculate_compare_means_metric + ) + logger.info("%s calculator registered with metrics directory", METRIC_NAME) + + +try: + _register_compare_means_calculator() +except (AttributeError, TypeError) as e: + logger.warning("Could not register %s calculator on import: %s", METRIC_NAME, e) diff --git a/src/endpoints/metrics/drift/jensen_shannon.py b/src/endpoints/metrics/drift/jensen_shannon.py index 36cef0f2..17e950c5 100644 --- a/src/endpoints/metrics/drift/jensen_shannon.py +++ b/src/endpoints/metrics/drift/jensen_shannon.py @@ -5,6 +5,7 @@ from http import HTTPStatus from typing import Any, Literal, cast +import pandas as pd from fastapi import APIRouter, HTTPException from pydantic import BaseModel, ConfigDict, Field, model_validator @@ -18,6 +19,7 @@ ) from src.service.data.shared_data_source import DataSource, get_shared_data_source from src.service.payloads.metrics.base_metric_request import BaseMetricRequest +from src.service.prometheus.metric_value_carrier import MetricValueCarrier from src.service.prometheus.prometheus_scheduler import PrometheusScheduler from src.service.prometheus.shared_prometheus_scheduler import ( get_shared_prometheus_scheduler, @@ -374,3 +376,51 @@ async def list_jensenshannon_requests() -> dict[str, list[dict[str, Any]]]: ) from e else: return {"requests": requests_list} + + +async def calculate_jensenshannon_metric( + batch: pd.DataFrame, + request: BaseMetricRequest, +) -> MetricValueCarrier: + """Calculate JensenShannon metric for the Prometheus scheduler.""" + data_source = get_data_source() + reference_df = await data_source.get_dataframe_by_tag( + request.model_id, request.reference_tag + ) + fit_columns = request.fit_columns or list(batch.columns) + statistic = getattr(request, "statistic", DEFAULT_STATISTIC) + threshold = getattr(request, "threshold", DEFAULT_THRESHOLD) + method = getattr(request, "method", DEFAULT_METHOD) + grid_points = getattr(request, "grid_points", DEFAULT_GRID_POINTS) + bins = getattr(request, "bins", DEFAULT_BINS) + + named_values = {} + for feature_name in fit_columns: + if feature_name in reference_df.columns and feature_name in batch.columns: + result = JensenShannon.jensenshannon( + data_ref=reference_df[feature_name].to_numpy(), + data_cur=batch[feature_name].to_numpy(), + statistic=cast("Literal['distance', 'divergence']", statistic), + threshold=threshold, + method=cast("Literal['kde', 'hist']", method), + grid_points=grid_points, + bins=bins, + ) + named_values[feature_name] = result["Jensen-Shannon_distance"] + return MetricValueCarrier(named_values or 0.0) + + +def _register_jensenshannon_calculator() -> None: + """Register the JensenShannon calculator with the metrics directory.""" + scheduler = get_prometheus_scheduler() + if scheduler and scheduler.metrics_directory: + scheduler.metrics_directory.register( + METRIC_NAME, calculate_jensenshannon_metric + ) + logger.info("%s calculator registered with metrics directory", METRIC_NAME) + + +try: + _register_jensenshannon_calculator() +except (AttributeError, TypeError) as e: + logger.warning("Could not register %s calculator on import: %s", METRIC_NAME, e) diff --git a/src/endpoints/metrics/drift/kolmogorov_smirnov.py b/src/endpoints/metrics/drift/kolmogorov_smirnov.py index 7875fa02..0bb9aa7d 100644 --- a/src/endpoints/metrics/drift/kolmogorov_smirnov.py +++ b/src/endpoints/metrics/drift/kolmogorov_smirnov.py @@ -5,6 +5,7 @@ from http import HTTPStatus from typing import Any +import pandas as pd from fastapi import APIRouter, HTTPException from pydantic import BaseModel, ConfigDict, Field @@ -12,6 +13,7 @@ from src.service.data.datasources.data_source import DataSource from src.service.data.shared_data_source import get_shared_data_source from src.service.payloads.metrics.base_metric_request import BaseMetricRequest +from src.service.prometheus.metric_value_carrier import MetricValueCarrier from src.service.prometheus.prometheus_scheduler import PrometheusScheduler from src.service.prometheus.shared_prometheus_scheduler import ( get_shared_prometheus_scheduler, @@ -319,3 +321,41 @@ async def list_kstest_requests() -> dict[str, list[dict[str, Any]]]: ) from e else: return {"requests": requests_list} + + +async def calculate_kstest_metric( + batch: pd.DataFrame, + request: BaseMetricRequest, +) -> MetricValueCarrier: + """Calculate KSTest metric for the Prometheus scheduler.""" + data_source = get_data_source() + reference_df = await data_source.get_dataframe_by_tag( + request.model_id, request.reference_tag + ) + fit_columns = request.fit_columns or list(batch.columns) + alpha = getattr(request, "threshold_delta", 0.05) + + named_values = {} + for feature_name in fit_columns: + if feature_name in reference_df.columns and feature_name in batch.columns: + result = KolmogorovSmirnov.kstest( + reference_data=reference_df[feature_name].to_numpy(), + current_data=batch[feature_name].to_numpy(), + alpha=alpha, + ) + named_values[feature_name] = result["statistic"] + return MetricValueCarrier(named_values or 0.0) + + +def _register_kstest_calculator() -> None: + """Register the KSTest calculator with the metrics directory.""" + scheduler = get_prometheus_scheduler() + if scheduler and scheduler.metrics_directory: + scheduler.metrics_directory.register(METRIC_NAME, calculate_kstest_metric) + logger.info("%s calculator registered with metrics directory", METRIC_NAME) + + +try: + _register_kstest_calculator() +except (AttributeError, TypeError) as e: + logger.warning("Could not register %s calculator on import: %s", METRIC_NAME, e) diff --git a/src/service/prometheus/prometheus_scheduler.py b/src/service/prometheus/prometheus_scheduler.py index dbba2760..de850897 100644 --- a/src/service/prometheus/prometheus_scheduler.py +++ b/src/service/prometheus/prometheus_scheduler.py @@ -1,5 +1,6 @@ """Prometheus scheduler for periodic metric calculation and publishing.""" +import asyncio import logging import os import threading @@ -608,7 +609,7 @@ async def _process_single_request( batch = df.tail(batch_size) metric_name = request.metric_name - value = self._calculate_metric( + value = await self._calculate_metric( model_id, metric_name, batch, request, throw_errors=throw_errors ) if value is None: @@ -625,7 +626,7 @@ async def _process_single_request( throw_errors=throw_errors, ) - def _calculate_metric( + async def _calculate_metric( self, model_id: str, metric_name: str, @@ -653,8 +654,9 @@ def _calculate_metric( return None try: - return calculator(batch, request) - + result = calculator(batch, request) + if asyncio.iscoroutine(result): + result = await result except KeyError as e: self._handle_error( e, @@ -685,6 +687,8 @@ def _calculate_metric( throw_errors=throw_errors, ) return None + else: + return result def _publish_metric_value( self, diff --git a/tests/endpoints/metrics/drift/test_drift_calculators.py b/tests/endpoints/metrics/drift/test_drift_calculators.py new file mode 100644 index 00000000..e9370525 --- /dev/null +++ b/tests/endpoints/metrics/drift/test_drift_calculators.py @@ -0,0 +1,131 @@ +"""Tests for drift metric calculator functions used by the Prometheus scheduler.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import numpy as np +import pandas as pd +import pytest + +from src.endpoints.metrics.drift.compare_means import ( + calculate_compare_means_metric, +) +from src.endpoints.metrics.drift.jensen_shannon import ( + calculate_jensenshannon_metric, +) +from src.endpoints.metrics.drift.kolmogorov_smirnov import ( + calculate_kstest_metric, +) + + +def _make_request( + model_id: str = "test-model", + reference_tag: str = "TRAINING", + fit_columns: list[str] | None = None, +) -> MagicMock: + """Create a mock metric request.""" + request = MagicMock() + request.model_id = model_id + request.reference_tag = reference_tag + request.fit_columns = fit_columns or [] + request.alpha = 0.05 + request.equal_var = True + request.nan_policy = "omit" + request.threshold_delta = 0.05 + request.statistic = "distance" + request.threshold = 0.1 + request.method = "kde" + request.grid_points = 100 + request.bins = 10 + return request + + +def _make_dataframe(n_rows: int = 50) -> pd.DataFrame: + rng = np.random.default_rng(42) + return pd.DataFrame( + { + "feature1": rng.normal(0, 1, n_rows), + "feature2": rng.normal(5, 2, n_rows), + } + ) + + +class TestCompareMeansCalculator: + """Tests for calculate_compare_means_metric.""" + + @patch("src.endpoints.metrics.drift.compare_means.get_data_source") + @pytest.mark.asyncio + async def test_returns_named_values(self, mock_get_ds: MagicMock) -> None: + """Calculator returns MetricValueCarrier with per-feature statistics.""" + ref_df = _make_dataframe() + cur_df = _make_dataframe() + mock_ds = MagicMock() + mock_ds.get_dataframe_by_tag = AsyncMock(return_value=ref_df) + mock_get_ds.return_value = mock_ds + + request = _make_request(fit_columns=["feature1", "feature2"]) + result = await calculate_compare_means_metric(cur_df, request) + + assert not result.is_single() + named = result.get_named_values() + assert "feature1" in named + assert "feature2" in named + assert isinstance(named["feature1"], float) + + @patch("src.endpoints.metrics.drift.compare_means.get_data_source") + @pytest.mark.asyncio + async def test_derives_columns_from_batch(self, mock_get_ds: MagicMock) -> None: + """When fit_columns is empty, uses batch columns.""" + ref_df = _make_dataframe() + cur_df = _make_dataframe() + mock_ds = MagicMock() + mock_ds.get_dataframe_by_tag = AsyncMock(return_value=ref_df) + mock_get_ds.return_value = mock_ds + + request = _make_request(fit_columns=[]) + result = await calculate_compare_means_metric(cur_df, request) + + named = result.get_named_values() + assert set(named.keys()) == {"feature1", "feature2"} + + +class TestKSTestCalculator: + """Tests for calculate_kstest_metric.""" + + @patch("src.endpoints.metrics.drift.kolmogorov_smirnov.get_data_source") + @pytest.mark.asyncio + async def test_returns_named_values(self, mock_get_ds: MagicMock) -> None: + """Calculator returns per-feature KS statistics.""" + ref_df = _make_dataframe() + cur_df = _make_dataframe() + mock_ds = MagicMock() + mock_ds.get_dataframe_by_tag = AsyncMock(return_value=ref_df) + mock_get_ds.return_value = mock_ds + + request = _make_request(fit_columns=["feature1"]) + result = await calculate_kstest_metric(cur_df, request) + + assert not result.is_single() + named = result.get_named_values() + assert "feature1" in named + + +class TestJensenShannonCalculator: + """Tests for calculate_jensenshannon_metric.""" + + @patch("src.endpoints.metrics.drift.jensen_shannon.get_data_source") + @pytest.mark.asyncio + async def test_returns_named_values(self, mock_get_ds: MagicMock) -> None: + """Calculator returns per-feature JS distance values.""" + ref_df = _make_dataframe() + cur_df = _make_dataframe() + mock_ds = MagicMock() + mock_ds.get_dataframe_by_tag = AsyncMock(return_value=ref_df) + mock_get_ds.return_value = mock_ds + + request = _make_request(fit_columns=["feature1"]) + result = await calculate_jensenshannon_metric(cur_df, request) + + assert not result.is_single() + named = result.get_named_values() + assert "feature1" in named + assert named["feature1"] >= 0 From 8c4877bd282eee11d6bab75c953b50dc61c715db Mon Sep 17 00:00:00 2001 From: Sudip Sinha Date: Tue, 16 Jun 2026 15:32:28 +0100 Subject: [PATCH 2/4] fix(prometheus): uppercase METRIC_NAME constants and preserve deprecated names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Prometheus sub-problems: A) Deprecated schedule_meanshift overwrites metric_name to CompareMeans, so the Prometheus gauge is trustyai_comparemeans instead of trustyai_meanshift. Fix: set metric_name to DEPRECATED_METRIC_NAME before delegating, and only set METRIC_NAME if not already set. B) METRIC_NAME constants used mixed case (KSTest, CompareMeans) but Java uses uppercase (KSTEST, COMPAREMEANS). The integration test checks metric_name.upper(), causing case mismatch. Fix: uppercase all METRIC_NAME constants. C) KSTestStreaming calculator is missing — noted for PR #99 branch. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sudip Sinha --- src/endpoints/metrics/drift/compare_means.py | 8 +++++--- src/endpoints/metrics/drift/jensen_shannon.py | 2 +- src/endpoints/metrics/drift/kolmogorov_smirnov.py | 6 +++--- tests/endpoints/metrics/drift/test_compare_means.py | 4 ++-- tests/endpoints/metrics/drift/test_jensen_shannon.py | 4 ++-- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/endpoints/metrics/drift/compare_means.py b/src/endpoints/metrics/drift/compare_means.py index ab888914..aa33f89f 100644 --- a/src/endpoints/metrics/drift/compare_means.py +++ b/src/endpoints/metrics/drift/compare_means.py @@ -30,8 +30,8 @@ logger = logging.getLogger(__name__) # Metric name constants -METRIC_NAME = "CompareMeans" -DEPRECATED_METRIC_NAME = "Meanshift" # Legacy name for backwards compatibility +METRIC_NAME = "COMPAREMEANS" +DEPRECATED_METRIC_NAME = "MEANSHIFT" # Default parameter values DEFAULT_BATCH_SIZE = 100 @@ -275,7 +275,8 @@ async def schedule_compare_means(request: CompareMeansMetricRequest) -> dict[str logger.info("Scheduling %s computation with ID: %s.", METRIC_NAME, request_id) # Set metric name automatically - request.metric_name = METRIC_NAME + if not request.metric_name: + request.metric_name = METRIC_NAME # Register with the scheduler (this will reconcile the request and store it) await scheduler.register(request.metric_name, request_id, request) @@ -462,6 +463,7 @@ async def schedule_meanshift(request: MeanshiftMetricRequest) -> dict[str, str]: compare_means_request = CompareMeansMetricRequest.model_validate( request.model_dump(exclude_none=True) ) + compare_means_request.metric_name = DEPRECATED_METRIC_NAME return await schedule_compare_means(compare_means_request) diff --git a/src/endpoints/metrics/drift/jensen_shannon.py b/src/endpoints/metrics/drift/jensen_shannon.py index 17e950c5..a3f280cd 100644 --- a/src/endpoints/metrics/drift/jensen_shannon.py +++ b/src/endpoints/metrics/drift/jensen_shannon.py @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) # Metric name constant -METRIC_NAME = "JensenShannon" +METRIC_NAME = "JENSENSHANNON" def get_prometheus_scheduler() -> PrometheusScheduler: diff --git a/src/endpoints/metrics/drift/kolmogorov_smirnov.py b/src/endpoints/metrics/drift/kolmogorov_smirnov.py index 0bb9aa7d..36b4c7bc 100644 --- a/src/endpoints/metrics/drift/kolmogorov_smirnov.py +++ b/src/endpoints/metrics/drift/kolmogorov_smirnov.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) # Metric name constant -METRIC_NAME = "KSTest" +METRIC_NAME = "KSTEST" def get_prometheus_scheduler() -> PrometheusScheduler: @@ -201,8 +201,8 @@ async def schedule_kstest(request: KSTestMetricRequest) -> dict[str, str]: request_id = uuid.uuid4() logger.info("Scheduling %s computation with ID: %s.", METRIC_NAME, request_id) - # Set metric name automatically - request.metric_name = METRIC_NAME + if not request.metric_name: + request.metric_name = METRIC_NAME # Register with the scheduler (this will reconcile the request and store it) await scheduler.register(request.metric_name, request_id, request) diff --git a/tests/endpoints/metrics/drift/test_compare_means.py b/tests/endpoints/metrics/drift/test_compare_means.py index 7ac3ab5b..86aef657 100644 --- a/tests/endpoints/metrics/drift/test_compare_means.py +++ b/tests/endpoints/metrics/drift/test_compare_means.py @@ -638,13 +638,13 @@ def test_deprecated_schedule_endpoint_with_omitted_optional_fields(self) -> None test_retrieve_default_tags_with_none_metric_name = ( factory.make_retrieve_default_tags_with_none_metric_name_test( request_class=CompareMeansMetricRequest, - expected_metric_name="CompareMeans", + expected_metric_name="COMPAREMEANS", ) ) test_retrieve_default_tags_called_directly_by_prometheus_publisher = ( factory.make_retrieve_default_tags_called_directly_by_prometheus_publisher_test( request_class=CompareMeansMetricRequest, - expected_metric_name="CompareMeans", + expected_metric_name="COMPAREMEANS", ) ) diff --git a/tests/endpoints/metrics/drift/test_jensen_shannon.py b/tests/endpoints/metrics/drift/test_jensen_shannon.py index a86d2901..c53b2bf9 100644 --- a/tests/endpoints/metrics/drift/test_jensen_shannon.py +++ b/tests/endpoints/metrics/drift/test_jensen_shannon.py @@ -347,7 +347,7 @@ class TestJensenShannonEndpoints: "src.endpoints.metrics.drift.jensen_shannon", fromlist=["JensenShannonMetricRequest"], ).JensenShannonMetricRequest, - expected_metric_name="JensenShannon", + expected_metric_name="JENSENSHANNON", ) ) @@ -357,7 +357,7 @@ class TestJensenShannonEndpoints: "src.endpoints.metrics.drift.jensen_shannon", fromlist=["JensenShannonMetricRequest"], ).JensenShannonMetricRequest, - expected_metric_name="JensenShannon", + expected_metric_name="JENSENSHANNON", ) ) From ef7b9a7b978de0c2ad36647f80f17af84d26573c Mon Sep 17 00:00:00 2001 From: Sudip Sinha Date: Tue, 16 Jun 2026 16:12:46 +0100 Subject: [PATCH 3/4] fix(prometheus): deprecated list/delete query by deprecated metric name Deprecated list and delete endpoints delegated to canonical versions that query scheduler by METRIC_NAME, but requests were stored under DEPRECATED_METRIC_NAME. Parameterize list/delete functions to accept metric_name, pass DEPRECATED_METRIC_NAME from deprecated wrappers. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sudip Sinha --- src/endpoints/metrics/drift/compare_means.py | 19 ++++++++++++------- src/endpoints/metrics/drift/jensen_shannon.py | 12 ++++++++---- .../metrics/drift/kolmogorov_smirnov.py | 12 ++++++++---- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/endpoints/metrics/drift/compare_means.py b/src/endpoints/metrics/drift/compare_means.py index aa33f89f..07c5ef01 100644 --- a/src/endpoints/metrics/drift/compare_means.py +++ b/src/endpoints/metrics/drift/compare_means.py @@ -297,7 +297,9 @@ async def schedule_compare_means(request: CompareMeansMetricRequest) -> dict[str @router.delete("/metrics/drift/comparemeans/request") -async def delete_compare_means_schedule(schedule: ScheduleId) -> dict[str, str]: +async def delete_compare_means_schedule( + schedule: ScheduleId, metric_name: str = METRIC_NAME +) -> dict[str, str]: """Delete a recurring computation of CompareMeans metric.""" # Get the scheduler and validate availability scheduler = get_prometheus_scheduler() @@ -319,7 +321,7 @@ async def delete_compare_means_schedule(schedule: ScheduleId) -> dict[str, str]: logger.info("Deleting %s schedule: %s", METRIC_NAME, schedule.requestId) # Delete from scheduler - await scheduler.delete(METRIC_NAME, request_uuid) + await scheduler.delete(metric_name, request_uuid) except HTTPException: raise @@ -342,7 +344,9 @@ async def delete_compare_means_schedule(schedule: ScheduleId) -> dict[str, str]: @router.get("/metrics/drift/comparemeans/requests") -async def list_compare_means_requests() -> dict[str, list[dict[str, Any]]]: +async def list_compare_means_requests( + metric_name: str = METRIC_NAME, +) -> dict[str, list[dict[str, Any]]]: """List the currently scheduled computations of CompareMeans metric.""" # Get the scheduler and validate availability scheduler = get_prometheus_scheduler() @@ -353,8 +357,7 @@ async def list_compare_means_requests() -> dict[str, list[dict[str, Any]]]: ) try: - # Get all requests for CompareMeans - requests = scheduler.get_requests(METRIC_NAME) + requests = scheduler.get_requests(metric_name) # Convert to list format expected by client requests_list = [] @@ -475,7 +478,9 @@ async def delete_meanshift_schedule(schedule: ScheduleId) -> dict[str, str]: /metrics/drift/comparemeans/request instead. """ log_deprecated_endpoint(logger, DEPRECATED_METRIC_NAME, METRIC_NAME) - return await delete_compare_means_schedule(schedule) + return await delete_compare_means_schedule( + schedule, metric_name=DEPRECATED_METRIC_NAME + ) @router.get("/metrics/drift/meanshift/requests", deprecated=True) @@ -486,7 +491,7 @@ async def list_meanshift_requests() -> dict[str, list[dict[str, Any]]]: /metrics/drift/comparemeans/requests instead. """ log_deprecated_endpoint(logger, DEPRECATED_METRIC_NAME, METRIC_NAME) - return await list_compare_means_requests() + return await list_compare_means_requests(metric_name=DEPRECATED_METRIC_NAME) async def calculate_compare_means_metric( diff --git a/src/endpoints/metrics/drift/jensen_shannon.py b/src/endpoints/metrics/drift/jensen_shannon.py index a3f280cd..c18b93a9 100644 --- a/src/endpoints/metrics/drift/jensen_shannon.py +++ b/src/endpoints/metrics/drift/jensen_shannon.py @@ -271,7 +271,9 @@ async def schedule_jensenshannon(request: JensenShannonMetricRequest) -> dict[st @router.delete("/metrics/drift/jensenshannon/request") -async def delete_jensenshannon_schedule(schedule: ScheduleId) -> dict[str, str]: +async def delete_jensenshannon_schedule( + schedule: ScheduleId, metric_name: str = METRIC_NAME +) -> dict[str, str]: """Delete a recurring computation of Jensen-Shannon metric.""" # Get the scheduler and validate availability scheduler = get_prometheus_scheduler() @@ -293,7 +295,7 @@ async def delete_jensenshannon_schedule(schedule: ScheduleId) -> dict[str, str]: logger.info("Deleting %s schedule: %s", METRIC_NAME, schedule.requestId) # Delete from scheduler - await scheduler.delete(METRIC_NAME, request_uuid) + await scheduler.delete(metric_name, request_uuid) except HTTPException: raise @@ -316,7 +318,9 @@ async def delete_jensenshannon_schedule(schedule: ScheduleId) -> dict[str, str]: @router.get("/metrics/drift/jensenshannon/requests") -async def list_jensenshannon_requests() -> dict[str, list[dict[str, Any]]]: +async def list_jensenshannon_requests( + metric_name: str = METRIC_NAME, +) -> dict[str, list[dict[str, Any]]]: """List the currently scheduled computations of Jensen-Shannon metric.""" # Get the scheduler and validate availability scheduler = get_prometheus_scheduler() @@ -328,7 +332,7 @@ async def list_jensenshannon_requests() -> dict[str, list[dict[str, Any]]]: try: # Get all requests for JensenShannon - requests = scheduler.get_requests(METRIC_NAME) + requests = scheduler.get_requests(metric_name) # Convert to list format expected by client requests_list = [] diff --git a/src/endpoints/metrics/drift/kolmogorov_smirnov.py b/src/endpoints/metrics/drift/kolmogorov_smirnov.py index 36b4c7bc..3ca9f14d 100644 --- a/src/endpoints/metrics/drift/kolmogorov_smirnov.py +++ b/src/endpoints/metrics/drift/kolmogorov_smirnov.py @@ -221,7 +221,9 @@ async def schedule_kstest(request: KSTestMetricRequest) -> dict[str, str]: @router.delete("/metrics/drift/kstest/request") -async def delete_kstest_schedule(schedule: ScheduleId) -> dict[str, str]: +async def delete_kstest_schedule( + schedule: ScheduleId, metric_name: str = METRIC_NAME +) -> dict[str, str]: """Delete a recurring computation of KSTest metric.""" # Get the scheduler and validate availability scheduler = get_prometheus_scheduler() @@ -243,7 +245,7 @@ async def delete_kstest_schedule(schedule: ScheduleId) -> dict[str, str]: logger.info("Deleting %s schedule: %s", METRIC_NAME, schedule.requestId) # Delete from scheduler - await scheduler.delete(METRIC_NAME, request_uuid) + await scheduler.delete(metric_name, request_uuid) except HTTPException: raise @@ -266,7 +268,9 @@ async def delete_kstest_schedule(schedule: ScheduleId) -> dict[str, str]: @router.get("/metrics/drift/kstest/requests") -async def list_kstest_requests() -> dict[str, list[dict[str, Any]]]: +async def list_kstest_requests( + metric_name: str = METRIC_NAME, +) -> dict[str, list[dict[str, Any]]]: """List the currently scheduled computations of KSTest metric.""" # Get the scheduler and validate availability scheduler = get_prometheus_scheduler() @@ -278,7 +282,7 @@ async def list_kstest_requests() -> dict[str, list[dict[str, Any]]]: try: # Get all requests for KSTest - requests = scheduler.get_requests(METRIC_NAME) + requests = scheduler.get_requests(metric_name) # Convert to list format expected by client requests_list = [] From 585a9089fc8538ab6a3c6484634812b29aeb8c6c Mon Sep 17 00:00:00 2001 From: Sudip Sinha Date: Tue, 16 Jun 2026 18:36:06 +0100 Subject: [PATCH 4/4] fix(prometheus): register calculator under both canonical and deprecated names Deprecated endpoints store requests under DEPRECATED_METRIC_NAME but the calculator was only registered under METRIC_NAME. The scheduler couldn't find a calculator for "MEANSHIFT" requests. Register under both names so deprecated scheduled metrics compute correctly. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Sudip Sinha --- src/endpoints/metrics/drift/compare_means.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/endpoints/metrics/drift/compare_means.py b/src/endpoints/metrics/drift/compare_means.py index 07c5ef01..7c98606c 100644 --- a/src/endpoints/metrics/drift/compare_means.py +++ b/src/endpoints/metrics/drift/compare_means.py @@ -529,6 +529,9 @@ def _register_compare_means_calculator() -> None: scheduler.metrics_directory.register( METRIC_NAME, calculate_compare_means_metric ) + scheduler.metrics_directory.register( + DEPRECATED_METRIC_NAME, calculate_compare_means_metric + ) logger.info("%s calculator registered with metrics directory", METRIC_NAME)