// Package observability defines the forgebucket Prometheus metrics, an HTTP
// middleware that records request metrics, and an event-bus watcher that keeps
// the pipeline and deployment metrics up to date.
package observability

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"regexp"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/forgeo/forgebucket/internal/events"
)

// ── Metric definitions ────────────────────────────────────────────────────────

var (
	// HttpRequestsTotal counts HTTP requests by method, normalized path, and
	// status code.
	HttpRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "forgebucket_http_requests_total",
		Help: "Total HTTP requests by method, normalized path, and status code.",
	}, []string{"method", "path", "status"})

	// HttpRequestDuration observes HTTP request latency in seconds by method
	// and normalized path.
	HttpRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "forgebucket_http_request_duration_seconds",
		Help:    "HTTP request latency by method and normalized path.",
		Buckets: prometheus.DefBuckets,
	}, []string{"method", "path"})

	// PipelineRunsTotal counts pipeline runs by terminal status.
	PipelineRunsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "forgebucket_pipeline_runs_total",
		Help: "Pipeline runs by terminal status.",
	}, []string{"status"})

	// DeploymentsTotal counts deployments by status.
	DeploymentsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "forgebucket_deployments_total",
		Help: "Deployments by terminal status.",
	}, []string{"status"})

	// ActivePipelineRuns tracks pipeline runs that have been triggered but have
	// not yet reported completion or failure.
	ActivePipelineRuns = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "forgebucket_active_pipeline_runs",
		Help: "Pipeline runs currently in queued or running state.",
	})
)

func init() {
	// Pre-initialize the known status label values so the series are visible in
	// /metrics immediately from startup (no gaps on first scrape).
	for _, s := range []string{"succeeded", "failed", "cancelled"} {
		PipelineRunsTotal.WithLabelValues(s)
	}
	for _, s := range []string{"pending", "success", "failure", "cancelled"} {
		DeploymentsTotal.WithLabelValues(s)
	}
}

// ── HTTP instrumentation middleware ──────────────────────────────────────────

// statusRecorder wraps an http.ResponseWriter and remembers the status code of
// the final response so Middleware can use it as a metric label.
type statusRecorder struct {
	http.ResponseWriter
	status      int  // final status code; the creator sets the default (200)
	wroteHeader bool // true once the final response header has been decided
}

// WriteHeader records the first final status code and forwards the call.
// Interim 1xx responses (other than 101) may be followed by the real status,
// so they are forwarded but not recorded. Later calls are forwarded too (the
// underlying writer decides what to do with them) but do not overwrite the
// recorded status.
func (r *statusRecorder) WriteHeader(code int) {
	interim := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
	if !interim && !r.wroteHeader {
		r.status = code
		r.wroteHeader = true
	}
	r.ResponseWriter.WriteHeader(code)
}

// Write forwards the body bytes. Writing a body without an explicit
// WriteHeader call commits the default status, so the recorded status is
// frozen from this point on.
func (r *statusRecorder) Write(b []byte) (int, error) {
	r.wroteHeader = true
	return r.ResponseWriter.Write(b)
}

// Flush forwards to the wrapped writer when it implements http.Flusher, so
// streaming handlers keep working behind the middleware. It is a no-op
// otherwise.
func (r *statusRecorder) Flush() {
	if f, ok := r.ResponseWriter.(http.Flusher); ok {
		r.wroteHeader = true
		f.Flush()
	}
}

// Unwrap returns the wrapped writer so http.ResponseController can reach
// optional interfaces implemented by the original http.ResponseWriter.
func (r *statusRecorder) Unwrap() http.ResponseWriter {
	return r.ResponseWriter
}

// Middleware records request count and latency for every HTTP request.
// Path labels are normalized to prevent high cardinality (numeric segments
// and positional path variables are replaced with placeholder tokens).
func Middleware() func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			start := time.Now()
			// Default to 200: that is what net/http sends when a handler never
			// calls WriteHeader.
			rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}

			next.ServeHTTP(rec, r)

			elapsed := time.Since(start).Seconds()
			path := normalizePath(r.URL.Path)
			status := strconv.Itoa(rec.status)

			HttpRequestsTotal.WithLabelValues(r.Method, path, status).Inc()
			HttpRequestDuration.WithLabelValues(r.Method, path).Observe(elapsed)
		})
	}
}

var (
	// reSegment matches one non-empty path segment including its leading slash.
	reSegment = regexp.MustCompile(`/[^/]+`)

	// The trailing group is optional so that a path ending at the variable
	// segment (for example /api/v1/repos/alice/myrepo) is normalized as well.
	reGitPath       = regexp.MustCompile(`^/[^/]+/[^/]+\.git(/.*)?$`)
	reRepoPath      = regexp.MustCompile(`^/api/v1/repos/[^/]+/[^/]+(/.*)?$`)
	reWorkspacePath = regexp.MustCompile(`^/api/v1/workspaces/[^/]+(/.*)?$`)
)

// normalizePath replaces volatile path segments with placeholders so that
// Prometheus label cardinality stays bounded.
//
// Examples:
//
//	/api/v1/repos/alice/myrepo/runs/42/jobs/7/logs
//	  → /api/v1/repos/:owner/:repo/runs/:id/jobs/:id/logs
//
//	/api/v1/repos/alice/myrepo
//	  → /api/v1/repos/:owner/:repo
//
//	/alice/myrepo.git/info/refs
//	  → /:owner/:repo.git/info/refs
func normalizePath(path string) string {
	// Replace segments that consist solely of digits. Segments that merely
	// start with digits (e.g. "/42abc") are left untouched.
	path = reSegment.ReplaceAllStringFunc(path, func(seg string) string {
		if isAllDigits(seg[1:]) {
			return "/:id"
		}
		return seg
	})

	// Normalize repo smart-HTTP paths: /{owner}/{repo}.git/...
	path = reGitPath.ReplaceAllString(path, "/:owner/:repo.git$1")

	// Normalize /api/v1/repos/{owner}/{repo}/...
	path = reRepoPath.ReplaceAllString(path, "/api/v1/repos/:owner/:repo$1")

	// Normalize /api/v1/workspaces/{handle}/...
	path = reWorkspacePath.ReplaceAllString(path, "/api/v1/workspaces/:handle$1")

	return path
}

// isAllDigits reports whether s is non-empty and contains only ASCII digits.
func isAllDigits(s string) bool {
	if s == "" {
		return false
	}
	for i := 0; i < len(s); i++ {
		if s[i] < '0' || s[i] > '9' {
			return false
		}
	}
	return true
}

// ── NATS event watcher ────────────────────────────────────────────────────────

// StartNATSWatcher subscribes to pipeline and deployment NATS events and
// updates the corresponding Prometheus metrics. It blocks until ctx is
// cancelled and then releases the subscriptions it managed to create.
//
// A failed subscription is logged and skipped; the other subscription (if any)
// stays active.
//
// NOTE(review): ActivePipelineRuns is decremented only on the completed and
// failed subjects, and nothing here increments the pre-initialized "cancelled"
// label. If runs can end some other way the gauge will drift — confirm against
// the subjects the events package actually publishes.
func StartNATSWatcher(ctx context.Context, bus events.EventBus) {
	const (
		pipelineSubjects   = "pipeline.>"
		deploymentSubjects = "deployment.>"
	)

	type statusPayload struct {
		Status string `json:"status"`
	}

	unsubPipeline, err := bus.Subscribe(pipelineSubjects, func(subject string, _ []byte) {
		switch subject {
		case events.SubjectPipelineTriggered:
			ActivePipelineRuns.Inc()
		case events.SubjectPipelineCompleted:
			ActivePipelineRuns.Dec()
			PipelineRunsTotal.WithLabelValues("succeeded").Inc()
		case events.SubjectPipelineFailed:
			ActivePipelineRuns.Dec()
			PipelineRunsTotal.WithLabelValues("failed").Inc()
		}
	})
	if err != nil {
		log.Printf("observability: subscribe %s: %v", pipelineSubjects, err)
	} else {
		defer unsubPipeline()
	}

	unsubDeployment, err := bus.Subscribe(deploymentSubjects, func(subject string, data []byte) {
		// NOTE(review): the decoded payload is not consulted below; the
		// counters are driven by the subject alone. Kept as in the original.
		var p statusPayload
		json.Unmarshal(data, &p) //nolint:errcheck

		switch subject {
		case events.SubjectDeploymentSucceeded:
			DeploymentsTotal.WithLabelValues("success").Inc()
		case events.SubjectDeploymentFailed:
			DeploymentsTotal.WithLabelValues("failure").Inc()
		case events.SubjectDeploymentStarted:
			DeploymentsTotal.WithLabelValues("pending").Inc()
		}
	})
	if err != nil {
		log.Printf("observability: subscribe %s: %v", deploymentSubjects, err)
	} else {
		defer unsubDeployment()
	}

	log.Printf("observability: NATS metric watcher started")
	<-ctx.Done()
}