Files
ForgeBucket/internal/observability/metrics.go
T
2026-05-12 20:32:30 +02:00

173 lines
5.7 KiB
Go

package observability
import (
"context"
"encoding/json"
"log"
"net/http"
"regexp"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/forgeo/forgebucket/internal/events"
)
// ── Metric definitions ────────────────────────────────────────────────────────
// The collectors below are created through promauto, so they are registered
// as a side effect of package initialization.

// HttpRequestsTotal counts HTTP requests, labelled by method, normalized
// path, and response status code.
var HttpRequestsTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "forgebucket_http_requests_total",
		Help: "Total HTTP requests by method, normalized path, and status code.",
	},
	[]string{"method", "path", "status"},
)

// HttpRequestDuration observes HTTP request latency in seconds, labelled by
// method and normalized path, using the Prometheus default buckets.
var HttpRequestDuration = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "forgebucket_http_request_duration_seconds",
		Help:    "HTTP request latency by method and normalized path.",
		Buckets: prometheus.DefBuckets,
	},
	[]string{"method", "path"},
)

// PipelineRunsTotal counts pipeline runs, labelled by status.
var PipelineRunsTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "forgebucket_pipeline_runs_total",
		Help: "Pipeline runs by terminal status.",
	},
	[]string{"status"},
)

// DeploymentsTotal counts deployments, labelled by status.
var DeploymentsTotal = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "forgebucket_deployments_total",
		Help: "Deployments by terminal status.",
	},
	[]string{"status"},
)

// ActivePipelineRuns tracks the number of pipeline runs that are currently
// queued or running.
var ActivePipelineRuns = promauto.NewGauge(
	prometheus.GaugeOpts{
		Name: "forgebucket_active_pipeline_runs",
		Help: "Pipeline runs currently in queued or running state.",
	},
)
// init touches every expected status label once so that each series already
// exists (with value zero) on the first scrape of /metrics instead of
// appearing only after the first matching event.
func init() {
	pipelineStatuses := [...]string{"succeeded", "failed", "cancelled"}
	for i := range pipelineStatuses {
		PipelineRunsTotal.WithLabelValues(pipelineStatuses[i])
	}

	deploymentStatuses := [...]string{"pending", "success", "failure", "cancelled"}
	for i := range deploymentStatuses {
		DeploymentsTotal.WithLabelValues(deploymentStatuses[i])
	}
}
// ── HTTP instrumentation middleware ──────────────────────────────────────────
// statusRecorder wraps an http.ResponseWriter and remembers the status code
// that was actually committed to the client, so the metrics middleware can
// label the request with it.
//
// The caller initializes status with the value to report when the handler
// never writes anything (Middleware uses http.StatusOK).
type statusRecorder struct {
	http.ResponseWriter
	status int
	// wroteHeader is true once a final (non-informational) status has been
	// committed, either explicitly or implicitly via Write/Flush.
	wroteHeader bool
}

// WriteHeader records code and forwards it to the wrapped writer.
//
// Only the first final status is recorded: net/http ignores later
// WriteHeader calls, so letting the last call win would report a status the
// client never received. 1xx informational codes (other than 101) are
// forwarded but not recorded, because a final status still follows them.
func (r *statusRecorder) WriteHeader(code int) {
	informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
	if !r.wroteHeader && !informational {
		r.status = code
		r.wroteHeader = true
	}
	r.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. A Write without a preceding
// WriteHeader commits an implicit 200, which is recorded here so that a
// later (ignored) WriteHeader call cannot overwrite it.
func (r *statusRecorder) Write(b []byte) (int, error) {
	r.commitImplicitOK()
	return r.ResponseWriter.Write(b)
}

// Flush implements http.Flusher so that streaming handlers keep working when
// wrapped. It is a no-op if the wrapped writer cannot flush.
func (r *statusRecorder) Flush() {
	f, ok := r.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}
	// Flushing before any header was written sends an implicit 200.
	r.commitImplicitOK()
	f.Flush()
}

// Unwrap returns the wrapped writer, which lets http.ResponseController
// reach optional interfaces (Hijacker, deadlines, ...) of the original.
func (r *statusRecorder) Unwrap() http.ResponseWriter {
	return r.ResponseWriter
}

// commitImplicitOK records the implicit 200 that net/http sends when body
// data is written or flushed before any explicit WriteHeader call.
func (r *statusRecorder) commitImplicitOK() {
	if r.wroteHeader {
		return
	}
	r.status = http.StatusOK
	r.wroteHeader = true
}
// Middleware returns an HTTP middleware that instruments every request with
// a request counter and a latency histogram.
//
// The request path is passed through normalizePath before it is used as a
// label so that variable path segments do not blow up label cardinality.
func Middleware() func(http.Handler) http.Handler {
	instrument := func(next http.Handler) http.Handler {
		serve := func(w http.ResponseWriter, r *http.Request) {
			began := time.Now()

			// Default to 200: that is what is reported when the handler
			// never calls WriteHeader.
			rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
			next.ServeHTTP(rec, r)

			route := normalizePath(r.URL.Path)
			code := strconv.Itoa(rec.status)
			seconds := time.Since(began).Seconds()

			HttpRequestsTotal.WithLabelValues(r.Method, route, code).Inc()
			HttpRequestDuration.WithLabelValues(r.Method, route).Observe(seconds)
		}
		return http.HandlerFunc(serve)
	}
	return instrument
}
// Patterns used by normalizePath. They are compiled once at package
// initialization because normalizePath runs for every HTTP request.
var (
	// rePathSegment matches one non-empty path segment including its
	// leading slash.
	rePathSegment = regexp.MustCompile(`/[^/]+`)
	// reNumeric matches a segment (leading slash included) that consists
	// solely of decimal digits.
	reNumeric = regexp.MustCompile(`^/\d+$`)
	// reGitPath matches repo smart-HTTP paths: /{owner}/{repo}.git[/...].
	reGitPath = regexp.MustCompile(`^/[^/]+/[^/]+\.git(/.*)?$`)
	// reRepoPath matches /api/v1/repos/{owner}/{repo}[/...].
	reRepoPath = regexp.MustCompile(`^/api/v1/repos/[^/]+/[^/]+(/.*)?$`)
	// reWorkspacePath matches /api/v1/workspaces/{handle}[/...].
	reWorkspacePath = regexp.MustCompile(`^/api/v1/workspaces/[^/]+(/.*)?$`)
)

// normalizePath replaces volatile path segments with placeholders so that
// Prometheus label cardinality stays bounded.
//
// Examples:
//
//	/api/v1/repos/alice/myrepo/runs/42/jobs/7/logs
//	  → /api/v1/repos/:owner/:repo/runs/:id/jobs/:id/logs
//
//	/api/v1/repos/alice/myrepo
//	  → /api/v1/repos/:owner/:repo
//
//	/alice/myrepo.git/info/refs
//	  → /:owner/:repo.git/info/refs
//
// Paths that match none of the known shapes are returned with only their
// all-numeric segments replaced.
func normalizePath(path string) string {
	// Replace all-numeric segments first. Each segment is inspected as a
	// whole so that names which merely start with digits (e.g. "123abc")
	// are left intact instead of being turned into ":idabc".
	path = rePathSegment.ReplaceAllStringFunc(path, func(seg string) string {
		if reNumeric.MatchString(seg) {
			return "/:id"
		}
		return seg
	})

	// The trailing group is optional in all three patterns, so the bare
	// resource path (no sub-path) is normalized too; an unmatched group
	// expands to the empty string.
	path = reGitPath.ReplaceAllString(path, "/:owner/:repo.git$1")
	path = reRepoPath.ReplaceAllString(path, "/api/v1/repos/:owner/:repo$1")
	path = reWorkspacePath.ReplaceAllString(path, "/api/v1/workspaces/:handle$1")
	return path
}
// ── NATS event watcher ────────────────────────────────────────────────────────
// StartNATSWatcher subscribes to the "pipeline.>" and "deployment.>"
// subjects on bus and translates the events it receives into Prometheus
// counter and gauge updates.
//
// The call blocks until ctx is cancelled; on return every subscription that
// was established is released. A failed subscription is logged and skipped,
// it does not abort the watcher.
func StartNATSWatcher(ctx context.Context, bus events.EventBus) {
	type statusPayload struct {
		Status string `json:"status"`
	}

	// onPipeline keeps the active-run gauge and the per-outcome counter in
	// step with pipeline lifecycle events.
	onPipeline := func(subject string, _ []byte) {
		var outcome string
		switch subject {
		case events.SubjectPipelineTriggered:
			ActivePipelineRuns.Inc()
			return
		case events.SubjectPipelineCompleted:
			outcome = "succeeded"
		case events.SubjectPipelineFailed:
			outcome = "failed"
		default:
			return
		}
		ActivePipelineRuns.Dec()
		PipelineRunsTotal.WithLabelValues(outcome).Inc()
	}

	// onDeployment counts deployment events by the status implied by the
	// subject.
	onDeployment := func(subject string, raw []byte) {
		// The payload is decoded best-effort; its content is currently not
		// consulted, so a decode error is deliberately ignored.
		var payload statusPayload
		_ = json.Unmarshal(raw, &payload)

		var label string
		switch subject {
		case events.SubjectDeploymentSucceeded:
			label = "success"
		case events.SubjectDeploymentFailed:
			label = "failure"
		case events.SubjectDeploymentStarted:
			label = "pending"
		default:
			return
		}
		DeploymentsTotal.WithLabelValues(label).Inc()
	}

	stopPipeline, err := bus.Subscribe("pipeline.>", onPipeline)
	if err == nil {
		defer stopPipeline()
	} else {
		log.Printf("observability: subscribe pipeline.*: %v", err)
	}

	stopDeployment, err := bus.Subscribe("deployment.>", onDeployment)
	if err == nil {
		defer stopDeployment()
	} else {
		log.Printf("observability: subscribe deployment.*: %v", err)
	}

	log.Printf("observability: NATS metric watcher started")
	<-ctx.Done()
}