implemented observability

This commit is contained in:
2026-05-12 20:32:30 +02:00
parent c7df53708c
commit e360f3697e
11 changed files with 478 additions and 22 deletions
+126
View File
@@ -0,0 +1,126 @@
package handlers
import (
"net/http"
"time"
"xorm.io/xorm"
"github.com/forgeo/forgebucket/internal/events"
"github.com/forgeo/forgebucket/internal/models"
"github.com/forgeo/forgebucket/internal/observability"
)
// ── /health ───────────────────────────────────────────────────────────────────
type HealthHandler struct {
db *xorm.Engine
bus events.EventBus
}
func NewHealthHandler(db *xorm.Engine, bus events.EventBus) *HealthHandler {
return &HealthHandler{db: db, bus: bus}
}
func (h *HealthHandler) Health(w http.ResponseWriter, r *http.Request) {
status := observability.Check(h.db, h.bus)
if status.Status != "healthy" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
jsonOK(w, status)
return
}
jsonOK(w, status)
}
// ── /api/v1/repos/{owner}/{repo}/health ──────────────────────────────────────
type RepoHealthHandler struct{ db *xorm.Engine }
func NewRepoHealthHandler(db *xorm.Engine) *RepoHealthHandler {
return &RepoHealthHandler{db: db}
}
type latestDeployment struct {
EnvName string `json:"envName"`
Status string `json:"status"`
SHA string `json:"sha"`
FinishedAt *time.Time `json:"finishedAt"`
}
type repoHealthResponse struct {
CIPassRate7d float64 `json:"ciPassRate7d"`
TotalRuns7d int `json:"totalRuns7d"`
LatestRun *models.PipelineRun `json:"latestRun"`
LatestDeployments []latestDeployment `json:"latestDeployments"`
OpenDriftCount int `json:"openDriftCount"`
OpenPRCount int `json:"openPRCount"`
}
// Get returns an operational health summary for a repository.
// This feeds the repo page header: CI pass rate, latest deploy per env, drift count.
func (h *RepoHealthHandler) Get(w http.ResponseWriter, r *http.Request) {
repoID, ok := resolveRepoID(h.db, w, r)
if !ok {
return
}
since7d := time.Now().UTC().Add(-7 * 24 * time.Hour)
// CI pass rate over last 7 days.
var runs []models.PipelineRun
h.db.Where("repo_id = ? AND created_at >= ?", repoID, since7d).Find(&runs)
total := len(runs)
succeeded := 0
for _, run := range runs {
if run.Status == "succeeded" {
succeeded++
}
}
var passRate float64
if total > 0 {
passRate = float64(succeeded) / float64(total)
}
// Latest run overall.
var latestRun models.PipelineRun
var hasLatest bool
hasLatest, _ = h.db.Where("repo_id = ?", repoID).Desc("id").Limit(1).Get(&latestRun)
// Latest deployment per environment.
var envs []models.Environment
h.db.Where("repo_id = ?", repoID).Find(&envs)
deploys := make([]latestDeployment, 0, len(envs))
for _, env := range envs {
var d models.Deployment
if found, _ := h.db.Where("env_id = ?", env.ID).Desc("id").Limit(1).Get(&d); found {
deploys = append(deploys, latestDeployment{
EnvName: env.Name,
Status: string(d.Status),
SHA: d.SHA,
FinishedAt: d.FinishedAt,
})
}
}
// Open drift count (GitOpsConfigs where sync_status = 'drifted').
driftCount, _ := h.db.Where("repo_id = ? AND sync_status = 'drifted'", repoID).
Count(&models.GitOpsConfig{})
// Open PR count.
prCount, _ := h.db.Where("repo_id = ? AND status = 'open'", repoID).
Count(&models.PullRequest{})
resp := repoHealthResponse{
CIPassRate7d: passRate,
TotalRuns7d: total,
LatestDeployments: deploys,
OpenDriftCount: int(driftCount),
OpenPRCount: int(prCount),
}
if hasLatest {
resp.LatestRun = &latestRun
}
jsonOK(w, resp)
}
+14 -8
View File
@@ -14,10 +14,13 @@ import (
"github.com/gorilla/sessions"
"xorm.io/xorm"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/forgeo/forgebucket/internal/api/handlers"
"github.com/forgeo/forgebucket/internal/api/middleware"
"github.com/forgeo/forgebucket/internal/config"
"github.com/forgeo/forgebucket/internal/events"
"github.com/forgeo/forgebucket/internal/observability"
)
func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus events.EventBus, artifactRoot string, staticFiles fs.FS) http.Handler {
@@ -26,6 +29,7 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even
r.Use(chimiddleware.Logger)
r.Use(chimiddleware.RealIP)
r.Use(chimiddleware.Recoverer)
r.Use(observability.Middleware())
r.Use(cors.Handler(cors.Options{
AllowedOrigins: []string{"http://localhost:5173", cfg.InstanceURL},
AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
@@ -53,9 +57,11 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even
webhookH := handlers.NewWebhookHandler(engine)
prSettingsH := handlers.NewPRSettingsHandler(engine)
lfsH := handlers.NewLFSHandler(engine)
exploreH := handlers.NewExploreHandler(engine)
dashH := handlers.NewDashboardHandler(engine)
auditH := handlers.NewAuditHandler(engine)
exploreH := handlers.NewExploreHandler(engine)
dashH := handlers.NewDashboardHandler(engine)
auditH := handlers.NewAuditHandler(engine)
healthH := handlers.NewHealthHandler(engine, bus)
repoHealthH := handlers.NewRepoHealthHandler(engine)
artifactH := handlers.NewArtifactHandler(engine, artifactRoot)
runnerH := handlers.NewRunnerHandler(engine)
gitopsH := handlers.NewGitOpsHandler(engine, bus)
@@ -74,17 +80,16 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even
r.Post("/git-receive-pack", gitH.ServeGit)
})
// ── Ops endpoints (root-level, no auth, standard paths for k8s/Prometheus) ──
r.Get("/health", healthH.Health)
r.Get("/metrics", promhttp.Handler().ServeHTTP)
r.Route("/api/v1", func(r chi.Router) {
// ── Public ────────────────────────────────────────────────────────────
r.Get("/explore/repos", exploreH.Repos)
r.Get("/explore/users", exploreH.Users)
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok"}`))
})
// Generates a CSRF token + cookie. SPA calls this once on load.
r.Get("/csrf", func(w http.ResponseWriter, r *http.Request) {
token, err := middleware.NewCSRFToken(w, !cfg.Debug)
@@ -240,6 +245,7 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even
r.With(csrf).Delete("/secrets/{name}", secretH.DeleteRepoSecret)
r.Get("/lfs-settings", lfsH.Get)
r.With(csrf).Put("/lfs-settings", lfsH.Update)
r.Get("/health", repoHealthH.Get)
r.Route("/environments", func(r chi.Router) {
r.Get("/", envH.ListEnvironments)
r.With(csrf).Post("/", envH.CreateEnvironment)
+4
View File
@@ -16,6 +16,7 @@ import (
type EventBus interface {
Publish(subject string, payload any) error
Subscribe(subject string, handler func(subject string, data []byte)) (func(), error)
Healthy() bool
Close()
}
@@ -63,6 +64,8 @@ func (b *NATSBus) Subscribe(subject string, handler func(subject string, data []
return func() { sub.Unsubscribe() }, nil //nolint:errcheck
}
func (b *NATSBus) Healthy() bool { return b.nc.IsConnected() }
func (b *NATSBus) Close() {
if err := b.nc.Drain(); err != nil {
log.Printf("nats: drain: %v", err)
@@ -75,6 +78,7 @@ type NoOpBus struct{}
func (NoOpBus) Publish(_ string, _ any) error { return nil }
func (NoOpBus) Subscribe(_ string, _ func(string, []byte)) (func(), error) { return func() {}, nil }
func (NoOpBus) Healthy() bool { return true }
func (NoOpBus) Close() {}
// New returns a NATSBus if url is non-empty, otherwise a NoOpBus.
+52
View File
@@ -0,0 +1,52 @@
package observability
import (
"fmt"
"xorm.io/xorm"
"github.com/forgeo/forgebucket/internal/events"
)
const Version = "0.8.0"
// HealthStatus is the response shape for GET /health.
type HealthStatus struct {
Status string `json:"status"` // "healthy" | "degraded"
Checks map[string]string `json:"checks"` // dependency name → "ok" | error message
Version string `json:"version"`
}
// Check pings each critical dependency and returns a HealthStatus.
// HTTP status should be 200 when Status=="healthy", 503 when "degraded".
func Check(db *xorm.Engine, bus events.EventBus) HealthStatus {
checks := make(map[string]string, 2)
// Database — attempt a lightweight ping.
if err := db.Ping(); err != nil {
checks["database"] = fmt.Sprintf("error: %v", err)
} else {
checks["database"] = "ok"
}
// NATS — use the Healthy() method added in Phase 3E.
if bus.Healthy() {
checks["nats"] = "ok"
} else {
checks["nats"] = "disconnected"
}
overall := "healthy"
for _, v := range checks {
if v != "ok" {
overall = "degraded"
break
}
}
return HealthStatus{
Status: overall,
Checks: checks,
Version: Version,
}
}
+172
View File
@@ -0,0 +1,172 @@
package observability
import (
"context"
"encoding/json"
"log"
"net/http"
"regexp"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/forgeo/forgebucket/internal/events"
)
// ── Metric definitions ────────────────────────────────────────────────────────
var (
HttpRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "forgebucket_http_requests_total",
Help: "Total HTTP requests by method, normalized path, and status code.",
}, []string{"method", "path", "status"})
HttpRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "forgebucket_http_request_duration_seconds",
Help: "HTTP request latency by method and normalized path.",
Buckets: prometheus.DefBuckets,
}, []string{"method", "path"})
PipelineRunsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "forgebucket_pipeline_runs_total",
Help: "Pipeline runs by terminal status.",
}, []string{"status"})
DeploymentsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "forgebucket_deployments_total",
Help: "Deployments by terminal status.",
}, []string{"status"})
ActivePipelineRuns = promauto.NewGauge(prometheus.GaugeOpts{
Name: "forgebucket_active_pipeline_runs",
Help: "Pipeline runs currently in queued or running state.",
})
)
func init() {
// Pre-initialize all label combinations so the metrics are visible in
// /metrics immediately from startup (no gaps on first scrape).
for _, s := range []string{"succeeded", "failed", "cancelled"} {
PipelineRunsTotal.With(prometheus.Labels{"status": s})
}
for _, s := range []string{"pending", "success", "failure", "cancelled"} {
DeploymentsTotal.With(prometheus.Labels{"status": s})
}
}
// ── HTTP instrumentation middleware ──────────────────────────────────────────
type statusRecorder struct {
http.ResponseWriter
status int
}
func (r *statusRecorder) WriteHeader(code int) {
r.status = code
r.ResponseWriter.WriteHeader(code)
}
// Middleware records request count and latency for every HTTP request.
// Path labels are normalized to prevent high cardinality (numeric segments
// and positional path variables are replaced with placeholder tokens).
func Middleware() func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
next.ServeHTTP(rec, r)
path := normalizePath(r.URL.Path)
status := strconv.Itoa(rec.status)
elapsed := time.Since(start).Seconds()
HttpRequestsTotal.WithLabelValues(r.Method, path, status).Inc()
HttpRequestDuration.WithLabelValues(r.Method, path).Observe(elapsed)
})
}
}
// normalizePath replaces volatile path segments with placeholders so that
// Prometheus label cardinality stays bounded.
//
// Examples:
//
// /api/v1/repos/alice/myrepo/runs/42/jobs/7/logs
// → /api/v1/repos/:owner/:repo/runs/:id/jobs/:id/logs
//
// /alice/myrepo.git/info/refs
// → /:owner/:repo.git/info/refs
var reNumeric = regexp.MustCompile(`/\d+`)
func normalizePath(path string) string {
// Replace all-numeric segments first.
path = reNumeric.ReplaceAllString(path, "/:id")
// Normalize repo smart-HTTP paths: /{owner}/{repo}.git/...
path = reGitPath.ReplaceAllString(path, "/:owner/:repo.git$1")
// Normalize /api/v1/repos/{owner}/{repo}/...
path = reRepoPath.ReplaceAllString(path, "/api/v1/repos/:owner/:repo$1")
// Normalize /api/v1/workspaces/{handle}/...
path = reWorkspacePath.ReplaceAllString(path, "/api/v1/workspaces/:handle$1")
return path
}
var (
reGitPath = regexp.MustCompile(`^/[^/]+/[^/]+\.git(/.*)$`)
reRepoPath = regexp.MustCompile(`^/api/v1/repos/[^/]+/[^/]+(/.*)$`)
reWorkspacePath = regexp.MustCompile(`^/api/v1/workspaces/[^/]+(/.*)$`)
)
// ── NATS event watcher ────────────────────────────────────────────────────────
// StartNATSWatcher subscribes to pipeline and deployment NATS events and
// increments the corresponding Prometheus counters. Runs until ctx is cancelled.
func StartNATSWatcher(ctx context.Context, bus events.EventBus) {
type statusPayload struct {
Status string `json:"status"`
}
unsub1, err := bus.Subscribe("pipeline.>", func(subject string, data []byte) {
switch subject {
case events.SubjectPipelineTriggered:
ActivePipelineRuns.Inc()
case events.SubjectPipelineCompleted:
ActivePipelineRuns.Dec()
PipelineRunsTotal.WithLabelValues("succeeded").Inc()
case events.SubjectPipelineFailed:
ActivePipelineRuns.Dec()
PipelineRunsTotal.WithLabelValues("failed").Inc()
}
})
if err != nil {
log.Printf("observability: subscribe pipeline.*: %v", err)
} else {
defer unsub1()
}
unsub2, err := bus.Subscribe("deployment.>", func(subject string, data []byte) {
var p statusPayload
json.Unmarshal(data, &p) //nolint:errcheck
switch subject {
case events.SubjectDeploymentSucceeded:
DeploymentsTotal.WithLabelValues("success").Inc()
case events.SubjectDeploymentFailed:
DeploymentsTotal.WithLabelValues("failure").Inc()
case events.SubjectDeploymentStarted:
DeploymentsTotal.WithLabelValues("pending").Inc()
}
})
if err != nil {
log.Printf("observability: subscribe deployment.*: %v", err)
} else {
defer unsub2()
}
log.Printf("observability: NATS metric watcher started")
<-ctx.Done()
}