Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ ignore = [
"src/endpoints/metrics/drift/jensen_shannon.py" = ["TRY300", "TRY301"]
"src/endpoints/metrics/drift/compare_means.py" = ["C901", "TRY301"]
# === TEST FILES ===
"tests/**" = ["S101", "PT019", "SLF001"]
"tests/**" = ["S101", "PT019", "SLF001"] # Common test-specific ignores
"tests/core/metrics/test_fairness.py" = ["N803", "N806"]
"tests/endpoints/metrics/drift/factory.py" = ["PLR0913", "C901", "PLR0915"]
"tests/endpoints/test_upload_endpoint_maria.py" = ["S105"]
"tests/endpoints/test_upload_endpoint_pvc.py" = ["PLR0913"]
"tests/service/data/test_utils.py" = ["PLR0913", "UP037"] # UP037: Keep quoted annotations for optional protobuf imports
"tests/service/serialization/test_rows.py" = ["PLR2004"]
"tests/service/test_health_checks.py" = ["S106", "ANN001", "PLR2004", "FBT003"] # Health check test-specific ignores

[tool.pytest.ini_options]
asyncio_mode = "strict"
Expand Down
72 changes: 61 additions & 11 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@

# Middleware
from src.middleware.gzip_middleware import GzipRequestMiddleware
from src.service.health_checks import (
STATUS_OK,
perform_liveness_checks,
perform_readiness_checks,
)
from src.service.prometheus.shared_prometheus_scheduler import (
get_shared_prometheus_scheduler,
)
Expand Down Expand Up @@ -195,6 +200,34 @@ async def root() -> dict[str, str]:
return {"message": "Welcome to TrustyAI Explainability Service"}


@app.get("/q/health")
async def general_health() -> JSONResponse:
    """Report combined readiness and liveness health (optional endpoint).

    Runs both probe check sets and merges their results into a single
    payload, which is handy for debugging and manual health checks.

    :return: JSON response whose "status" is "healthy" or "unhealthy" and
        whose "checks" holds the readiness and liveness results;
        HTTP 200 if healthy, HTTP 503 if unhealthy
    """
    ready_state, ready_details = perform_readiness_checks()
    live_state, live_details = perform_liveness_checks()

    # A single probe not reporting STATUS_OK marks the whole service unhealthy.
    if ready_state == STATUS_OK and live_state == STATUS_OK:
        label, http_code = "healthy", HTTPStatus.OK
    else:
        label, http_code = "unhealthy", HTTPStatus.SERVICE_UNAVAILABLE

    return JSONResponse(
        content={
            "status": label,
            "checks": {
                "readiness": ready_details,
                "liveness": live_details,
            },
        },
        status_code=http_code,
    )
Comment on lines +203 to +228

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Health endpoint payloads diverge from the declared Quarkus contract.

Lines 192-217, 231-244, and 248-263 return custom status strings (healthy/ready/alive) and mixed keys (details vs checks) instead of a consistent UP|DOWN + checks format. This is an API contract break for consumers expecting SmallRye-compatible health payloads.

Also applies to: 231-244, 248-263

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main.py` around lines 192 - 217, The health endpoints (general_health and
the readiness/liveness handlers that call perform_readiness_checks and
perform_liveness_checks) currently return custom status strings
("healthy"/"unhealthy"/"ready"/"alive") and inconsistent keys
("checks"/"details"), breaking the SmallRye/Quarkus contract; update the
response bodies to use a consistent top-level "status" with values "UP" or
"DOWN" (map STATUS_OK -> "UP", otherwise "DOWN") and ensure the same key name
("checks") is used across all endpoints, preserving the existing per-check
payloads from readiness_checks/liveness_checks; adjust status_code logic to
remain HTTP 200 for "UP" and HTTP 503 for "DOWN" and update JSONResponse
construction in general_health and the readiness/liveness endpoint functions
accordingly.



@app.get("/q/metrics")
async def metrics(_request: Request) -> Response:
"""Prometheus metrics endpoint.
Expand All @@ -210,19 +243,35 @@ async def metrics(_request: Request) -> Response:
async def readiness_probe() -> JSONResponse:
    """Kubernetes readiness probe endpoint.

    Runs the readiness checks and reports their aggregated outcome.

    :return: JSON response with status ("ready" or "not_ready") and the
        per-check results under "checks";
        HTTP 200 if ready, HTTP 503 if not ready
    """
    status, checks = perform_readiness_checks()
    is_ready = status == STATUS_OK

    response_body = {"status": "ready" if is_ready else "not_ready", "checks": checks}

    # A non-200 answer is what tells the caller the service is not ready.
    status_code = HTTPStatus.OK if is_ready else HTTPStatus.SERVICE_UNAVAILABLE
    return JSONResponse(content=response_body, status_code=status_code)


# Liveness probe endpoint
@app.get("/q/health/live")
async def liveness_probe() -> JSONResponse:
    """Kubernetes liveness probe endpoint.

    Lightweight check - if we can respond, we're alive.

    :return: JSON response with status ("alive" or "dead") and the
        per-check results under "checks";
        HTTP 200 if alive, HTTP 503 otherwise
    """
    status, checks = perform_liveness_checks()
    is_alive = status == STATUS_OK

    response_body = {"status": "alive" if is_alive else "dead", "checks": checks}

    # A non-200 answer is what tells the caller the service is not alive.
    status_code = HTTPStatus.OK if is_alive else HTTPStatus.SERVICE_UNAVAILABLE
    return JSONResponse(content=response_body, status_code=status_code)


def get_tls_config() -> dict[str, Any] | None:
Expand Down Expand Up @@ -255,31 +304,32 @@ async def run_server() -> None:

# Configure server settings
host_https = "0.0.0.0" # noqa: S104 # intentional: Kubernetes service binding
host_http = (
"127.0.0.1" # Keep loopback-only for security (kube-rbac-proxy forwards here)
)
host_http = "0.0.0.0" # noqa: S104 # intentional: Kubernetes health probes
http_port = int(os.getenv("HTTP_PORT", "8080"))
ssl_port = int(os.getenv("SSL_PORT", "4443"))

# Create hypercorn config
config = Config()

# HTTP for kube-rbac-proxy (plain HTTP on insecure_bind)
config.insecure_bind = [f"{host_http}:{http_port}"]
logger.info("Binding HTTP on %s:%s for kube-rbac-proxy", host_http, http_port)

# Configure for HTTP/1.1 compatibility and proper keep-alive
config.h11_max_incomplete_size = 16 * 1024 * 1024 # 16MB for large requests
config.keep_alive_timeout = float(os.getenv("KEEP_ALIVE", "75"))

# Optional HTTPS (direct access on bind)
if tls_config:
# HTTPS on bind (external access)
config.bind = [f"{host_https}:{ssl_port}"]
config.certfile = tls_config["ssl_certfile"]
config.keyfile = tls_config["ssl_keyfile"]
# HTTP on insecure_bind (health probes and kube-rbac-proxy)
config.insecure_bind = [f"{host_http}:{http_port}"]
logger.info("Binding HTTPS on %s:%s for direct access", host_https, ssl_port)
logger.info("Binding HTTP on %s:%s for health probes", host_http, http_port)
logger.info("TrustyAI service running with dual HTTP/HTTPS protocol support")
else:
# HTTP only on bind (no TLS available)
config.bind = [f"{host_http}:{http_port}"]
logger.info("Binding HTTP on %s:%s for health probes", host_http, http_port)
logger.info("TLS certificates not found - running HTTP only")

# Configure logging
Expand Down
92 changes: 56 additions & 36 deletions src/service/data/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,54 @@ def get_global_storage_interface(
return GlobalStorageInterface.get(force_reload=force_reload)


class MariaDBConfig:
    """MariaDB connection configuration read from environment variables.

    Supports both operator (Quarkus) and direct deployment env vars. For every
    setting an empty value is treated the same as an unset variable, so the
    fallback variable (or the default) is used instead.
    """

    def __init__(self) -> None:
        """Read MariaDB connection parameters from environment variables.

        :raises ValueError: if DATABASE_PORT is set to a non-integer value
        """
        self.user = os.environ.get("DATABASE_USERNAME") or os.environ.get(
            "QUARKUS_DATASOURCE_USERNAME"
        )
        self.password = os.environ.get("DATABASE_PASSWORD") or os.environ.get(
            "QUARKUS_DATASOURCE_PASSWORD"
        )
        self.host = os.environ.get("DATABASE_HOST") or os.environ.get(
            "DATABASE_SERVICE"
        )
        self.database = os.environ.get("DATABASE_DATABASE") or os.environ.get(
            "DATABASE_NAME"
        )

        # An empty or blank DATABASE_PORT falls back to the default, matching
        # how empty values of the other variables fall through above.
        raw_port = os.environ.get("DATABASE_PORT", "")
        port_str = raw_port if raw_port.strip() else "3306"
        try:
            self.port = int(port_str)
        except ValueError as e:
            msg = f"Invalid DATABASE_PORT value '{port_str}': must be a valid integer"
            raise ValueError(msg) from e

        # The CA path is only kept when it exists on disk; otherwise ssl_ca is
        # None.
        ssl_ca_path = os.environ.get("DATABASE_TLS_CA_CERT", "/etc/tls/db/ca.crt")
        self.ssl_ca = ssl_ca_path if Path(ssl_ca_path).exists() else None

    def validate(self) -> None:
        """Raise ValueError if required env vars are missing.

        :raises ValueError: naming every missing user, password, host, or
            database setting in a single message
        """
        missing = []
        if not self.user:
            missing.append("DATABASE_USERNAME or QUARKUS_DATASOURCE_USERNAME")
        if not self.password:
            missing.append("DATABASE_PASSWORD or QUARKUS_DATASOURCE_PASSWORD")
        if not self.host:
            missing.append("DATABASE_HOST or DATABASE_SERVICE")
        if not self.database:
            missing.append("DATABASE_DATABASE or DATABASE_NAME")
        if missing:
            msg = (
                f"MariaDB storage requires environment variables: {', '.join(missing)}"
            )
            raise ValueError(msg)


def get_storage_interface() -> MariaDBStorage | PVCStorage:
"""Create a new storage interface based on environment configuration.

Expand All @@ -64,47 +112,19 @@ def get_storage_interface() -> MariaDBStorage | PVCStorage:
MariaDBStorage,
)

# Parse DATABASE_ATTEMPT_MIGRATION with tolerance for boolean strings
migration_str = os.environ.get("DATABASE_ATTEMPT_MIGRATION", "0").lower()
attempt_migration = migration_str in ("1", "true", "yes", "on")

# Support both operator env vars and direct deployment env vars
# Operator (Quarkus-based): QUARKUS_DATASOURCE_USERNAME/PASSWORD, DATABASE_SERVICE/NAME
# Direct deployment: DATABASE_USERNAME/PASSWORD, DATABASE_HOST/DATABASE
user = os.environ.get("DATABASE_USERNAME") or os.environ.get(
"QUARKUS_DATASOURCE_USERNAME"
)
password = os.environ.get("DATABASE_PASSWORD") or os.environ.get(
"QUARKUS_DATASOURCE_PASSWORD"
)
host = os.environ.get("DATABASE_HOST") or os.environ.get("DATABASE_SERVICE")
database = os.environ.get("DATABASE_DATABASE") or os.environ.get(
"DATABASE_NAME"
)

# Validate required parameters before constructing MariaDBStorage
missing = []
if not user:
missing.append("DATABASE_USERNAME or QUARKUS_DATASOURCE_USERNAME")
if not password:
missing.append("DATABASE_PASSWORD or QUARKUS_DATASOURCE_PASSWORD")
if not host:
missing.append("DATABASE_HOST or DATABASE_SERVICE")
if not database:
missing.append("DATABASE_DATABASE or DATABASE_NAME")
if missing:
msg = f"MariaDB storage requires environment variables: {', '.join(missing)}"
raise ValueError(msg)

ssl_ca = os.environ.get("DATABASE_TLS_CA_CERT", "/etc/tls/db/ca.crt")
config = MariaDBConfig()
config.validate()

return MariaDBStorage(
user=user,
password=password,
host=host,
port=int(os.environ.get("DATABASE_PORT", "3306")),
database=database,
ssl_ca=ssl_ca if Path(ssl_ca).exists() else None,
user=config.user,
password=config.password,
host=config.host,
port=config.port,
database=config.database,
ssl_ca=config.ssl_ca,
attempt_migration=attempt_migration,
)
except ImportError as e:
Expand Down
5 changes: 5 additions & 0 deletions src/service/data/storage/maria/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(
port: int,
database: str | None,
ssl_ca: str | None = None,
connect_timeout: int | None = None,
) -> None:
"""Initialize connection manager with database credentials.

Expand All @@ -51,13 +52,15 @@ def __init__(
:param port: Database port
:param database: Database name
:param ssl_ca: Path to CA certificate for TLS connection
:param connect_timeout: Connection timeout in seconds (None = driver default)
"""
self.user = user
self.password = password
self.host = host
self.port = port
self.database = database
self.ssl_ca = ssl_ca
self.connect_timeout = connect_timeout

def __enter__(self) -> tuple[mariadb.Connection, mariadb.Cursor]:
"""Enter context manager and establish database connection."""
Expand All @@ -71,6 +74,8 @@ def __enter__(self) -> tuple[mariadb.Connection, mariadb.Cursor]:
if self.ssl_ca:
connect_kwargs["ssl_ca"] = self.ssl_ca
connect_kwargs["ssl_verify_cert"] = True
if self.connect_timeout is not None:
connect_kwargs["connect_timeout"] = self.connect_timeout
self.conn = mariadb.connect(**connect_kwargs)
return self.conn, self.conn.cursor()

Expand Down
Loading
Loading