Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ concurrency:
permissions:
contents: read
pull-requests: write
issues: write

jobs:
golangci-lint:
Expand Down Expand Up @@ -137,6 +138,7 @@ jobs:
permissions:
contents: write
pull-requests: write
issues: write
services:
postgres:
image: pgvector/pgvector:pg16
Expand Down
32 changes: 17 additions & 15 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package main

import (
"context"
"log"
"strings"
"time"

Expand All @@ -23,6 +22,7 @@ import (
"github.com/marco-spagn/pcmi/internal/event"
grpcserver "github.com/marco-spagn/pcmi/internal/grpc"
"github.com/marco-spagn/pcmi/internal/handler"
"github.com/marco-spagn/pcmi/internal/log"
metrics "github.com/marco-spagn/pcmi/internal/metrics"
"github.com/marco-spagn/pcmi/internal/middleware"
"github.com/marco-spagn/pcmi/internal/model"
Expand All @@ -45,31 +45,33 @@ func skipTracePath(c *fiber.Ctx) bool {
}

func main() {
log.Println("🚀 PCMI API " + version.Tag + " starting...")
log.Info("PCMI API starting", "version", version.Tag)

// --- Fail-fast: carica e valida config prima di aprire qualsiasi connessione ---
cfg := config.Load()
if err := cfg.Validate(config.APIRequiredFields...); err != nil {
log.Fatalf("❌ FATAL: %v", err)
log.Fatal("config validation failed", "err", err)
}
addSource := cfg.LogSource == "1" || cfg.LogSource == "true"
log.Configure(cfg.LogFormat, cfg.LogLevel, addSource)
if cfg.EncryptionKey != "" {
if err := pcmicrypto.InitKey(cfg.EncryptionKey); err != nil {
log.Fatalf("❌ FATAL encryption key: %v", err)
log.Fatal("encryption key initialization failed", "err", err)
}
}
log.Printf("✅ Config loaded (DB=%s, Redis=%s, Port=%s)", cfg.DatabaseURL[:min(len(cfg.DatabaseURL), 40)], cfg.RedisAddr, cfg.APIPort)
log.Info("config loaded", "db", log.Mask(cfg.DatabaseURL, 40), "redis", cfg.RedisAddr, "port", cfg.APIPort)
middleware.LogMetricsScrapeAuthState(cfg.MetricsScrapeToken)

ctx := context.Background()
shutdownTelemetry, err := telemetry.Init(ctx, cfg, "pcmi-api")
if err != nil {
log.Fatalf("telemetry: %v", err)
log.Fatal("telemetry init failed", "err", err)
}
defer func() {
sdCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if e := shutdownTelemetry(sdCtx); e != nil {
log.Printf("telemetry shutdown: %v", e)
log.Error("telemetry shutdown failed", "err", e)
}
}()

Expand All @@ -85,7 +87,7 @@ func main() {
repo := repository.NewMemoryRepository(db, pools.Read)
embed, err := embedding.NewFromConfig(cfg)
if err != nil {
log.Fatalf("❌ FATAL embedding provider: %v", err)
log.Fatal("embedding provider init failed", "err", err)
}
dedupMode, _ := model.ParseDedupMode(cfg.DedupMode)
memSvc := service.NewMemoryService(repo, embed, dedupMode)
Expand All @@ -107,10 +109,10 @@ func main() {

handler.RegisterReadyRoutes(app, db)
if err := handler.SetupMemoryRoutes(app, db, pools.Read, cfg); err != nil {
log.Fatalf("❌ FATAL memory routes: %v", err)
log.Fatal("memory routes setup failed", "err", err)
}
if err := handler.SetupSessionRoutes(app, db, pools.Read, cfg); err != nil {
log.Fatalf("❌ FATAL session routes: %v", err)
log.Fatal("session routes setup failed", "err", err)
}
handler.SetupAdminRoutes(app, db)
handler.SetupDistillationPolicyRoutes(app, db)
Expand All @@ -121,16 +123,16 @@ func main() {

grpcserver.Start(db, pools.Read, memSvc, cfg)

log.Printf("✅ PCMI API %s started on port %s (/v1/ready per readiness)", version.Tag, cfg.APIPort)
log.Info("PCMI API started", "version", version.Tag, "port", cfg.APIPort)
if pools.Read != nil {
log.Println("📖 DATABASE_READ_URL attivo: carico di lettura su replica")
log.Info("DATABASE_READ_URL active — read traffic routed to replica")
}
addr := ":" + cfg.APIPort
if cfg.TLSCertFile != "" && cfg.TLSKeyFile != "" {
log.Printf("🔒 TLS enabled (cert=%s)", cfg.TLSCertFile)
log.Fatal(app.ListenTLS(addr, cfg.TLSCertFile, cfg.TLSKeyFile))
log.Info("TLS enabled", "cert", cfg.TLSCertFile)
log.Fatal("ListenTLS failed", "err", app.ListenTLS(addr, cfg.TLSCertFile, cfg.TLSKeyFile))
} else {
log.Fatal(app.Listen(addr))
log.Fatal("Listen failed", "err", app.Listen(addr))
}
}

Expand Down
26 changes: 14 additions & 12 deletions cmd/migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"sort"
Expand All @@ -16,10 +15,13 @@ import (

"github.com/jackc/pgx/v5/pgxpool"
"github.com/marco-spagn/pcmi/internal/config"
"github.com/marco-spagn/pcmi/internal/log"
)

func main() {
cfg := config.Load()
addSource := cfg.LogSource == "1" || cfg.LogSource == "true"
log.Configure(cfg.LogFormat, cfg.LogLevel, addSource)
if cfg.DatabaseURL == "" {
log.Fatal("DATABASE_URL is required")
}
Expand All @@ -30,7 +32,7 @@ func main() {

pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
if err != nil {
log.Fatalf("connect: %v", err)
log.Fatal("database connection failed", "err", err)
}
defer pool.Close()

Expand All @@ -40,14 +42,14 @@ func main() {
filename TEXT PRIMARY KEY,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)`); err != nil {
log.Fatalf("create schema_migrations: %v", err)
log.Fatal("create schema_migrations table failed", "err", err)
}

// Read already-applied migrations.
rows, err := pool.Query(ctx,
`SELECT filename FROM schema_migrations ORDER BY filename`)
if err != nil {
log.Fatalf("query applied: %v", err)
log.Fatal("query applied migrations failed", "err", err)
}
applied := map[string]bool{}
for rows.Next() {
Expand All @@ -60,7 +62,7 @@ func main() {
// Collect .sql files in lexicographic order.
entries, err := os.ReadDir(migrationsDir)
if err != nil {
log.Fatalf("read migrations dir %q: %v", migrationsDir, err)
log.Fatal("read migrations directory failed", "dir", migrationsDir, "err", err)
}
var files []string
for _, e := range entries {
Expand All @@ -73,30 +75,30 @@ func main() {
appliedCount := 0
for _, fname := range files {
if applied[fname] {
log.Printf("⏭ skip %s (already applied)", fname)
log.Info("migration already applied, skipping", "file", fname)
continue
}
sql, err := os.ReadFile(filepath.Join(migrationsDir, fname))
if err != nil {
log.Fatalf("read %s: %v", fname, err)
log.Fatal("read migration file failed", "file", fname, "err", err)
}
tx, err := pool.Begin(ctx)
if err != nil {
log.Fatalf("begin tx for %s: %v", fname, err)
log.Fatal("begin transaction failed", "file", fname, "err", err)
}
if _, err := tx.Exec(ctx, string(sql)); err != nil {
_ = tx.Rollback(ctx)
log.Fatalf("❌ apply %s: %v", fname, err)
log.Fatal("apply migration failed", "file", fname, "err", err)
}
if _, err := tx.Exec(ctx,
`INSERT INTO schema_migrations (filename) VALUES ($1)`, fname); err != nil {
_ = tx.Rollback(ctx)
log.Fatalf("record %s: %v", fname, err)
log.Fatal("record migration failed", "file", fname, "err", err)
}
if err := tx.Commit(ctx); err != nil {
log.Fatalf("commit %s: %v", fname, err)
log.Fatal("commit migration failed", "file", fname, "err", err)
}
log.Printf("✅ applied %s", fname)
log.Info("applied migration", "file", fname)
appliedCount++
}
fmt.Printf("\n🎉 Migrations: %d applied, %d skipped.\n",
Expand Down
42 changes: 22 additions & 20 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/marco-spagn/pcmi/internal/database"
"github.com/marco-spagn/pcmi/internal/embedding"
"github.com/marco-spagn/pcmi/internal/event"
"github.com/marco-spagn/pcmi/internal/log"
"github.com/marco-spagn/pcmi/internal/metrics"
"github.com/marco-spagn/pcmi/internal/telemetry"
"github.com/marco-spagn/pcmi/internal/version"
Expand All @@ -31,25 +31,27 @@ import (
const workerTracerName = "github.com/marco-spagn/pcmi/cmd/worker"

func main() {
log.Println("🚀 PCMI Worker starting...")
log.Info("PCMI Worker starting")

// --- Fail-fast: carica e valida config prima di aprire qualsiasi connessione ---
cfg := config.Load()
if err := cfg.Validate(config.WorkerRequiredFields...); err != nil {
log.Fatalf("❌ FATAL: %v", err)
log.Fatal("config validation failed", "err", err)
}
log.Printf("✅ Config loaded (DB=%s, Redis=%s)", cfg.DatabaseURL[:min(len(cfg.DatabaseURL), 40)], cfg.RedisAddr)
addSource := cfg.LogSource == "1" || cfg.LogSource == "true"
log.Configure(cfg.LogFormat, cfg.LogLevel, addSource)
log.Info("config loaded", "db", log.Mask(cfg.DatabaseURL, 40), "redis", cfg.RedisAddr)

ctxTelemetry := context.Background()
shutdownTelemetry, err := telemetry.Init(ctxTelemetry, cfg, "pcmi-worker")
if err != nil {
log.Fatalf("telemetry: %v", err)
log.Fatal("telemetry init failed", "err", err)
}
defer func() {
sdCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if e := shutdownTelemetry(sdCtx); e != nil {
log.Printf("telemetry shutdown: %v", e)
log.Error("telemetry shutdown failed", "err", e)
}
}()

Expand All @@ -74,34 +76,34 @@ func main() {
metrics.WorkerRegistry,
promhttp.HandlerOpts{EnableOpenMetrics: false},
))
log.Println("💓 Worker HTTP on :8081 (/health, /metrics)")
log.Info("Worker HTTP listening", "addr", ":8081", "endpoints", "/health, /metrics")
if err := http.ListenAndServe(":8081", mux); err != nil {
log.Printf("Health server error: %v", err)
log.Error("health server error", "err", err)
}
}()

backend := event.EventBackend()
log.Printf("✅ Redis connected, event backend=%s", backend)
log.Info("redis connected", "event_backend", backend)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

prov, err := embedding.NewFromConfig(cfg)
if err != nil {
log.Fatalf("❌ FATAL embedding provider: %v", err)
log.Fatal("embedding provider init failed", "err", err)
}
if prov != nil {
ew := worker.NewEmbeddingWorker(db, prov)
go ew.Start(ctx)
log.Println("✅ Embedding worker started")
log.Info("embedding worker started")
} else {
log.Println("⚠️ OPENAI_API_KEY unset embedding worker disabled")
log.Warn("OPENAI_API_KEY unset, embedding worker disabled")
}

distWorker := worker.NewDistillationWorker(db, cfg)
var policyEngine *worker.DistillationPolicyEngine
if cfg.DistillationPolicyDisabled {
log.Println("ℹ️ Distillation policy engine disabled (DISTILLATION_POLICY_DISABLED)")
log.Warn("distillation policy engine disabled", "reason", "DISTILLATION_POLICY_DISABLED")
} else {
policyEngine = worker.NewDistillationPolicyEngine(db, distWorker)
go policyEngine.Start(ctx)
Expand Down Expand Up @@ -136,13 +138,13 @@ func main() {
case event.EventMemoryStored, event.EventMemoryUpdated:
path, _ := evt.Payload["path"].(string)
if policyEngine != nil {
log.Printf("📨 [REDIS] %s id=%v tenant=%s path=%s → distillation policy", evt.Type, evt.Payload["id"], tenantID, path)
log.Info("redis event routed to distillation policy", "event_type", evt.Type, "id", evt.Payload["id"], "tenant", tenantID, "path", path)
policyEngine.OnMemoryEvent(tenantID, path)
}
consolidationWorker.TriggerForMemory(tenantID, path)
case event.EventMemoryRefineRequested:
prefix, _ := evt.Payload["path_prefix"].(string)
log.Printf("📨 [REDIS] refine.requested tenant=%s prefix=%s", tenantID, prefix)
log.Info("redis event refine requested", "tenant", tenantID, "prefix", prefix)
distWorker.TriggerForPrefix(tenantID, prefix)
}
span.End()
Expand All @@ -158,7 +160,7 @@ func main() {
} else {
consumer := event.NewWorkerStreamConsumer()
if err := consumer.EnsureGroup(ctx); err != nil {
log.Fatalf("❌ stream consumer group: %v", err)
log.Fatal("stream consumer group creation failed", "err", err)
}
streamHandler := func(_ context.Context, evt event.Event, streamID string) error {
handleMemoryEvent(evt, streamID)
Expand All @@ -167,20 +169,20 @@ func main() {
consumer.StartPendingRecovery(ctx, streamHandler)
go func() {
if err := consumer.Consume(ctx, streamHandler); err != nil && ctx.Err() == nil {
log.Printf("❌ stream consumer stopped: %v", err)
log.Error("stream consumer stopped unexpectedly", "err", err)
}
}()
log.Printf("✅ Subscribed to stream %s (group %s)", event.StreamKey, event.WorkerConsumerGroup)
log.Info("subscribed to stream", "stream", event.StreamKey, "group", event.WorkerConsumerGroup)
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

log.Println("🛑 Shutting down worker...")
log.Info("shutting down worker")
cancel()
time.Sleep(2 * time.Second)
log.Println("👋 Worker stopped")
log.Info("worker stopped")
}

func min(a, b int) int {
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type Config struct {

// Dedup (PCMI-011): default ingest dedup mode when tenant/request omit it.
DedupMode string

// Logging
LogFormat string // "json" (default) or "text"
LogLevel string // "info" (default) | "debug" | "warn" | "error"
LogSource string // "1" | "true" to enable source file:line in every record
}

// APIConfig returns the subset of fields required by the API service.
Expand Down Expand Up @@ -135,6 +140,10 @@ func Load() *Config {
OTELServiceName: strings.TrimSpace(os.Getenv("OTEL_SERVICE_NAME")),

DedupMode: envOr("DEDUP_MODE", "none"),

LogFormat: envOr("PCMI_LOG_FORMAT", ""),
LogLevel: envOr("PCMI_LOG_LEVEL", ""),
LogSource: strings.TrimSpace(os.Getenv("PCMI_LOG_SOURCE")),
}
return cfg
}
Expand Down
Loading