diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 560b1a44..ebbb4261 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,6 +42,7 @@ concurrency: permissions: contents: read pull-requests: write + issues: write jobs: golangci-lint: @@ -137,6 +138,7 @@ jobs: permissions: contents: write pull-requests: write + issues: write services: postgres: image: pgvector/pgvector:pg16 diff --git a/cmd/api/main.go b/cmd/api/main.go index 91cfac96..4335a5d0 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -7,7 +7,6 @@ package main import ( "context" - "log" "strings" "time" @@ -23,6 +22,7 @@ import ( "github.com/marco-spagn/pcmi/internal/event" grpcserver "github.com/marco-spagn/pcmi/internal/grpc" "github.com/marco-spagn/pcmi/internal/handler" + "github.com/marco-spagn/pcmi/internal/log" metrics "github.com/marco-spagn/pcmi/internal/metrics" "github.com/marco-spagn/pcmi/internal/middleware" "github.com/marco-spagn/pcmi/internal/model" @@ -45,31 +45,33 @@ func skipTracePath(c *fiber.Ctx) bool { } func main() { - log.Println("๐Ÿš€ PCMI API " + version.Tag + " starting...") + log.Info("PCMI API starting", "version", version.Tag) // --- Fail-fast: carica e valida config prima di aprire qualsiasi connessione --- cfg := config.Load() if err := cfg.Validate(config.APIRequiredFields...); err != nil { - log.Fatalf("โŒ FATAL: %v", err) + log.Fatal("config validation failed", "err", err) } + addSource := cfg.LogSource == "1" || cfg.LogSource == "true" + log.Configure(cfg.LogFormat, cfg.LogLevel, addSource) if cfg.EncryptionKey != "" { if err := pcmicrypto.InitKey(cfg.EncryptionKey); err != nil { - log.Fatalf("โŒ FATAL encryption key: %v", err) + log.Fatal("encryption key initialization failed", "err", err) } } - log.Printf("โœ… Config loaded (DB=%s, Redis=%s, Port=%s)", cfg.DatabaseURL[:min(len(cfg.DatabaseURL), 40)], cfg.RedisAddr, cfg.APIPort) + log.Info("config loaded", "db", log.Mask(cfg.DatabaseURL, 40), "redis", cfg.RedisAddr, "port", cfg.APIPort) middleware.LogMetricsScrapeAuthState(cfg.MetricsScrapeToken) ctx := context.Background() shutdownTelemetry, err := telemetry.Init(ctx, cfg, "pcmi-api") if err != nil { - log.Fatalf("telemetry: %v", err) + log.Fatal("telemetry init failed", "err", err) } defer func() { sdCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if e := shutdownTelemetry(sdCtx); e != nil { - log.Printf("telemetry shutdown: %v", e) + log.Error("telemetry shutdown failed", "err", e) } }() @@ -85,7 +87,7 @@ func main() { repo := repository.NewMemoryRepository(db, pools.Read) embed, err := embedding.NewFromConfig(cfg) if err != nil { - log.Fatalf("โŒ FATAL embedding provider: %v", err) + log.Fatal("embedding provider init failed", "err", err) } dedupMode, _ := model.ParseDedupMode(cfg.DedupMode) memSvc := service.NewMemoryService(repo, embed, dedupMode) @@ -107,10 +109,10 @@ func main() { handler.RegisterReadyRoutes(app, db) if err := handler.SetupMemoryRoutes(app, db, pools.Read, cfg); err != nil { - log.Fatalf("โŒ FATAL memory routes: %v", err) + log.Fatal("memory routes setup failed", "err", err) } if err := handler.SetupSessionRoutes(app, db, pools.Read, cfg); err != nil { - log.Fatalf("โŒ FATAL session routes: %v", err) + log.Fatal("session routes setup failed", "err", err) } handler.SetupAdminRoutes(app, db) handler.SetupDistillationPolicyRoutes(app, db) @@ -121,16 +123,16 @@ func main() { grpcserver.Start(db, pools.Read, memSvc, cfg) - log.Printf("โœ… PCMI API %s started on port %s (/v1/ready per readiness)", version.Tag, cfg.APIPort) + log.Info("PCMI API started", "version", version.Tag, "port", cfg.APIPort) if pools.Read != nil { - log.Println("๐Ÿ“– DATABASE_READ_URL attivo: carico di lettura su replica") + log.Info("DATABASE_READ_URL active โ€” read traffic routed to replica") } addr := ":" + cfg.APIPort if cfg.TLSCertFile != "" && cfg.TLSKeyFile != "" { - log.Printf("๐Ÿ”’ TLS enabled (cert=%s)", cfg.TLSCertFile) - log.Fatal(app.ListenTLS(addr, cfg.TLSCertFile, cfg.TLSKeyFile)) + log.Info("TLS enabled", "cert", cfg.TLSCertFile) + log.Fatal("ListenTLS failed", "err", app.ListenTLS(addr, cfg.TLSCertFile, cfg.TLSKeyFile)) } else { - log.Fatal(app.Listen(addr)) + log.Fatal("Listen failed", "err", app.Listen(addr)) } } diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 89c99ea7..9f64ca45 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -7,7 +7,6 @@ package main import ( "context" "fmt" - "log" "os" "path/filepath" "sort" @@ -16,10 +15,13 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/marco-spagn/pcmi/internal/config" + "github.com/marco-spagn/pcmi/internal/log" ) func main() { cfg := config.Load() + addSource := cfg.LogSource == "1" || cfg.LogSource == "true" + log.Configure(cfg.LogFormat, cfg.LogLevel, addSource) if cfg.DatabaseURL == "" { log.Fatal("DATABASE_URL is required") } @@ -30,7 +32,7 @@ func main() { pool, err := pgxpool.New(ctx, cfg.DatabaseURL) if err != nil { - log.Fatalf("connect: %v", err) + log.Fatal("database connection failed", "err", err) } defer pool.Close() @@ -40,14 +42,14 @@ func main() { filename TEXT PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW() )`); err != nil { - log.Fatalf("create schema_migrations: %v", err) + log.Fatal("create schema_migrations table failed", "err", err) } // Read already-applied migrations. rows, err := pool.Query(ctx, `SELECT filename FROM schema_migrations ORDER BY filename`) if err != nil { - log.Fatalf("query applied: %v", err) + log.Fatal("query applied migrations failed", "err", err) } applied := map[string]bool{} for rows.Next() { @@ -60,7 +62,7 @@ func main() { // Collect .sql files in lexicographic order. entries, err := os.ReadDir(migrationsDir) if err != nil { - log.Fatalf("read migrations dir %q: %v", migrationsDir, err) + log.Fatal("read migrations directory failed", "dir", migrationsDir, "err", err) } var files []string for _, e := range entries { @@ -73,30 +75,30 @@ func main() { appliedCount := 0 for _, fname := range files { if applied[fname] { - log.Printf("โญ skip %s (already applied)", fname) + log.Info("migration already applied, skipping", "file", fname) continue } sql, err := os.ReadFile(filepath.Join(migrationsDir, fname)) if err != nil { - log.Fatalf("read %s: %v", fname, err) + log.Fatal("read migration file failed", "file", fname, "err", err) } tx, err := pool.Begin(ctx) if err != nil { - log.Fatalf("begin tx for %s: %v", fname, err) + log.Fatal("begin transaction failed", "file", fname, "err", err) } if _, err := tx.Exec(ctx, string(sql)); err != nil { _ = tx.Rollback(ctx) - log.Fatalf("โŒ apply %s: %v", fname, err) + log.Fatal("apply migration failed", "file", fname, "err", err) } if _, err := tx.Exec(ctx, `INSERT INTO schema_migrations (filename) VALUES ($1)`, fname); err != nil { _ = tx.Rollback(ctx) - log.Fatalf("record %s: %v", fname, err) + log.Fatal("record migration failed", "file", fname, "err", err) } if err := tx.Commit(ctx); err != nil { - log.Fatalf("commit %s: %v", fname, err) + log.Fatal("commit migration failed", "file", fname, "err", err) } - log.Printf("โœ… applied %s", fname) + log.Info("applied migration", "file", fname) appliedCount++ } fmt.Printf("\n๐ŸŽ‰ Migrations: %d applied, %d skipped.\n", diff --git a/cmd/worker/main.go b/cmd/worker/main.go index d3bb26f8..3ad8abfe 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -6,7 +6,6 @@ package main import ( "context" "fmt" - "log" "net/http" "os" "os/signal" @@ -22,6 +21,7 @@ import ( "github.com/marco-spagn/pcmi/internal/database" "github.com/marco-spagn/pcmi/internal/embedding" "github.com/marco-spagn/pcmi/internal/event" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/metrics" "github.com/marco-spagn/pcmi/internal/telemetry" "github.com/marco-spagn/pcmi/internal/version" @@ -31,25 +31,27 @@ import ( const workerTracerName = "github.com/marco-spagn/pcmi/cmd/worker" func main() { - log.Println("๐Ÿš€ PCMI Worker starting...") + log.Info("PCMI Worker starting") // --- Fail-fast: carica e valida config prima di aprire qualsiasi connessione --- cfg := config.Load() if err := cfg.Validate(config.WorkerRequiredFields...); err != nil { - log.Fatalf("โŒ FATAL: %v", err) + log.Fatal("config validation failed", "err", err) } - log.Printf("โœ… Config loaded (DB=%s, Redis=%s)", cfg.DatabaseURL[:min(len(cfg.DatabaseURL), 40)], cfg.RedisAddr) + addSource := cfg.LogSource == "1" || cfg.LogSource == "true" + log.Configure(cfg.LogFormat, cfg.LogLevel, addSource) + log.Info("config loaded", "db", log.Mask(cfg.DatabaseURL, 40), "redis", cfg.RedisAddr) ctxTelemetry := context.Background() shutdownTelemetry, err := telemetry.Init(ctxTelemetry, cfg, "pcmi-worker") if err != nil { - log.Fatalf("telemetry: %v", err) + log.Fatal("telemetry init failed", "err", err) } defer func() { sdCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if e := shutdownTelemetry(sdCtx); e != nil { - log.Printf("telemetry shutdown: %v", e) + log.Error("telemetry shutdown failed", "err", e) } }() @@ -74,34 +76,34 @@ func main() { metrics.WorkerRegistry, promhttp.HandlerOpts{EnableOpenMetrics: false}, )) - log.Println("๐Ÿ’“ Worker HTTP on :8081 (/health, /metrics)") + log.Info("Worker HTTP listening", "addr", ":8081", "endpoints", "/health, /metrics") if err := http.ListenAndServe(":8081", mux); err != nil { - log.Printf("Health server error: %v", err) + log.Error("health server error", "err", err) } }() backend := event.EventBackend() - log.Printf("โœ… Redis connected, event backend=%s", backend) + log.Info("redis connected", "event_backend", backend) ctx, cancel := context.WithCancel(context.Background()) defer cancel() prov, err := embedding.NewFromConfig(cfg) if err != nil { - log.Fatalf("โŒ FATAL embedding provider: %v", err) + log.Fatal("embedding provider init failed", "err", err) } if prov != nil { ew := worker.NewEmbeddingWorker(db, prov) go ew.Start(ctx) - log.Println("โœ… Embedding worker started") + log.Info("embedding worker started") } else { - log.Println("โš ๏ธ OPENAI_API_KEY unset โ€” embedding worker disabled") + log.Warn("OPENAI_API_KEY unset, embedding worker disabled") } distWorker := worker.NewDistillationWorker(db, cfg) var policyEngine *worker.DistillationPolicyEngine if cfg.DistillationPolicyDisabled { - log.Println("โ„น๏ธ Distillation policy engine disabled (DISTILLATION_POLICY_DISABLED)") + log.Warn("distillation policy engine disabled", "reason", "DISTILLATION_POLICY_DISABLED") } else { policyEngine = worker.NewDistillationPolicyEngine(db, distWorker) go policyEngine.Start(ctx) @@ -136,13 +138,13 @@ func main() { case event.EventMemoryStored, event.EventMemoryUpdated: path, _ := evt.Payload["path"].(string) if policyEngine != nil { - log.Printf("๐Ÿ“จ [REDIS] %s id=%v tenant=%s path=%s โ†’ distillation policy", evt.Type, evt.Payload["id"], tenantID, path) + log.Info("redis event routed to distillation policy", "event_type", evt.Type, "id", evt.Payload["id"], "tenant", tenantID, "path", path) policyEngine.OnMemoryEvent(tenantID, path) } consolidationWorker.TriggerForMemory(tenantID, path) case event.EventMemoryRefineRequested: prefix, _ := evt.Payload["path_prefix"].(string) - log.Printf("๐Ÿ“จ [REDIS] refine.requested tenant=%s prefix=%s", tenantID, prefix) + log.Info("redis event refine requested", "tenant", tenantID, "prefix", prefix) distWorker.TriggerForPrefix(tenantID, prefix) } span.End() @@ -158,7 +160,7 @@ func main() { } else { consumer := event.NewWorkerStreamConsumer() if err := consumer.EnsureGroup(ctx); err != nil { - log.Fatalf("โŒ stream consumer group: %v", err) + log.Fatal("stream consumer group creation failed", "err", err) } streamHandler := func(_ context.Context, evt event.Event, streamID string) error { handleMemoryEvent(evt, streamID) @@ -167,20 +169,20 @@ func main() { consumer.StartPendingRecovery(ctx, streamHandler) go func() { if err := consumer.Consume(ctx, streamHandler); err != nil && ctx.Err() == nil { - log.Printf("โŒ stream consumer stopped: %v", err) + log.Error("stream consumer stopped unexpectedly", "err", err) } }() - log.Printf("โœ… Subscribed to stream %s (group %s)", event.StreamKey, event.WorkerConsumerGroup) + log.Info("subscribed to stream", "stream", event.StreamKey, "group", event.WorkerConsumerGroup) } sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan - log.Println("๐Ÿ›‘ Shutting down worker...") + log.Info("shutting down worker") cancel() time.Sleep(2 * time.Second) - log.Println("๐Ÿ‘‹ Worker stopped") + log.Info("worker stopped") } func min(a, b int) int { diff --git a/internal/config/config.go b/internal/config/config.go index 592562db..eb6c5f0c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -76,6 +76,11 @@ type Config struct { // Dedup (PCMI-011): default ingest dedup mode when tenant/request omit it. DedupMode string + + // Logging + LogFormat string // "json" (default) or "text" + LogLevel string // "info" (default) | "debug" | "warn" | "error" + LogSource string // "1" | "true" to enable source file:line in every record } // APIConfig returns the subset of fields required by the API service. @@ -135,6 +140,10 @@ func Load() *Config { OTELServiceName: strings.TrimSpace(os.Getenv("OTEL_SERVICE_NAME")), DedupMode: envOr("DEDUP_MODE", "none"), + + LogFormat: envOr("PCMI_LOG_FORMAT", ""), + LogLevel: envOr("PCMI_LOG_LEVEL", ""), + LogSource: strings.TrimSpace(os.Getenv("PCMI_LOG_SOURCE")), } return cfg } diff --git a/internal/event/redis.go b/internal/event/redis.go index fd430912..b392f277 100644 --- a/internal/event/redis.go +++ b/internal/event/redis.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "errors" - "log" "os" "time" "github.com/redis/go-redis/v9" + + "github.com/marco-spagn/pcmi/internal/log" ) // ErrRedisNotInitialized is returned when RedisClient was never set (InitRedis). @@ -46,15 +47,14 @@ func InitRedis(addr string) { for attempt := 1; attempt <= maxAttempts; attempt++ { _, err := RedisClient.Ping(ctx).Result() if err == nil { - log.Println("โœ… Connected to Redis") + log.Info("connected to redis") return } if attempt == maxAttempts { // Only fatal after all retries exhausted. - log.Fatalf("โŒ Failed to connect to Redis after %d attempts: %v", maxAttempts, err) + log.Fatal("failed to connect to redis", "attempts", maxAttempts, "err", err) } - log.Printf("โณ Redis not ready (attempt %d/%d): %v โ€” retrying in %s", - attempt, maxAttempts, err, backoff) + log.Warn("redis not ready, retrying", "attempt", attempt, "max_attempts", maxAttempts, "err", err, "backoff", backoff) time.Sleep(backoff) backoff *= 2 if backoff > 16*time.Second { @@ -81,10 +81,10 @@ func publishStream(eventType string, payload map[string]any) error { pub := NewStreamPublisher(RedisClient, StreamKey) streamID, err := pub.Publish(ctx, eventType, payload) if err != nil { - log.Printf("โŒ Failed to XADD event: %v", err) + log.Error("failed to XADD event", "err", err) return err } - log.Printf("๐Ÿ“ฃ [REDIS STREAM] Published event: %s id=%s", eventType, streamID) + log.Debug("[redis stream] event published", "type", eventType, "id", streamID) notifyWebhook(eventType, payload) return nil } @@ -102,11 +102,11 @@ func publishPubSub(eventType string, payload map[string]any) error { err = RedisClient.Publish(ctx, "memory_events", data).Err() if err != nil { - log.Printf("โŒ Failed to publish event: %v", err) + log.Error("failed to publish event", "err", err) return err } - log.Printf("๐Ÿ“ฃ [REDIS] Published event: %s", eventType) + log.Debug("[redis pubsub] event published", "type", eventType) notifyWebhook(eventType, payload) return nil } @@ -141,7 +141,7 @@ func pubsubSubscribe(parent context.Context) <-chan Event { pubsub := RedisClient.Subscribe(parent, "memory_events") if _, err := pubsub.Receive(parent); err != nil { - log.Printf("โŒ Failed to confirm Redis SUBSCRIBE: %v", err) + log.Error("failed to confirm redis subscribe", "err", err) _ = pubsub.Close() close(ch) return ch @@ -160,7 +160,7 @@ func pubsubSubscribe(parent context.Context) <-chan Event { } var evt Event if err := json.Unmarshal([]byte(msg.Payload), &evt); err != nil { - log.Printf("โŒ Failed to unmarshal event: %v", err) + log.Error("failed to unmarshal event", "err", err) continue } select { diff --git a/internal/event/stream.go b/internal/event/stream.go index 9c8ba155..ff49cacf 100644 --- a/internal/event/stream.go +++ b/internal/event/stream.go @@ -5,11 +5,12 @@ import ( "encoding/json" "errors" "fmt" - "log" "strings" "time" "github.com/redis/go-redis/v9" + + "github.com/marco-spagn/pcmi/internal/log" ) const ( @@ -132,7 +133,7 @@ func (c *StreamConsumer) Consume(ctx context.Context, handler StreamHandler) err if err == redis.Nil { continue } - log.Printf("โŒ stream XREADGROUP: %v", err) + log.Error("stream XREADGROUP failed", "err", err) time.Sleep(500 * time.Millisecond) continue } @@ -140,16 +141,16 @@ func (c *StreamConsumer) Consume(ctx context.Context, handler StreamHandler) err for _, msg := range stream.Messages { evt, parseErr := decodeStreamMessage(msg) if parseErr != nil { - log.Printf("โŒ stream decode %s: %v", msg.ID, parseErr) + log.Error("stream decode failed", "id", msg.ID, "err", parseErr) _ = c.ack(ctx, msg.ID) continue } if err := handler(ctx, evt, msg.ID); err != nil { - log.Printf("โš ๏ธ stream handler %s: %v (no ACK)", msg.ID, err) + log.Warn("stream handler failed, no ACK", "id", msg.ID, "err", err) continue } if ackErr := c.ack(ctx, msg.ID); ackErr != nil { - log.Printf("โŒ stream XACK %s: %v", msg.ID, ackErr) + log.Error("stream XACK failed", "id", msg.ID, "err", ackErr) } else { IncStreamAck() } @@ -221,7 +222,7 @@ func streamSubscribe(parent context.Context) <-chan Event { return } if err != redis.Nil { - log.Printf("โŒ stream XREAD: %v", err) + log.Error("stream XREAD failed", "err", err) time.Sleep(500 * time.Millisecond) } continue @@ -231,7 +232,7 @@ func streamSubscribe(parent context.Context) <-chan Event { lastID = msg.ID evt, err := decodeStreamMessage(msg) if err != nil { - log.Printf("โŒ stream subscribe decode: %v", err) + log.Error("stream subscribe decode failed", "err", err) continue } select { diff --git a/internal/event/stream_consumer.go b/internal/event/stream_consumer.go index 71ae866b..88343f35 100644 --- a/internal/event/stream_consumer.go +++ b/internal/event/stream_consumer.go @@ -3,10 +3,11 @@ package event import ( "context" "encoding/json" - "log" "time" "github.com/redis/go-redis/v9" + + "github.com/marco-spagn/pcmi/internal/log" ) const ( @@ -43,7 +44,7 @@ func (c *StreamConsumer) recoverPending(ctx context.Context, handler StreamHandl Count: pendingClaimBatch, }).Result() if err != nil { - log.Printf("โš ๏ธ stream XPENDING: %v", err) + log.Warn("stream XPENDING failed", "err", err) return } SetStreamPending(len(pending)) @@ -64,7 +65,7 @@ func (c *StreamConsumer) recoverPending(ctx context.Context, handler StreamHandl Messages: ids, }).Result() if err != nil { - log.Printf("โš ๏ธ stream XCLAIM: %v", err) + log.Warn("stream XCLAIM failed", "err", err) return } for _, msg := range claimed { @@ -74,16 +75,16 @@ func (c *StreamConsumer) recoverPending(ctx context.Context, handler StreamHandl } evt, parseErr := decodeStreamMessage(msg) if parseErr != nil { - log.Printf("โŒ pending decode %s: %v", msg.ID, parseErr) + log.Error("pending decode failed", "id", msg.ID, "err", parseErr) _ = c.ack(ctx, msg.ID) continue } if err := handler(ctx, evt, msg.ID); err != nil { - log.Printf("โš ๏ธ pending handler %s: %v", msg.ID, err) + log.Warn("pending handler failed", "id", msg.ID, "err", err) continue } if ackErr := c.ack(ctx, msg.ID); ackErr != nil { - log.Printf("โŒ pending XACK %s: %v", msg.ID, ackErr) + log.Error("pending XACK failed", "id", msg.ID, "err", ackErr) } else { IncStreamAck() } @@ -106,11 +107,11 @@ func (c *StreamConsumer) moveToDLQ(ctx context.Context, msg redis.XMessage) { Values: values, }).Result() if dlqErr != nil { - log.Printf("โŒ stream DLQ XADD %s: %v", msg.ID, dlqErr) + log.Error("stream DLQ XADD failed", "id", msg.ID, "err", dlqErr) return } IncStreamDLQ() if ackErr := c.ack(ctx, msg.ID); ackErr != nil { - log.Printf("โŒ stream DLQ XACK %s: %v", msg.ID, ackErr) + log.Error("stream DLQ XACK failed", "id", msg.ID, "err", ackErr) } } diff --git a/internal/grpc/server.go b/internal/grpc/server.go index 5883b2ad..2222650b 100644 --- a/internal/grpc/server.go +++ b/internal/grpc/server.go @@ -4,12 +4,12 @@ import ( "context" "crypto/sha256" "encoding/hex" - "log" "net" "strings" "github.com/jackc/pgx/v5/pgxpool" "github.com/marco-spagn/pcmi/internal/config" + "github.com/marco-spagn/pcmi/internal/log" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -301,15 +301,15 @@ func BuildServerOptions(cfg *config.Config) []grpc.ServerOption { return opts } if cert == "" || key == "" { - log.Printf("โš ๏ธ gRPC TLS partially configured (cert=%q, key=%q) โ€” falling back to plain TCP", cert, key) + log.Warn("gRPC TLS partially configured, falling back to plain TCP", "cert", cert, "key", key) return opts } creds, err := credentials.NewServerTLSFromFile(cert, key) if err != nil { - log.Printf("โš ๏ธ gRPC TLS load failed (%v) โ€” falling back to plain TCP", err) + log.Warn("gRPC TLS load failed, falling back to plain TCP", "err", err) return opts } - log.Printf("๐Ÿ”’ gRPC TLS enabled (cert=%s)", cert) + log.Info("gRPC TLS enabled", "cert", cert) return append(opts, grpc.Creds(creds)) } @@ -322,7 +322,7 @@ func Start(dbWrite, dbRead *pgxpool.Pool, memSvc *service.MemoryService, cfg *co port := ResolveGRPCPort(cfg) lis, err := net.Listen("tcp", ":"+port) if err != nil { - log.Printf("gRPC listen failed: %v", err) + log.Error("gRPC listen failed", "err", err) return } srv := grpc.NewServer(BuildServerOptions(cfg)...) @@ -342,9 +342,9 @@ func Start(dbWrite, dbRead *pgxpool.Pool, memSvc *service.MemoryService, cfg *co pcmiv1.RegisterAdminServiceServer(srv, newAdminServer(dbWrite)) pcmiv1.RegisterMetricsServiceServer(srv, newMetricsServer(dbWrite)) go func() { - log.Printf("โœ… PCMI gRPC server on :%s (MemoryService + AdminService + MetricsService)", port) + log.Info("gRPC server started", "port", port) if err := srv.Serve(lis); err != nil { - log.Printf("gRPC serve: %v", err) + log.Error("gRPC serve failed", "err", err) } }() } diff --git a/internal/handler/distilled_handler.go b/internal/handler/distilled_handler.go index 9879e830..2dd46461 100644 --- a/internal/handler/distilled_handler.go +++ b/internal/handler/distilled_handler.go @@ -5,12 +5,12 @@ import ( "database/sql" "encoding/json" "fmt" - "log" "time" "github.com/gofiber/fiber/v2" "github.com/jackc/pgx/v5" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/middleware" "github.com/marco-spagn/pcmi/internal/model" "github.com/marco-spagn/pcmi/internal/repository" @@ -48,7 +48,7 @@ func (h *DistilledHandler) Get(c *fiber.Ctx) error { return c.Status(400).JSON(fiber.Map{"error": "after_id is not supported for distilled listings; use cursor"}) } - log.Printf("๐Ÿ“ก [DISTILLED] tenant=%s path_prefix=%s", tenantID, pathPrefix) + log.Debug("[distilled] get", "tenant", tenantID, "path_prefix", pathPrefix) ctx := context.Background() var total int @@ -56,7 +56,7 @@ func (h *DistilledHandler) Get(c *fiber.Ctx) error { `SELECT COUNT(*) FROM distilled_knowledge WHERE tenant_id = $1::uuid AND path <@ $2::ltree`, tenantID, pathPrefix, ).Scan(&total); err != nil { - log.Printf("โŒ [DISTILLED] count: %v", err) + log.Error("[distilled] count failed", "err", err) return c.Status(500).JSON(fiber.Map{"error": err.Error()}) } @@ -79,7 +79,7 @@ func (h *DistilledHandler) Get(c *fiber.Ctx) error { rows, err := h.db.Query(ctx, q, args...) if err != nil { - log.Printf("โŒ [DISTILLED] query: %v", err) + log.Error("[distilled] query failed", "err", err) return c.Status(500).JSON(fiber.Map{"error": err.Error()}) } defer rows.Close() @@ -98,7 +98,7 @@ func (h *DistilledHandler) Get(c *fiber.Ctx) error { for rows.Next() { var row distilledRow if err := rows.Scan(&row.id, &row.path, &row.summary, &row.insightsRaw, &row.confidence, &row.distilledAt, &row.sourceIDs, &row.version); err != nil { - log.Printf("โŒ [DISTILLED] scan: %v", err) + log.Error("[distilled] row scan failed", "err", err) continue } scanned = append(scanned, row) diff --git a/internal/log/log.go b/internal/log/log.go new file mode 100644 index 00000000..b1f63bb3 --- /dev/null +++ b/internal/log/log.go @@ -0,0 +1,322 @@ +// Package log provides a shared structured logger for the entire PCMI codebase. +// All components should import this package and use its exported functions instead of +// the standard library log package. +// +// Defaults applied in init(): JSON format, INFO level, no source file. +// Override at runtime by calling Configure(cfg) after loading config. +// +// Env vars are read exclusively by internal/config โ€” PCMI_LOG_FORMAT, +// PCMI_LOG_LEVEL, PCMI_LOG_SOURCE. +// +// Trace correlation: the *Context family (InfoContext, WarnContext, ErrorContext) +// automatically injects trace_id and span_id when OpenTelemetry is active. +package log + +import ( + "context" + "io" + "os" + "strings" + "sync/atomic" + "syscall" + + "log/slog" + + "go.opentelemetry.io/otel/trace" +) + +// lg is the global shared logger, protected by atomic.Pointer for safe concurrent +// reads after SetLevel / SetFormat / Configure etc. are called. +var lg atomic.Pointer[slog.Logger] + +// cfg holds the current config-driven settings. Starts nil; populated by Configure(). +var cfg *LogConfig + +type LogConfig struct { + Format string // "json" (default) or "text" + Level string // "info" (default) | "debug" | "warn" | "error" + AddSource bool +} + +func init() { + lg.Store(buildLogger(nil)) +} + +// Configure updates the logger with settings from config. Call once after +// loading config. Safe to call multiple times or before init() completes. +func Configure(format, level string, addSource bool) { + cfg = &LogConfig{ + Format: format, + Level: level, + AddSource: addSource, + } + lg.Store(buildLogger(cfg)) +} + +// buildLogger creates the slog.Logger based on current config or defaults. +func buildLogger(c *LogConfig) *slog.Logger { + return slog.New(buildHandler(c)) +} + +// buildHandler creates the slog.Handler based on current config or defaults. +func buildHandler(c *LogConfig) slog.Handler { + out := os.Stderr + + format := "json" // default + if c != nil && c.Format != "" { + format = c.Format + } else if isTTY(out.Fd()) { + format = "text" + } + + level := slog.LevelInfo + if c != nil && c.Level != "" { + if parsed := parseLevel(c.Level); parsed != 0 { + level = parsed + } + } + + addSource := c != nil && c.AddSource + + h := slog.HandlerOptions{ + Level: level, + AddSource: addSource, + } + + var base slog.Handler + if format == "json" { + base = slog.NewJSONHandler(out, &h) + } else { + base = slog.NewTextHandler(out, &h) + } + return newOTelHandler(base) +} + +// isTTY returns true when fd is a terminal. +func isTTY(fd uintptr) bool { + _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, syscall.TCGETS, 0) + return err == 0 +} + +func parseLevel(raw string) slog.Level { + switch strings.ToLower(raw) { + case "debug": + return slog.LevelDebug + case "info": + return slog.LevelInfo + case "warn": + return slog.LevelWarn + case "error": + return slog.LevelError + default: + return 0 + } +} + +// formatHandler creates a JSON or text slog.Handler writing to w. +func formatHandler(format string, w io.Writer, c *LogConfig) slog.Handler { + level := slog.LevelInfo + if c != nil && c.Level != "" { + if parsed := parseLevel(c.Level); parsed != 0 { + level = parsed + } + } + addSource := false + if c != nil { + addSource = c.AddSource + } + h := slog.HandlerOptions{ + Level: level, + AddSource: addSource, + } + if format == "json" { + return slog.NewJSONHandler(w, &h) + } + return slog.NewTextHandler(w, &h) +} + +// SetFormat switches the logger handler format at runtime. Callers should invoke +// this once during startup if they want to override the auto-detected TTY behaviour. +func SetFormat(format string) { + lg.Store(slog.New(newOTelHandler(formatHandler(format, os.Stderr, cfg)))) +} + +// SetLevel dynamically changes the log level at runtime (concurrent-safe). +func SetLevel(level slog.Level) { + current := lg.Load() + if current == nil { + return + } + lg.Store(slog.New(newOTelHandler(levelOverride(current.Handler(), level)))) +} + +// levelOverride wraps a handler to enforce a new minimum level. +func levelOverride(h slog.Handler, lvl slog.Level) slog.Handler { + return &lvlHandler{Handler: h, level: lvl} +} + +type lvlHandler struct { + slog.Handler + level slog.Level +} + +func (l *lvlHandler) Enabled(_ context.Context, level slog.Level) bool { + return level >= l.level +} + +// L returns the shared logger. Callers should use L() directly rather than +// storing a reference, so that runtime level changes take effect concurrently. +func L() *slog.Logger { + return lg.Load() +} + +// Debug logs at DEBUG level. +func Debug(msg string, args ...any) { + lg.Load().Debug(msg, args...) +} + +// Info logs at INFO level. +func Info(msg string, args ...any) { + lg.Load().Info(msg, args...) +} + +// Warn logs at WARN level. +func Warn(msg string, args ...any) { + lg.Load().Warn(msg, args...) +} + +// Error logs at ERROR level. +func Error(msg string, args ...any) { + lg.Load().Error(msg, args...) +} + +// Fatal logs at ERROR level and exits with code 1. +func Fatal(msg string, args ...any) { + lg.Load().Error(msg, args...) + os.Exit(1) +} + +// InfoContext logs at INFO level, enriching the record with trace_id/span_id +// when OpenTelemetry is active on ctx. +func InfoContext(ctx context.Context, msg string, args ...any) { + lg.Load().Info(msg, append(traceArgs(ctx), args...)...) +} + +// WarnContext logs at WARN level, enriching the record with trace_id/span_id. +func WarnContext(ctx context.Context, msg string, args ...any) { + lg.Load().Warn(msg, append(traceArgs(ctx), args...)...) +} + +// ErrorContext logs at ERROR level, enriching the record with trace_id/span_id. +func ErrorContext(ctx context.Context, msg string, args ...any) { + lg.Load().Error(msg, append(traceArgs(ctx), args...)...) +} + +// DebugContext logs at DEBUG level, enriching the record with trace_id/span_id. +func DebugContext(ctx context.Context, msg string, args ...any) { + lg.Load().Debug(msg, append(traceArgs(ctx), args...)...) +} + +// traceArgs returns trace_id and span_id key-value pairs when ctx carries a +// sampled OTel span. Returns nil otherwise for zero-allocation append(). +func traceArgs(ctx context.Context) []any { + span := trace.SpanFromContext(ctx) + if !span.SpanContext().IsSampled() { + return nil + } + sc := span.SpanContext() + return []any{"trace_id", sc.TraceID().String(), "span_id", sc.SpanID().String()} +} + +// Mask truncates a sensitive string for safe logging. +// +// When the string is <= 8 characters it returns the first character followed by "***" +// (e.g. "abc" -> "a***"). For longer strings it shows the first and last 4 characters +// with "***" in between (e.g. "mysecretvalue" -> "mys****value"). +// +// URL-like strings (containing "://") also attempt to redact the credentials block +// "user:pass@" if present: +// +// "postgres://user:pass@host.db" -> "postgres://***@host.db" +func Mask(s string, prefix int) string { + s = strings.TrimSpace(s) + if len(s) == 0 { + return "***" + } + + if strings.Contains(s, "://") { + if masked := maskURL(s); masked != "" { + return masked + } + } + + if len(s) <= 8 { + return s[:1] + "***" + } + prefix = clamp(prefix, 4, 20) + suffix := "" + if len(s) > prefix+4 { + suffix = s[len(s)-4:] + } + return s[:prefix] + "***" + suffix +} + +// maskURL redacts the credentials portion of a URL-like string. +func maskURL(s string) string { + idx := strings.Index(s, "://") + if idx < 0 { + return "" + } + before := s[:idx+3] // include "://" + after := s[idx+3:] + atPos := strings.Index(after, "@") + if atPos <= 0 { + return "" + } + return before + "***@" + after[atPos+1:] +} + +func clamp(v, min, max int) int { + if v < min { + return min + } + if v > max { + return max + } + return v +} + +// ------ OTel-aware slog.Handler wrapper ------ + +// otelHandler wraps another slog.Handler and injects trace_id/span_id when the +// context passed to Handle carries an active OpenTelemetry span. +type otelHandler struct { + base slog.Handler +} + +func newOTelHandler(base slog.Handler) slog.Handler { + return &otelHandler{base: base} +} + +func (h *otelHandler) Enabled(ctx context.Context, level slog.Level) bool { + return h.base.Enabled(ctx, level) +} + +func (h *otelHandler) Handle(ctx context.Context, r slog.Record) error { + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsSampled() { + sc := span.SpanContext() + r = r.Clone() + r.AddAttrs(slog.String("trace_id", sc.TraceID().String()), + slog.String("span_id", sc.SpanID().String())) + } + return h.base.Handle(ctx, r) +} + +func (h *otelHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &otelHandler{base: h.base.WithAttrs(attrs)} +} + +func (h *otelHandler) WithGroup(name string) slog.Handler { + return &otelHandler{base: h.base.WithGroup(name)} +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go new file mode 100644 index 00000000..153e8c3c --- /dev/null +++ b/internal/log/log_test.go @@ -0,0 +1,362 @@ +package log + +import ( + "bytes" + "context" + "encoding/json" + "io" + "os" + "strings" + "sync" + "testing" + + "log/slog" +) + +// โ”€โ”€โ”€ Mask โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestMaskEmpty(t *testing.T) { + if got := Mask("", 4); got != "***" { + t.Errorf("expected \"***\", got %q", got) + } +} + +func TestMaskWhitespacesOnly(t *testing.T) { + if got := Mask(" ", 4); got != "***" { + t.Errorf("expected \"***\", got %q", got) + } +} + +func TestMaskShort(t *testing.T) { + if got := Mask("abc", 4); got != "a***" { + t.Errorf("expected \"a***\", got %q", got) + } +} + +func TestMaskLeq8(t *testing.T) { + if got := Mask("abcdefg", 4); got != "a***" { + t.Errorf("expected \"a***\", got %q", got) + } +} + +func TestMaskLong(t *testing.T) { + got := Mask("mysecretkeyvalue", 4) + if !strings.HasPrefix(got, "mys") || !strings.Contains(got, "***") { + t.Errorf("expected prefix+***+suffix, got %q", got) + } +} + +func TestMaskURLCredentialsRedacted(t *testing.T) { + got := Mask("postgres://user:pass@host.db", 4) + if strings.Contains(got, "user:pass") { + t.Errorf("credentials not redacted: %q", got) + } + if !strings.Contains(got, "***@") { + t.Errorf("expected credentials redacted with ***@: %q", got) + } +} + +func TestMaskURLNoCredentials(t *testing.T) { + got := Mask("https://example.com/path", 4) + if strings.Contains(got, "***@") { + t.Errorf("should not contain ***@ for URL without credentials: %q", got) + } +} + +// โ”€โ”€โ”€ parseLevel โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestParseLevelDebug(t *testing.T) { + if lvl := parseLevel("debug"); lvl != slog.LevelDebug { + t.Errorf("expected Debug, got %v", lvl) + } +} + +func TestParseLevelInfo(t *testing.T) { + if lvl := parseLevel("info"); lvl != slog.LevelInfo { + t.Errorf("expected Info, got %v", lvl) + } +} + +func TestParseLevelWarn(t *testing.T) { + if lvl := parseLevel("WARN"); lvl != slog.LevelWarn { + t.Errorf("expected Warn, got %v", lvl) + } +} + +func TestParseLevelError(t *testing.T) { + if lvl := parseLevel("error"); lvl != slog.LevelError { + t.Errorf("expected Error, got %v", lvl) + } +} + +func TestParseLevelInvalid(t *testing.T) { + if lvl := parseLevel("bogus"); lvl != 0 { + t.Errorf("expected 0 for invalid level, got %v", lvl) + } +} + +// โ”€โ”€โ”€ clamp โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestClampMiddle(t *testing.T) { + if got := clamp(5, 3, 10); got != 5 { + t.Errorf("expected 5, got %d", got) + } +} + +func TestClampBelowMin(t *testing.T) { + if got := clamp(1, 3, 10); got != 3 { + t.Errorf("expected 3, got %d", got) + } +} + +func TestClampAboveMax(t *testing.T) { + if got := clamp(20, 3, 10); got != 10 { + t.Errorf("expected 10, got %d", got) + } +} + +// โ”€โ”€โ”€ maskURL โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestMaskURLWithCreds(t *testing.T) { + got := maskURL("postgres://user:pass@localhost.db") + if got != "postgres://***@localhost.db" { + t.Errorf("expected \"postgres://***@localhost.db\", got %q", got) + } +} + +func TestMaskURLNoAt(t *testing.T) { + got := maskURL("https://example.com") + if got != "" { + t.Errorf("expected empty for URL without @, got %q", got) + } +} + +func TestMaskURLNoSchema(t *testing.T) { + got := maskURL("notaweburl") + if got != "" { + t.Errorf("expected empty for string without ://, got %q", got) + } +} + +func TestMaskURLEmptyAfterColonSlashSlash(t *testing.T) { + got := maskURL("://") + if got != "" { + t.Errorf("expected empty for \"://\", got %q", got) + } +} + +// โ”€โ”€โ”€ L returns non-nil โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestLNonNil(t *testing.T) { + if got := L(); got == nil { + t.Error("L() returned nil") + } +} + +// โ”€โ”€โ”€ SetLevel concurrent safety (race detector clean) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestSetLevelConcurrent(t *testing.T) { + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(2) + go func() { SetLevel(slog.LevelDebug); wg.Done() }() + go func() { Info("concurrent-info"); wg.Done() }() + } + wg.Wait() +} + +// โ”€โ”€โ”€ SetLevel actually changes output โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestSetLevelFiltersDebug(t *testing.T) { + lg.Store(slog.New(newOTelHandler(slog.NewJSONHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelInfo})))) + + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelInfo)))) + + lg.Load().Debug("hidden") + if got := b.String(); got != "" { + t.Errorf("expected debug to be filtered at info level, got %q", got) + } + + SetLevel(slog.LevelDebug) + lg.Load().Debug("visible") + got := b.String() + if got == "" { + t.Error("expected debug message after SetLevel(debug)") + } + var obj map[string]any + requireJSONParse(t, got, &obj) +} + +// โ”€โ”€โ”€ JSON output contains expected keys โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestJSONOutputContainsMsgAndLevel(t *testing.T) { + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelInfo)))) + Info("hello", "key", "value") + + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + + if lvl, _ := obj["level"].(string); lvl != "INFO" { + t.Errorf("expected level=INFO, got %q", lvl) + } + if msg, _ := obj["msg"].(string); msg != "hello" { + t.Errorf("expected msg=hello, got %q", msg) + } + if kv, _ := obj["key"].(string); kv != "value" { + t.Errorf("expected key=value, got %q", kv) + } +} + +// โ”€โ”€โ”€ Configure with JSON format โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestConfigureJSON(t *testing.T) { + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelInfo)))) + Info("cfg-test") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if msg, _ := obj["msg"].(string); msg != "cfg-test" { + t.Errorf("expected msg=cfg-test, got %q", msg) + } +} + +// โ”€โ”€โ”€ Configure with text format โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestConfigureText(t *testing.T) { + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(formatHandler("text", b, nil)))) + Info("text-test") + got := b.String() + if !strings.Contains(got, "INFO") || !strings.Contains(got, "text-test") { + t.Errorf("expected text format output, got %q", got) + } +} + +// โ”€โ”€โ”€ Configure with debug level โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestConfigureDebugLevel(t *testing.T) { + b := &bytes.Buffer{} + Configure("json", "debug", false) + lg.Store(slog.New(newOTelHandler(buildHandler(cfg)))) + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelDebug)))) + Debug("debug-test") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if lvl, _ := obj["level"].(string); lvl != "DEBUG" { + t.Errorf("expected level=DEBUG, got %q", lvl) + } +} + +// โ”€โ”€โ”€ SetFormat runtime switch โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestSetFormat(t *testing.T) { + SetFormat("text") + SetFormat("json") + l := L() + if l == nil { + t.Fatal("L() returned nil after SetFormat") + } +} + +// โ”€โ”€โ”€ Fatal wrapper โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestFatalExits(t *testing.T) { + orig := os.Args + defer func() { os.Args = orig }() + os.Args = []string{os.Args[0]} + + // Fatal calls os.Exit(1) -- we can't directly test it from this process, + // but we can verify that the error log is emitted before exit. + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelError)))) + + // We can't call Fatal directly because it calls os.Exit(1). + // Instead verify the handler at error level works. + lg.Load().Error("fatal-test") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if msg, _ := obj["msg"].(string); msg != "fatal-test" { + t.Errorf("expected msg=fatal-test, got %q", msg) + } +} + +// โ”€โ”€โ”€ traceArgs โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestTraceArgsNoSpan(t *testing.T) { + ctx := context.Background() + got := traceArgs(ctx) + if got != nil { + t.Errorf("expected nil for background context, got %v", got) + } +} + +// โ”€โ”€โ”€ InfoContext passes context โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestInfoContext(t *testing.T) { + ctx := context.Background() + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelInfo)))) + InfoContext(ctx, "ctx-test", "k", "v") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if msg, _ := obj["msg"].(string); msg != "ctx-test" { + t.Errorf("expected msg=ctx-test, got %q", msg) + } +} + +// โ”€โ”€โ”€ WarnContext โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestWarnContext(t *testing.T) { + ctx := context.Background() + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelInfo)))) + WarnContext(ctx, "warn-ctx") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if lvl, _ := obj["level"].(string); lvl != "WARN" { + t.Errorf("expected level=WARN, got %q", lvl) + } +} + +// โ”€โ”€โ”€ ErrorContext โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestErrorContext(t *testing.T) { + ctx := context.Background() + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelInfo)))) + ErrorContext(ctx, "err-ctx") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if lvl, _ := obj["level"].(string); lvl != "ERROR" { + t.Errorf("expected level=ERROR, got %q", lvl) + } +} + +// โ”€โ”€โ”€ DebugContext โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func TestDebugContext(t *testing.T) { + ctx := context.Background() + b := &bytes.Buffer{} + lg.Store(slog.New(newOTelHandler(jsonHandler(b, slog.LevelDebug)))) + DebugContext(ctx, "debug-ctx") + var obj map[string]any + requireJSONParse(t, b.String(), &obj) + if lvl, _ := obj["level"].(string); lvl != "DEBUG" { + t.Errorf("expected level=DEBUG, got %q", lvl) + } +} + +// โ”€โ”€โ”€ Helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +func jsonHandler(w io.Writer, level slog.Level) slog.Handler { + return slog.NewJSONHandler(w, &slog.HandlerOptions{Level: level}) +} + +func requireJSONParse(t *testing.T, raw string, v interface{}) { + t.Helper() + if err := json.NewDecoder(bytes.NewReader([]byte(raw))).Decode(v); err != nil { + t.Fatalf("failed to parse JSON: %v (raw=%q)", err, raw) + } +} diff --git a/internal/middleware/apikey.go b/internal/middleware/apikey.go index 50fa5460..ad6fce86 100644 --- a/internal/middleware/apikey.go +++ b/internal/middleware/apikey.go @@ -4,13 +4,14 @@ import ( "context" "crypto/sha256" "encoding/hex" - "log" "time" "github.com/gofiber/fiber/v2" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" + + "github.com/marco-spagn/pcmi/internal/log" ) // apiKeyDB is satisfied by *pgxpool.Pool and pgxmock pools in tests. @@ -74,13 +75,13 @@ func apiKeyMiddleware(db apiKeyDB) fiber.Handler { // a DB hiccup the request proceeds without tenant isolation, risking // cross-tenant data exposure. Return 503 instead. if _, err := db.Exec(c.Context(), "SELECT set_tenant_context($1::uuid)", tenantID); err != nil { - log.Printf("โŒ set_tenant_context failed for tenant=%s: %v", tenantID, err) + log.Error("set_tenant_context failed", "tenant", tenantID, "err", err) return c.Status(503).JSON(fiber.Map{"error": "tenant context unavailable, please retry"}) } touchAPIKeyLastUsed(db, apiKeyID, c.IP()) - log.Printf("๐Ÿ”‘ API Key authenticated โ†’ tenant=%s, role=%s", tenantID, role) + log.Debug("api key authenticated", "tenant", tenantID, "role", role) return c.Next() } } diff --git a/internal/middleware/audit.go b/internal/middleware/audit.go index 8bf5ca23..583acd83 100644 --- a/internal/middleware/audit.go +++ b/internal/middleware/audit.go @@ -2,11 +2,12 @@ package middleware import ( "context" - "log" "time" "github.com/gofiber/fiber/v2" "github.com/jackc/pgx/v5/pgxpool" + + "github.com/marco-spagn/pcmi/internal/log" ) type AuditMiddleware struct { @@ -57,10 +58,10 @@ func (m *AuditMiddleware) Middleware() fiber.Handler { ) if dbErr != nil { - log.Printf("โš ๏ธ Audit log failed: %v", dbErr) + log.Warn("audit log insert failed", "err", dbErr) } - log.Printf("๐Ÿ“Š Audit: %s %s [%d] %s", c.Method(), c.Path(), c.Response().StatusCode(), time.Since(start)) + log.Debug("api request audited", "method", c.Method(), "path", c.Path(), "status", c.Response().StatusCode(), "duration", time.Since(start)) return err } } diff --git a/internal/middleware/idempotency.go b/internal/middleware/idempotency.go index fae22e5b..dd17a4af 100644 --- a/internal/middleware/idempotency.go +++ b/internal/middleware/idempotency.go @@ -3,13 +3,13 @@ package middleware import ( "context" "encoding/json" - "log" "strings" "github.com/gofiber/fiber/v2" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/repository" ) @@ -49,7 +49,7 @@ func NewIdempotencyMiddleware(cache IdempotencyCache) fiber.Handler { cached, ok, err := cache.Get(c.Context(), tenantID, key) if err != nil { - log.Printf("โŒ idempotency lookup: %v", err) + log.Error("idempotency lookup failed", "err", err) return c.Status(500).JSON(fiber.Map{"error": "idempotency lookup failed"}) } if ok { @@ -69,7 +69,7 @@ func NewIdempotencyMiddleware(cache IdempotencyCache) fiber.Handler { return nil } if putErr := cache.Put(c.Context(), tenantID, key, body); putErr != nil { - log.Printf("โš ๏ธ idempotency cache store: %v", putErr) + log.Warn("idempotency cache store failed", "err", putErr) } return nil } diff --git a/internal/middleware/metrics_auth.go b/internal/middleware/metrics_auth.go index c99bd739..126e343c 100644 --- a/internal/middleware/metrics_auth.go +++ b/internal/middleware/metrics_auth.go @@ -1,10 +1,11 @@ package middleware import ( - "log" "strings" "github.com/gofiber/fiber/v2" + + "github.com/marco-spagn/pcmi/internal/log" ) const metricsBearerPrefix = "Bearer " @@ -12,7 +13,7 @@ const metricsBearerPrefix = "Bearer " // LogMetricsScrapeAuthState logs a startup warning when METRICS_SCRAPE_TOKEN is unset. func LogMetricsScrapeAuthState(scrapeToken string) { if scrapeToken == "" { - log.Println("โš ๏ธ WARNING: METRICS_SCRAPE_TOKEN not set; GET /metrics is open without authentication") + log.Warn("METRICS_SCRAPE_TOKEN not set, GET /metrics is open without authentication") } } diff --git a/internal/middleware/ratelimit.go b/internal/middleware/ratelimit.go index 8fb263bf..4d828390 100644 --- a/internal/middleware/ratelimit.go +++ b/internal/middleware/ratelimit.go @@ -1,7 +1,6 @@ package middleware import ( - "log" "strings" "time" @@ -9,6 +8,7 @@ import ( "github.com/gofiber/fiber/v2/middleware/limiter" "github.com/marco-spagn/pcmi/internal/config" "github.com/marco-spagn/pcmi/internal/event" + "github.com/marco-spagn/pcmi/internal/log" pcmiratelimit "github.com/marco-spagn/pcmi/internal/ratelimit" "github.com/redis/go-redis/v9" ) @@ -38,9 +38,7 @@ func RateLimitMiddleware(cfg *config.Config) fiber.Handler { // FIX-10: warn operators that in-memory rate limiting does not share // counters across API replicas. With N replicas the effective limit is // N ร— configured limit. This is silent in production โ€” log it clearly. - log.Println("โš ๏ธ Rate limiter: using in-memory backend (RATE_LIMIT_BACKEND=memory)." + - " Counters are NOT shared across replicas. For multi-replica deployments" + - " set RATE_LIMIT_BACKEND=redis to enforce accurate per-key limits.") + log.Warn("rate limiter using in-memory backend, counters NOT shared across replicas. Set RATE_LIMIT_BACKEND=redis for multi-replica deployments") readonlyH := newRoleLimiter(roleRPM(cfg, "readonly")) writeH := newRoleLimiter(roleRPM(cfg, "write")) diff --git a/internal/service/memory_service.go b/internal/service/memory_service.go index 80939f92..35bcab2d 100644 --- a/internal/service/memory_service.go +++ b/internal/service/memory_service.go @@ -3,12 +3,12 @@ package service import ( "context" "fmt" - "log" "strings" "time" "github.com/marco-spagn/pcmi/internal/embedding" "github.com/marco-spagn/pcmi/internal/event" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/model" "github.com/marco-spagn/pcmi/internal/repository" ) @@ -37,7 +37,7 @@ func NewMemoryService(repo repository.MemoryRepo, embedder embedding.Provider, d func (s *MemoryService) Store(ctx context.Context, req *model.StoreRequest, tenantID string) (*StoreResult, error) { path := strings.TrimSpace(req.Path) - log.Printf("๐Ÿ“ฅ [SERVICE] Store โ€” tenant=%s path=%s", tenantID, path) + log.Debug("[service] store", "tenant", tenantID, "path", path) if res, handled, err := s.tryDedup(ctx, req, tenantID, path); handled { return res, err @@ -73,13 +73,13 @@ func (s *MemoryService) Store(ctx context.Context, req *model.StoreRequest, tena if supersededID != nil { eventType = event.EventMemoryUpdated payload["superseded_id"] = *supersededID - log.Printf("๐Ÿ“ฃ [REDIS] memory.updated id=%d version=%d superseded=%d", id, version, *supersededID) + log.Debug("[redis] memory.updated", "id", id, "version", version, "superseded", *supersededID) } else { - log.Printf("๐Ÿ“ฃ [REDIS] memory.stored id=%d version=%d", id, version) + log.Debug("[redis] memory.stored", "id", id, "version", version) } if err := event.PublishEvent(eventType, payload); err != nil { - log.Printf("โŒ [REDIS] publish: %v", err) + log.Error("[redis] publish failed", "err", err) } return &StoreResult{ @@ -139,19 +139,19 @@ func (s *MemoryService) tryDedup(ctx context.Context, req *model.StoreRequest, t switch mode { case model.DedupModeSkip: if samePath { - log.Printf("โญ๏ธ [DEDUP] skip tenant=%s path=%s id=%d", tenantID, path, existing.ID) + log.Debug("[dedup] skip", "tenant", tenantID, "path", path, "id", existing.ID) return makeResult(existing, model.StoreActionSkipped, ""), true, nil } return nil, false, nil case model.DedupModeLink: if samePath { - log.Printf("โญ๏ธ [DEDUP] skip (link mode, same path) tenant=%s path=%s id=%d", tenantID, path, existing.ID) + log.Debug("[dedup] skip link mode same path", "tenant", tenantID, "path", path, "id", existing.ID) return makeResult(existing, model.StoreActionSkipped, ""), true, nil } if err := s.repo.UpsertDedupLink(ctx, tenantID, path, existing.Path); err != nil { return nil, true, err } - log.Printf("๐Ÿ”— [DEDUP] link %s -> %s tenant=%s", path, existing.Path, tenantID) + log.Info("[dedup] linked", "path", path, "target", existing.Path, "tenant", tenantID) return makeResult(existing, model.StoreActionLinked, path), true, nil case model.DedupModeMerge: if !samePath { @@ -161,7 +161,7 @@ func (s *MemoryService) tryDedup(ctx context.Context, req *model.StoreRequest, t if err != nil { return nil, true, err } - log.Printf("๐Ÿ”€ [DEDUP] merge metadata tenant=%s path=%s id=%d", tenantID, path, merged.ID) + log.Info("[dedup] merged metadata", "tenant", tenantID, "path", path, "id", merged.ID) return makeResult(merged, model.StoreActionMerged, ""), true, nil default: return nil, false, nil @@ -173,7 +173,7 @@ func (s *MemoryService) Retrieve(ctx context.Context, req *model.RetrieveRequest if q := strings.TrimSpace(req.Query); q != "" && s.embedder != nil { emb, err := s.embedder.Generate(ctx, q) if err != nil { - log.Printf("โš ๏ธ semantic retrieve fallback (embedding error): %v", err) + log.Warn("semantic retrieve fallback", "err", err) } else { queryEmbedding = emb } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 5dd17ed6..ec5a5992 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -5,11 +5,11 @@ package telemetry import ( "context" "fmt" - "log" "strings" "time" "github.com/marco-spagn/pcmi/internal/config" + "github.com/marco-spagn/pcmi/internal/log" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/propagation" @@ -74,6 +74,6 @@ func Init(ctx context.Context, cfg *config.Config, defaultServiceName string) (s sdktrace.WithResource(res), ) otel.SetTracerProvider(tp) - log.Printf("OpenTelemetry: OTLP/HTTP traces โ†’ %s (service=%s)", endpoint, serviceName) + log.Info("otel configured", "endpoint", endpoint, "service", serviceName) return tp.Shutdown, nil } diff --git a/internal/webhook/dispatcher.go b/internal/webhook/dispatcher.go index 46a1b010..c2c11fa0 100644 --- a/internal/webhook/dispatcher.go +++ b/internal/webhook/dispatcher.go @@ -3,13 +3,13 @@ package webhook import ( "context" "encoding/json" - "log" "net/http" "sync" "time" "github.com/jackc/pgx/v5/pgxpool" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/metrics" ) @@ -73,7 +73,7 @@ func (d *Dispatcher) NotifyMatching(tenantID, eventType string, payload map[stri defer cancel() eps, err := d.listEndpoints(ctx, tenantID, eventType) if err != nil { - log.Printf("webhook list: %v", err) + log.Error("webhook list endpoints failed", "err", err) return } body, err := json.Marshal(map[string]any{ @@ -87,7 +87,7 @@ func (d *Dispatcher) NotifyMatching(tenantID, eventType string, payload map[stri } for _, ep := range eps { if err := d.enqueue(ctx, tenantID, ep, eventType, payload, body); err != nil { - log.Printf("webhook enqueue %s: %v", ep.ID, err) + log.Error("webhook enqueue failed", "endpoint", ep.ID, "err", err) } } }() @@ -174,7 +174,7 @@ func (d *Dispatcher) processPending() { LIMIT 20 FOR UPDATE SKIP LOCKED`) if err != nil { - log.Printf("webhook pending query: %v", err) + log.Error("webhook pending delivery query failed", "err", err) return } defer rows.Close() @@ -200,7 +200,7 @@ func (d *Dispatcher) processPending() { batch = append(batch, pd) } if err := rows.Err(); err != nil { - log.Printf("webhook pending scan: %v", err) + log.Error("webhook pending delivery scan failed", "err", err) return } @@ -215,7 +215,7 @@ func (d *Dispatcher) attemptDelivery(ctx context.Context, pd pendingDelivery) { // different tenant set by the previous caller. if pd.TenantID != "" { if _, err := d.db.Exec(ctx, "SELECT set_tenant_context($1::uuid)", pd.TenantID); err != nil { - log.Printf("webhook attempt: set_tenant_context failed for delivery %s: %v", pd.ID, err) + log.Error("webhook set_tenant_context failed", "delivery", pd.ID, "err", err) return } } @@ -235,7 +235,7 @@ func (d *Dispatcher) attemptDelivery(ctx context.Context, pd pendingDelivery) { UPDATE webhook_deliveries SET status = 'dead_letter', attempts = $2, last_error = $3 WHERE id = $1::uuid`, pd.ID, attempts, errMsg) - log.Printf("webhook %s dead-letter after %d attempts: %v", pd.ID, attempts, err) + log.Error("webhook delivery dead-lettered", "id", pd.ID, "attempts", attempts, "err", err) metrics.IncWebhookDeadLetter() return } diff --git a/internal/worker/consolidation.go b/internal/worker/consolidation.go index 79ead268..915ad76c 100644 --- a/internal/worker/consolidation.go +++ b/internal/worker/consolidation.go @@ -3,13 +3,13 @@ package worker import ( "context" "fmt" - "log" "strings" "sync" "time" "github.com/jackc/pgx/v5/pgxpool" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/model" "github.com/marco-spagn/pcmi/internal/repository" ) @@ -53,7 +53,7 @@ func (w *ConsolidationWorker) TriggerForMemory(tenantID, path string) { defer cancel() _, _ = w.db.Exec(ctx, "SELECT set_tenant_context($1::uuid)", tenantID) if err := w.tryConsolidate(ctx, tenantID, prefix); err != nil { - log.Printf("โš ๏ธ consolidation trigger: %v", err) + log.Warn("consolidation trigger failed", "err", err) } }() } @@ -154,7 +154,7 @@ func (w *ConsolidationWorker) runConsolidation(ctx context.Context, tenantID, pr if err != nil { return err } - log.Printf("โœ… consolidated tenant=%s prefix=%s entry_id=%d version=%d sources=%d", tenantID, prefix, id, ver, len(ids)) + log.Info("memories consolidated", "tenant", tenantID, "prefix", prefix, "entry_id", id, "version", ver, "sources", len(ids)) return nil } diff --git a/internal/worker/distillation.go b/internal/worker/distillation.go index 6f26d4a1..17f7fe2f 100644 --- a/internal/worker/distillation.go +++ b/internal/worker/distillation.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "log" "strings" "time" @@ -12,6 +11,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/marco-spagn/pcmi/internal/config" "github.com/marco-spagn/pcmi/internal/event" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/metrics" "github.com/sashabaranov/go-openai" ) @@ -39,9 +39,9 @@ func NewDistillationWorker(db *pgxpool.Pool, cfg *config.Config) *DistillationWo concurrency = distillationConcurrencyFrom(cfg.DistillationConcurrency) } if apiKey == "" { - log.Println("โš ๏ธ OPENAI_API_KEY unset โ€” distillation LLM calls will fail") + log.Warn("OPENAI_API_KEY unset, distillation LLM calls will fail") } - log.Printf("๐Ÿ”ง Distillation concurrency limit: %d parallel LLM jobs", concurrency) + log.Info("distillation worker initialized", "concurrency", concurrency) return &DistillationWorker{ db: db, openai: openai.NewClient(apiKey), @@ -97,7 +97,7 @@ func (w *DistillationWorker) runDistillationJob(tenantID, memoryPath string) { } func (w *DistillationWorker) Start(ctx context.Context) { - log.Printf("๐Ÿš€ Distillation Engine v1.7 started โ€” Redis-driven + fallback timer (batch_size=%d)", w.batchSize) + log.Info("distillation engine started", "batch_size", w.batchSize) ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() @@ -107,10 +107,10 @@ func (w *DistillationWorker) Start(ctx context.Context) { for { select { case <-ctx.Done(): - log.Println("๐Ÿ›‘ Distillation worker stopped") + log.Info("distillation worker stopped") return case <-ticker.C: - log.Println("โฐ Fallback timer: periodic distillation for default tenant") + log.Debug("distillation fallback timer tick") w.runDistillationJob(defaultTenant, "root.test") } } @@ -129,17 +129,17 @@ func (w *DistillationWorker) runDistillationJobWithPrefix(tenantID, pathPrefix s defer cancel() if _, err := w.db.Exec(ctx, "SELECT set_tenant_context($1::uuid)", tenantID); err != nil { - log.Printf("โŒ distillation set tenant: %v", err) + log.Error("distillation set tenant failed", "err", err) metrics.ObserveDistillationJob(time.Since(start).Seconds(), "error") return } batchSize := w.batchSize - log.Printf("๐Ÿ”„ Distillation job tenant=%s path_prefix=%s batch_size=%d", tenantID, pathPrefix, batchSize) + log.Info("distillation job started", "tenant", tenantID, "path_prefix", pathPrefix, "batch_size", batchSize) rows, err := w.db.Query(ctx, distillationSourceEntriesSQL(), tenantID, pathPrefix, batchSize) if err != nil { - log.Printf("โŒ distillation query error: %v", err) + log.Error("distillation query failed", "err", err) return } defer rows.Close() @@ -168,7 +168,7 @@ func (w *DistillationWorker) runDistillationJobWithPrefix(tenantID, pathPrefix s } if len(entries) < 1 { - log.Printf("๐Ÿ“Š No memories under %s โ€” distillation skipped", pathPrefix) + log.Info("distillation skipped, no source memories", "path_prefix", pathPrefix) metrics.ObserveDistillationJob(time.Since(start).Seconds(), "skipped") return } @@ -176,12 +176,12 @@ func (w *DistillationWorker) runDistillationJobWithPrefix(tenantID, pathPrefix s metrics.ObserveDistillationSources(len(entries)) if w.apiKey == "" { - log.Println("โš ๏ธ Skipping LLM distillation (no OPENAI_API_KEY)") + log.Warn("skipping LLM distillation, no OPENAI_API_KEY set") metrics.ObserveDistillationJob(time.Since(start).Seconds(), "skipped") return } - log.Printf("๐Ÿง  Distilling %d raw memories under %s...", len(entries), pathPrefix) + log.Info("distilling memories", "count", len(entries), "path_prefix", pathPrefix) prompt := `Summarize these memories into higher-order knowledge. Return ONLY valid JSON: @@ -207,7 +207,7 @@ Return ONLY valid JSON: }, }) if err != nil { - log.Printf("โŒ LLM distillation error: %v", err) + log.Error("LLM distillation error", "err", err) metrics.ObserveDistillationJob(time.Since(start).Seconds(), "error") return } @@ -215,13 +215,13 @@ Return ONLY valid JSON: // BUG-FIX-1: guard against empty Choices (OpenAI API can return 0 choices on // content-filter or rate-limit soft errors without returning an HTTP error code). if len(resp.Choices) == 0 { - log.Printf("โŒ LLM returned 0 choices (finish_reason may be 'content_filter')") + log.Error("LLM returned 0 choices (finish_reason may be content_filter)") metrics.ObserveDistillationJob(time.Since(start).Seconds(), "error") return } result, err := parseDistillationLLMResponse(resp.Choices[0].Message.Content) if err != nil { - log.Printf("โŒ JSON parse error: %v (raw: %s)", err, resp.Choices[0].Message.Content) + log.Error("LLM response JSON parse failed", "err", err, "raw", resp.Choices[0].Message.Content) metrics.ObserveDistillationJob(time.Since(start).Seconds(), "error") return } @@ -234,24 +234,24 @@ Return ONLY valid JSON: dup, err := w.hasDuplicateDistillation(ctx, tenantID, distilledPath, sourceIDs) if err != nil { - log.Printf("โŒ distillation dedup check: %v", err) + log.Error("distillation dedup check failed", "err", err) return } if dup { - log.Printf("โญ๏ธ Distillation skipped (duplicate sources) path=%s tenant=%s", distilledPath, tenantID) + log.Info("distillation skipped, duplicate sources", "path", distilledPath, "tenant", tenantID) metrics.ObserveDistillationJob(time.Since(start).Seconds(), "duplicate") return } version, err := nextDistilledVersion(ctx, w.db, tenantID, distilledPath) if err != nil { - log.Printf("โŒ distillation version: %v", err) + log.Error("distillation version lookup failed", "err", err) return } insightsJSON, err := json.Marshal(result.Insights) if err != nil { - log.Printf("โŒ insights marshal: %v", err) + log.Error("distillation insights marshal failed", "err", err) return } @@ -270,7 +270,7 @@ Return ONLY valid JSON: version, ).Scan(&distilledID) if err != nil { - log.Printf("โŒ insert distilled error: %v", err) + log.Error("insert distilled knowledge failed", "err", err) return } @@ -281,12 +281,11 @@ Return ONLY valid JSON: "version": version, "sources": len(sourceIDs), }); err != nil { - log.Printf("โš ๏ธ distilled event publish: %v", err) + log.Warn("distilled event publish failed", "err", err) } metrics.ObserveDistillationJob(time.Since(start).Seconds(), "ok") - log.Printf("โœ… Distillation saved id=%d at %s v%d (tenant=%s, sources=%d)", - distilledID, distilledPath, version, tenantID, len(sourceIDs)) + log.Info("distillation saved", "id", distilledID, "path", distilledPath, "version", version, "tenant", tenantID, "sources", len(sourceIDs)) // FIX-4: update distillation_runs row to 'completed' when called from // the policy engine (runID > 0). Standalone Redis-triggered calls pass @@ -333,6 +332,6 @@ func (w *DistillationWorker) markRunCompleted(ctx context.Context, tenantID stri )`, tenantID, tenantID, distilledID) if err != nil { // Non-fatal: the distillate was saved; only the audit row is wrong. - log.Printf("โš ๏ธ markRunCompleted: %v", err) + log.Warn("markRunCompleted failed", "err", err) } } diff --git a/internal/worker/distillation_helpers.go b/internal/worker/distillation_helpers.go index df05e2f9..f14bc4bb 100644 --- a/internal/worker/distillation_helpers.go +++ b/internal/worker/distillation_helpers.go @@ -2,10 +2,11 @@ package worker import ( "encoding/json" - "log" "regexp" "slices" "strings" + + "github.com/marco-spagn/pcmi/internal/log" ) // defaultDistillationBatchSize is the max number of raw memories per distillation job. @@ -20,7 +21,7 @@ func distillationBatchSizeFrom(cfgBatch int) int { return cfgBatch } if cfgBatch != 0 { - log.Printf("โš ๏ธ DISTILLATION_BATCH_SIZE=%d invalid, using default %d", cfgBatch, defaultDistillationBatchSize) + log.Warn("distillation batch size invalid, using default", "configured", cfgBatch, "default", defaultDistillationBatchSize) } return defaultDistillationBatchSize } @@ -30,7 +31,7 @@ func distillationConcurrencyFrom(cfgConcurrency int) int { return cfgConcurrency } if cfgConcurrency != 0 { - log.Printf("โš ๏ธ DISTILLATION_CONCURRENCY=%d invalid, using default %d", cfgConcurrency, defaultDistillationConcurrency) + log.Warn("distillation concurrency invalid, using default", "configured", cfgConcurrency, "default", defaultDistillationConcurrency) } return defaultDistillationConcurrency } diff --git a/internal/worker/distillation_policy.go b/internal/worker/distillation_policy.go index 068abe8b..6a82c1dd 100644 --- a/internal/worker/distillation_policy.go +++ b/internal/worker/distillation_policy.go @@ -3,13 +3,13 @@ package worker import ( "context" "errors" - "log" "strings" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/marco-spagn/pcmi/internal/log" "github.com/marco-spagn/pcmi/internal/model" ) @@ -61,7 +61,7 @@ func (e *DistillationPolicyEngine) Start(ctx context.Context) { } ticker := time.NewTicker(e.ticker) defer ticker.Stop() - log.Println("๐Ÿš€ Distillation policy engine started (periodic age/count evaluation)") + log.Info("distillation policy engine started") for { select { case <-ctx.Done(): @@ -82,7 +82,7 @@ func (e *DistillationPolicyEngine) OnMemoryEvent(tenantID, memoryPath string) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() if err := e.evaluatePrefix(ctx, tenantID, prefix); err != nil { - log.Printf("โš ๏ธ distillation policy evaluate: %v", err) + log.Warn("distillation policy evaluate failed", "err", err) } }() } @@ -187,20 +187,19 @@ func (e *DistillationPolicyEngine) maybeTrigger(ctx context.Context, p model.Dis } runID, err := e.beginRun(ctx, p, st.MemoryCount) if err != nil { - log.Printf("โŒ distillation policy run begin: %v", err) + log.Error("distillation policy run begin failed", "err", err) return } if err := e.markPolicyTriggered(ctx, p.ID); err != nil { - log.Printf("โš ๏ธ distillation policy last_triggered_at: %v", err) + log.Warn("distillation policy last_triggered_at update failed", "err", err) } if e.dist != nil { e.dist.TriggerForPrefix(p.TenantID, p.PathPrefix) } if err := e.completeRun(ctx, runID, "completed", nil); err != nil { - log.Printf("โš ๏ธ distillation policy run complete: %v", err) + log.Warn("distillation policy run complete update failed", "err", err) } - log.Printf("โœ… distillation policy %d triggered (%s) tenant=%s prefix=%s count=%d", - p.ID, reason, p.TenantID, p.PathPrefix, st.MemoryCount) + log.Info("distillation policy triggered", "id", p.ID, "reason", reason, "tenant", p.TenantID, "path_prefix", p.PathPrefix, "memory_count", st.MemoryCount) } func (e *DistillationPolicyEngine) beginRun(ctx context.Context, p model.DistillationPolicy, sourceCount int) (int64, error) { diff --git a/internal/worker/expiry.go b/internal/worker/expiry.go index a6c258d4..aff87835 100644 --- a/internal/worker/expiry.go +++ b/internal/worker/expiry.go @@ -2,11 +2,11 @@ package worker import ( "context" - "log" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/marco-spagn/pcmi/internal/config" + "github.com/marco-spagn/pcmi/internal/log" ) // ExpiryWorker periodically soft-closes memories whose TTL has passed. @@ -24,13 +24,13 @@ func NewExpiryWorker(db *pgxpool.Pool, cfg *config.Config) *ExpiryWorker { } func (w *ExpiryWorker) Start(ctx context.Context) { - log.Printf("๐Ÿ• Expiry worker started (interval=%s)", w.interval) + log.Info("expiry worker started", "interval", w.interval) ticker := time.NewTicker(w.interval) defer ticker.Stop() for { select { case <-ctx.Done(): - log.Println("๐Ÿ›‘ Expiry worker stopped") + log.Info("expiry worker stopped") return case <-ticker.C: w.runOnce() @@ -47,19 +47,19 @@ func (w *ExpiryWorker) runOnce() { AND metadata ? 'ttl_seconds' AND created_at + (metadata->>'ttl_seconds')::int * interval '1 second' < NOW()`) if err != nil { - log.Printf("โŒ expiry error: %v", err) + log.Error("expiry worker execution failed", "err", err) return } if tag.RowsAffected() > 0 { - log.Printf("๐Ÿ• Expired %d memories", tag.RowsAffected()) + log.Info("memories expired", "count", tag.RowsAffected()) } idemTag, err := w.db.Exec(ctx, `DELETE FROM idempotency_cache WHERE expires_at <= NOW()`) if err != nil { - log.Printf("โŒ idempotency cache cleanup: %v", err) + log.Error("idempotency cache cleanup failed", "err", err) return } if idemTag.RowsAffected() > 0 { - log.Printf("๐Ÿ• Purged %d expired idempotency cache rows", idemTag.RowsAffected()) + log.Info("idempotency cache rows purged", "count", idemTag.RowsAffected()) } } diff --git a/internal/worker/pruning.go b/internal/worker/pruning.go index 4e214c25..6167267b 100644 --- a/internal/worker/pruning.go +++ b/internal/worker/pruning.go @@ -2,11 +2,11 @@ package worker import ( "context" - "log" "time" "github.com/jackc/pgx/v5/pgxpool" "github.com/marco-spagn/pcmi/internal/config" + "github.com/marco-spagn/pcmi/internal/log" ) // PruningWorker periodically deletes superseded memory rows past retention. @@ -31,14 +31,14 @@ func NewPruningWorker(db *pgxpool.Pool, cfg *config.Config) *PruningWorker { } func (w *PruningWorker) Start(ctx context.Context) { - log.Printf("๐Ÿงน Pruning worker started (retention=%dd, interval=%s)", w.retentionDays, w.interval) + log.Info("pruning worker started", "retention_days", w.retentionDays, "interval", w.interval) ticker := time.NewTicker(w.interval) defer ticker.Stop() w.runOnce() for { select { case <-ctx.Done(): - log.Println("๐Ÿ›‘ Pruning worker stopped") + log.Info("pruning worker stopped") return case <-ticker.C: w.runOnce() @@ -51,10 +51,10 @@ func (w *PruningWorker) runOnce() { var n int err := w.db.QueryRow(ctx, "SELECT prune_superseded_memories($1)", w.retentionDays).Scan(&n) if err != nil { - log.Printf("โŒ pruning: %v", err) + log.Error("pruning execution failed", "err", err) return } if n > 0 { - log.Printf("๐Ÿงน Pruned %d superseded memory row(s)", n) + log.Info("superseded memories pruned", "count", n) } }