implemented NATS event bus, websocket hub upgrade, and audit log
This commit is contained in:
@@ -26,6 +26,11 @@ INSTANCE_NAME=ForgeBucket
|
||||
# OIDC_CLIENT_ID=
|
||||
# OIDC_CLIENT_SECRET=
|
||||
|
||||
# ─── Event Bus (NATS) ────────────────────────────────────────────────────────
|
||||
# Leave empty to disable event publishing (no-op mode).
|
||||
# Start NATS with: make docker-up
|
||||
NATS_URL=nats://localhost:4222
|
||||
|
||||
# ─── Dev only ─────────────────────────────────────────────────────────────────
|
||||
# Set to true to disable Secure cookies and enable verbose logging
|
||||
DEBUG=false
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"github.com/forgeo/forgebucket/internal/config"
|
||||
"github.com/forgeo/forgebucket/internal/db"
|
||||
gitdomain "github.com/forgeo/forgebucket/internal/domain/git"
|
||||
"github.com/forgeo/forgebucket/internal/events"
|
||||
"github.com/forgeo/forgebucket/internal/models/migrations"
|
||||
"github.com/forgeo/forgebucket/web"
|
||||
)
|
||||
@@ -42,6 +43,12 @@ func main() {
|
||||
|
||||
gitdomain.SetRepoRoot(cfg.RepoRoot)
|
||||
|
||||
bus, err := events.New(cfg.NATSUrl)
|
||||
if err != nil {
|
||||
log.Fatalf("events: %v", err)
|
||||
}
|
||||
defer bus.Close()
|
||||
|
||||
store := sessions.NewCookieStore([]byte(cfg.SessionSecret))
|
||||
store.Options = &sessions.Options{
|
||||
Path: "/",
|
||||
@@ -51,7 +58,7 @@ func main() {
|
||||
SameSite: http.SameSiteLaxMode,
|
||||
}
|
||||
|
||||
handler := api.New(cfg, engine, store, web.FS())
|
||||
handler := api.New(cfg, engine, store, bus, web.FS())
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: fmt.Sprintf(":%s", cfg.Port),
|
||||
|
||||
@@ -4,6 +4,19 @@ version: "3.9"
|
||||
# Production: docker compose -f docker-compose.prod.yml up
|
||||
|
||||
services:
|
||||
nats:
|
||||
image: nats:2-alpine
|
||||
restart: unless-stopped
|
||||
command: ["-js", "-m", "8222"]
|
||||
ports:
|
||||
- "4222:4222" # client connections
|
||||
- "8222:8222" # monitoring HTTP
|
||||
healthcheck:
|
||||
test: ["CMD", "wget", "--spider", "-q", "http://localhost:8222/healthz"]
|
||||
interval: 5s
|
||||
timeout: 5s
|
||||
retries: 10
|
||||
|
||||
postgres:
|
||||
image: postgres:18
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -17,7 +17,12 @@ require (
|
||||
github.com/goccy/go-json v0.10.5 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/gorilla/securecookie v1.1.2 // indirect
|
||||
github.com/klauspost/compress v1.18.5 // indirect
|
||||
github.com/nats-io/nats.go v1.52.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.15 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/syndtr/goleveldb v1.0.0 // indirect
|
||||
golang.org/x/sys v0.43.0 // indirect
|
||||
golang.org/x/tools v0.43.0 // indirect
|
||||
xorm.io/builder v0.3.13 // indirect
|
||||
)
|
||||
|
||||
@@ -34,12 +34,20 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
|
||||
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
|
||||
github.com/lib/pq v1.12.3 h1:tTWxr2YLKwIvK90ZXEw8GP7UFHtcbTtty8zsI+YjrfQ=
|
||||
github.com/lib/pq v1.12.3/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
|
||||
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/nats-io/nats.go v1.52.0 h1:n3avV4VBsCgsdwh71TppsTwtv+QdPs7ntSKM8qJLGsc=
|
||||
github.com/nats-io/nats.go v1.52.0/go.mod h1:26HypzazeOkyO3/mqd1zZd53STJN0EjCYF9Uy2ZOBno=
|
||||
github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
|
||||
github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/api/middleware"
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
)
|
||||
|
||||
type AuditHandler struct{ db *xorm.Engine }
|
||||
|
||||
func NewAuditHandler(db *xorm.Engine) *AuditHandler { return &AuditHandler{db: db} }
|
||||
|
||||
// List returns recent audit log entries. Admin-only.
|
||||
// Query params: limit (default 50, max 200), before (ID cursor for pagination),
|
||||
// actor_id, method, since (RFC3339).
|
||||
func (h *AuditHandler) List(w http.ResponseWriter, r *http.Request) {
|
||||
isAdmin, _ := r.Context().Value(middleware.ContextKeyIsAdmin).(bool)
|
||||
if !isAdmin {
|
||||
jsonError(w, "admin access required", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
q := r.URL.Query()
|
||||
|
||||
limit := 50
|
||||
if l, err := strconv.Atoi(q.Get("limit")); err == nil && l > 0 {
|
||||
if l > 200 {
|
||||
l = 200
|
||||
}
|
||||
limit = l
|
||||
}
|
||||
|
||||
sess := h.db.Desc("id").Limit(limit)
|
||||
|
||||
if before, err := strconv.ParseInt(q.Get("before"), 10, 64); err == nil && before > 0 {
|
||||
sess = sess.And("id < ?", before)
|
||||
}
|
||||
if actorID, err := strconv.ParseInt(q.Get("actor_id"), 10, 64); err == nil && actorID > 0 {
|
||||
sess = sess.And("actor_id = ?", actorID)
|
||||
}
|
||||
if method := q.Get("method"); method != "" {
|
||||
sess = sess.And("method = ?", method)
|
||||
}
|
||||
if since := q.Get("since"); since != "" {
|
||||
if t, err := time.Parse(time.RFC3339, since); err == nil {
|
||||
sess = sess.And("occurred_at >= ?", t.UTC())
|
||||
}
|
||||
}
|
||||
|
||||
var entries []models.AuditLog
|
||||
if err := sess.Find(&entries); err != nil {
|
||||
jsonError(w, "could not fetch audit log", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if entries == nil {
|
||||
entries = []models.AuditLog{}
|
||||
}
|
||||
jsonOK(w, entries)
|
||||
}
|
||||
@@ -1,18 +1,74 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/events"
|
||||
)
|
||||
|
||||
type WSHandler struct{}
|
||||
|
||||
func NewWSHandler() *WSHandler {
|
||||
return &WSHandler{}
|
||||
// WSHandler manages WebSocket connections and bridges NATS events to clients.
|
||||
// All connected clients receive every platform event (subject + payload).
|
||||
// Per-user filtering is added in Phase 2B when CI events carry access control.
|
||||
type WSHandler struct {
|
||||
bus events.EventBus
|
||||
mu sync.RWMutex
|
||||
clients map[string]chan events.WSEnvelope // connID → send channel
|
||||
}
|
||||
|
||||
func NewWSHandler(bus events.EventBus) *WSHandler {
|
||||
h := &WSHandler{
|
||||
bus: bus,
|
||||
clients: make(map[string]chan events.WSEnvelope),
|
||||
}
|
||||
h.subscribeAll()
|
||||
return h
|
||||
}
|
||||
|
||||
// subscribeAll wires the NATS wildcard subscription that fans events to all clients.
|
||||
func (h *WSHandler) subscribeAll() {
|
||||
unsub, err := h.bus.Subscribe(">", func(subject string, data []byte) {
|
||||
env := events.WSEnvelope{Subject: subject, Payload: data}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for id, ch := range h.clients {
|
||||
select {
|
||||
case ch <- env:
|
||||
default:
|
||||
// Client send buffer full — drop event rather than block.
|
||||
log.Printf("ws: dropped event %s for slow client %s", subject, id)
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("ws: nats subscribe failed: %v", err)
|
||||
return
|
||||
}
|
||||
// The subscription lives for the process lifetime; no cleanup needed.
|
||||
_ = unsub
|
||||
}
|
||||
|
||||
func (h *WSHandler) register(id string) chan events.WSEnvelope {
|
||||
ch := make(chan events.WSEnvelope, 64)
|
||||
h.mu.Lock()
|
||||
h.clients[id] = ch
|
||||
h.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (h *WSHandler) unregister(id string) {
|
||||
h.mu.Lock()
|
||||
delete(h.clients, id)
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// Hub upgrades the HTTP connection to WebSocket, registers the client, and
|
||||
// pumps NATS-sourced events to it until the connection closes.
|
||||
func (h *WSHandler) Hub(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
OriginPatterns: []string{"localhost:*"},
|
||||
@@ -22,13 +78,38 @@ func (h *WSHandler) Hub(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
defer conn.CloseNow()
|
||||
|
||||
// Use remote address + a monotonic component as a simple unique ID.
|
||||
connID := r.RemoteAddr + "|" + r.Header.Get("Sec-Websocket-Key")
|
||||
send := h.register(connID)
|
||||
defer h.unregister(connID)
|
||||
|
||||
ctx := r.Context()
|
||||
for {
|
||||
var msg map[string]any
|
||||
if err := wsjson.Read(ctx, conn, &msg); err != nil {
|
||||
break
|
||||
|
||||
// Write pump: drain the send channel and write to the WebSocket.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case env, ok := <-send:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
msg := map[string]any{
|
||||
"subject": env.Subject,
|
||||
"payload": json.RawMessage(env.Payload),
|
||||
}
|
||||
if err := wsjson.Write(ctx, conn, msg); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := wsjson.Write(ctx, conn, msg); err != nil {
|
||||
}()
|
||||
|
||||
// Read pump: discard incoming messages (clients send no data for now),
|
||||
// keeping the connection alive and detecting closes.
|
||||
for {
|
||||
if _, _, err := conn.Read(ctx); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/events"
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
)
|
||||
|
||||
// statusRecorder wraps ResponseWriter to capture the written status code.
|
||||
type statusRecorder struct {
|
||||
http.ResponseWriter
|
||||
status int
|
||||
}
|
||||
|
||||
func (r *statusRecorder) WriteHeader(code int) {
|
||||
r.status = code
|
||||
r.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
// AuditLog middleware records every state-mutating request (POST/PUT/PATCH/DELETE)
|
||||
// to the audit_log table and publishes an audit.event to the event bus.
|
||||
// It runs after auth middleware so the actor is available in context.
|
||||
func AuditLog(db *xorm.Engine, bus events.EventBus) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodGet, http.MethodHead, http.MethodOptions, http.MethodTrace:
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
|
||||
next.ServeHTTP(rec, r)
|
||||
|
||||
actorID, _ := UserIDFromContext(r.Context())
|
||||
actorName, _ := r.Context().Value(ContextKeyUsername).(string)
|
||||
|
||||
entry := &models.AuditLog{
|
||||
ActorID: actorID,
|
||||
ActorName: actorName,
|
||||
Method: r.Method,
|
||||
Path: r.URL.Path,
|
||||
StatusCode: rec.status,
|
||||
IPAddress: r.RemoteAddr,
|
||||
UserAgent: r.UserAgent(),
|
||||
OccurredAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
go func() {
|
||||
db.Insert(entry) //nolint:errcheck
|
||||
bus.Publish(events.SubjectAuditEvent, events.AuditEvent{ //nolint:errcheck
|
||||
ActorID: entry.ActorID,
|
||||
ActorName: entry.ActorName,
|
||||
Action: entry.Method + " " + entry.Path,
|
||||
ResourcePath: entry.Path,
|
||||
IPAddress: entry.IPAddress,
|
||||
StatusCode: entry.StatusCode,
|
||||
At: entry.OccurredAt,
|
||||
})
|
||||
}()
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -17,9 +17,10 @@ import (
|
||||
"github.com/forgeo/forgebucket/internal/api/handlers"
|
||||
"github.com/forgeo/forgebucket/internal/api/middleware"
|
||||
"github.com/forgeo/forgebucket/internal/config"
|
||||
"github.com/forgeo/forgebucket/internal/events"
|
||||
)
|
||||
|
||||
func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, staticFiles fs.FS) http.Handler {
|
||||
func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus events.EventBus, staticFiles fs.FS) http.Handler {
|
||||
r := chi.NewRouter()
|
||||
|
||||
r.Use(chimiddleware.Logger)
|
||||
@@ -33,14 +34,15 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, staticFi
|
||||
MaxAge: 300,
|
||||
}))
|
||||
|
||||
csrf := middleware.CSRF(!cfg.Debug)
|
||||
auth := middleware.NewAuth(store, engine, handlers.LookupAccessToken)
|
||||
csrf := middleware.CSRF(!cfg.Debug)
|
||||
auth := middleware.NewAuth(store, engine, handlers.LookupAccessToken)
|
||||
audit := middleware.AuditLog(engine, bus)
|
||||
|
||||
repoH := handlers.NewRepoHandler(engine, cfg)
|
||||
userH := handlers.NewUserHandler(engine, store)
|
||||
prH := handlers.NewPRHandler(engine)
|
||||
pipeH := handlers.NewPipelineHandler(engine)
|
||||
wsH := handlers.NewWSHandler()
|
||||
wsH := handlers.NewWSHandler(bus)
|
||||
gitH := handlers.NewGitHTTPHandler(engine, cfg)
|
||||
issueH := handlers.NewIssueHandler(engine)
|
||||
sshKeyH := handlers.NewSSHKeyHandler(engine)
|
||||
@@ -53,6 +55,7 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, staticFi
|
||||
lfsH := handlers.NewLFSHandler(engine)
|
||||
exploreH := handlers.NewExploreHandler(engine)
|
||||
dashH := handlers.NewDashboardHandler(engine)
|
||||
auditH := handlers.NewAuditHandler(engine)
|
||||
|
||||
// ── Git smart-HTTP transport ───────────────────────────────────────────────
|
||||
// Regex constraint ensures only *.git paths match, so asset/SPA URLs
|
||||
@@ -94,9 +97,11 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, staticFi
|
||||
// ── Protected (session + CSRF for mutations) ──────────────────────────
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(auth.Require)
|
||||
r.Use(audit)
|
||||
|
||||
r.Get("/me", userH.Me)
|
||||
r.Get("/dashboard", dashH.Get)
|
||||
r.Get("/audit", auditH.List)
|
||||
|
||||
// SSH key management
|
||||
r.Get("/user/keys", sshKeyH.List)
|
||||
|
||||
@@ -25,6 +25,9 @@ type Config struct {
|
||||
OIDCClientID string
|
||||
OIDCClientSecret string
|
||||
|
||||
// Event bus
|
||||
NATSUrl string
|
||||
|
||||
// Federation
|
||||
InstanceURL string
|
||||
InstanceName string
|
||||
@@ -39,6 +42,7 @@ func Load() (*Config, error) {
|
||||
RepoRoot: getEnv("REPO_ROOT", "/var/lib/forgebucket/repos"),
|
||||
Debug: getEnvBool("DEBUG", false),
|
||||
|
||||
NATSUrl: getEnv("NATS_URL", ""),
|
||||
InstanceURL: getEnv("INSTANCE_URL", ""),
|
||||
InstanceName: getEnv("INSTANCE_NAME", "ForgeBucket"),
|
||||
}
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// EventBus is the platform event bus interface.
|
||||
// Publish sends a typed payload to the given subject.
|
||||
// Subscribe registers a handler for a subject pattern (supports NATS wildcards).
|
||||
// The returned func() unsubscribes when called.
|
||||
type EventBus interface {
|
||||
Publish(subject string, payload any) error
|
||||
Subscribe(subject string, handler func(subject string, data []byte)) (func(), error)
|
||||
Close()
|
||||
}
|
||||
|
||||
// NATSBus is the NATS-backed EventBus. Events are published to core NATS subjects.
|
||||
// Phase 2A uses core NATS (ephemeral). Phase 2B will add JetStream for CI durability.
|
||||
type NATSBus struct {
|
||||
nc *nats.Conn
|
||||
}
|
||||
|
||||
func NewNATSBus(url string) (*NATSBus, error) {
|
||||
nc, err := nats.Connect(url,
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectWait(2*time.Second),
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
log.Printf("nats: disconnected: %v", err)
|
||||
}
|
||||
}),
|
||||
nats.ReconnectHandler(func(_ *nats.Conn) {
|
||||
log.Printf("nats: reconnected")
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nats connect %s: %w", url, err)
|
||||
}
|
||||
log.Printf("nats: connected to %s", url)
|
||||
return &NATSBus{nc: nc}, nil
|
||||
}
|
||||
|
||||
func (b *NATSBus) Publish(subject string, payload any) error {
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal: %w", err)
|
||||
}
|
||||
return b.nc.Publish(subject, data)
|
||||
}
|
||||
|
||||
func (b *NATSBus) Subscribe(subject string, handler func(subject string, data []byte)) (func(), error) {
|
||||
sub, err := b.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||
handler(msg.Subject, msg.Data)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return func() { sub.Unsubscribe() }, nil //nolint:errcheck
|
||||
}
|
||||
|
||||
func (b *NATSBus) Close() {
|
||||
if err := b.nc.Drain(); err != nil {
|
||||
log.Printf("nats: drain: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// NoOpBus is a no-op EventBus used when NATS_URL is not configured.
|
||||
// Events are silently dropped. The app works normally; just no real-time push.
|
||||
type NoOpBus struct{}
|
||||
|
||||
func (NoOpBus) Publish(_ string, _ any) error { return nil }
|
||||
func (NoOpBus) Subscribe(_ string, _ func(string, []byte)) (func(), error) { return func() {}, nil }
|
||||
func (NoOpBus) Close() {}
|
||||
|
||||
// New returns a NATSBus if url is non-empty, otherwise a NoOpBus.
|
||||
func New(url string) (EventBus, error) {
|
||||
if url == "" {
|
||||
log.Printf("events: NATS_URL not set — using no-op bus (real-time push disabled)")
|
||||
return NoOpBus{}, nil
|
||||
}
|
||||
return NewNATSBus(url)
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package events
|
||||
|
||||
// Subject constants for all platform events.
|
||||
// Wildcards: ">" matches one or more tokens, "*" matches exactly one token.
|
||||
const (
|
||||
// Repository lifecycle
|
||||
SubjectRepoCreated = "repo.created"
|
||||
SubjectRepoDeleted = "repo.deleted"
|
||||
SubjectRepoUpdated = "repo.updated"
|
||||
|
||||
// Git push
|
||||
SubjectPushReceived = "push.received"
|
||||
|
||||
// Pull requests
|
||||
SubjectPROpened = "pr.opened"
|
||||
SubjectPRMerged = "pr.merged"
|
||||
SubjectPRClosed = "pr.closed"
|
||||
SubjectPRReopened = "pr.reopened"
|
||||
SubjectPRUpdated = "pr.updated"
|
||||
|
||||
// Issues
|
||||
SubjectIssueOpened = "issue.opened"
|
||||
SubjectIssueClosed = "issue.closed"
|
||||
SubjectIssueReopened = "issue.reopened"
|
||||
|
||||
// CI/CD (Phase 2B)
|
||||
SubjectPipelineTriggered = "pipeline.triggered"
|
||||
SubjectPipelineStarted = "pipeline.started"
|
||||
SubjectPipelineCompleted = "pipeline.completed"
|
||||
SubjectPipelineFailed = "pipeline.failed"
|
||||
SubjectJobQueued = "job.queued"
|
||||
SubjectJobStarted = "job.started"
|
||||
SubjectJobCompleted = "job.completed"
|
||||
SubjectJobFailed = "job.failed"
|
||||
SubjectArtifactPublished = "artifact.published"
|
||||
|
||||
// Deployments (Phase 3A)
|
||||
SubjectDeploymentStarted = "deployment.started"
|
||||
SubjectDeploymentSucceeded = "deployment.succeeded"
|
||||
SubjectDeploymentFailed = "deployment.failed"
|
||||
SubjectDeploymentRolledBack = "deployment.rolled_back"
|
||||
|
||||
// Environments (Phase 3D)
|
||||
SubjectEnvironmentDriftDetected = "environment.drift_detected"
|
||||
|
||||
// Audit
|
||||
SubjectAuditEvent = "audit.event"
|
||||
)
|
||||
@@ -0,0 +1,64 @@
|
||||
package events
|
||||
|
||||
import "time"
|
||||
|
||||
type RepoEvent struct {
|
||||
RepoID int64 `json:"repoId"`
|
||||
RepoName string `json:"repoName"`
|
||||
OwnerID int64 `json:"ownerId"`
|
||||
OwnerName string `json:"ownerName"`
|
||||
ActorID int64 `json:"actorId"`
|
||||
ActorName string `json:"actorName"`
|
||||
At time.Time `json:"at"`
|
||||
}
|
||||
|
||||
type PushEvent struct {
|
||||
RepoID int64 `json:"repoId"`
|
||||
RepoName string `json:"repoName"`
|
||||
OwnerName string `json:"ownerName"`
|
||||
Ref string `json:"ref"`
|
||||
Before string `json:"before"`
|
||||
After string `json:"after"`
|
||||
Pusher string `json:"pusher"`
|
||||
At time.Time `json:"at"`
|
||||
}
|
||||
|
||||
type PREvent struct {
|
||||
PRID int64 `json:"prId"`
|
||||
RepoID int64 `json:"repoId"`
|
||||
RepoName string `json:"repoName"`
|
||||
OwnerName string `json:"ownerName"`
|
||||
Title string `json:"title"`
|
||||
SourceBranch string `json:"sourceBranch"`
|
||||
TargetBranch string `json:"targetBranch"`
|
||||
AuthorID int64 `json:"authorId"`
|
||||
AuthorName string `json:"authorName"`
|
||||
At time.Time `json:"at"`
|
||||
}
|
||||
|
||||
type IssueEvent struct {
|
||||
IssueID int64 `json:"issueId"`
|
||||
RepoID int64 `json:"repoId"`
|
||||
RepoName string `json:"repoName"`
|
||||
OwnerName string `json:"ownerName"`
|
||||
Title string `json:"title"`
|
||||
AuthorID int64 `json:"authorId"`
|
||||
At time.Time `json:"at"`
|
||||
}
|
||||
|
||||
type AuditEvent struct {
|
||||
ActorID int64 `json:"actorId"`
|
||||
ActorName string `json:"actorName"`
|
||||
Action string `json:"action"`
|
||||
ResourceType string `json:"resourceType"`
|
||||
ResourcePath string `json:"resourcePath"`
|
||||
IPAddress string `json:"ipAddress"`
|
||||
StatusCode int `json:"statusCode"`
|
||||
At time.Time `json:"at"`
|
||||
}
|
||||
|
||||
// WSEnvelope wraps any event for delivery over the WebSocket connection.
|
||||
type WSEnvelope struct {
|
||||
Subject string `json:"subject"`
|
||||
Payload []byte `json:"payload"`
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package models
|
||||
|
||||
import "time"
|
||||
|
||||
// AuditLog records every state-mutating HTTP request made by an authenticated user.
|
||||
type AuditLog struct {
|
||||
ID int64 `xorm:"'id' pk autoincr" json:"id"`
|
||||
ActorID int64 `xorm:"'actor_id' index" json:"actorId"`
|
||||
ActorName string `xorm:"'actor_name' varchar(64)" json:"actorName"`
|
||||
Method string `xorm:"'method' varchar(10)" json:"method"`
|
||||
Path string `xorm:"'path' varchar(500)" json:"path"`
|
||||
StatusCode int `xorm:"'status_code'" json:"statusCode"`
|
||||
IPAddress string `xorm:"'ip_address' varchar(45)" json:"ipAddress"`
|
||||
UserAgent string `xorm:"'user_agent' varchar(500)" json:"userAgent"`
|
||||
OccurredAt time.Time `xorm:"'occurred_at' index" json:"occurredAt"`
|
||||
}
|
||||
@@ -31,5 +31,8 @@ func Run(engine *xorm.Engine) error {
|
||||
if err := Run006(engine); err != nil {
|
||||
return err
|
||||
}
|
||||
return Run007(engine)
|
||||
if err := Run007(engine); err != nil {
|
||||
return err
|
||||
}
|
||||
return Run008(engine)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
|
||||
func Run008(engine *xorm.Engine) error {
|
||||
return engine.Sync2(&models.AuditLog{})
|
||||
}
|
||||
Reference in New Issue
Block a user