package handlers import ( "encoding/json" "log" "net/http" "sync" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" "github.com/forgeo/forgebucket/internal/events" ) // WSHandler manages WebSocket connections and bridges NATS events to clients. // All connected clients receive every platform event (subject + payload). // Per-user filtering is added in Phase 2B when CI events carry access control. type WSHandler struct { bus events.EventBus mu sync.RWMutex clients map[string]chan events.WSEnvelope // connID → send channel } func NewWSHandler(bus events.EventBus) *WSHandler { h := &WSHandler{ bus: bus, clients: make(map[string]chan events.WSEnvelope), } h.subscribeAll() return h } // subscribeAll wires the NATS wildcard subscription that fans events to all clients. func (h *WSHandler) subscribeAll() { unsub, err := h.bus.Subscribe(">", func(subject string, data []byte) { env := events.WSEnvelope{Subject: subject, Payload: data} h.mu.RLock() defer h.mu.RUnlock() for id, ch := range h.clients { select { case ch <- env: default: // Client send buffer full — drop event rather than block. log.Printf("ws: dropped event %s for slow client %s", subject, id) } } }) if err != nil { log.Printf("ws: nats subscribe failed: %v", err) return } // The subscription lives for the process lifetime; no cleanup needed. _ = unsub } func (h *WSHandler) register(id string) chan events.WSEnvelope { ch := make(chan events.WSEnvelope, 64) h.mu.Lock() h.clients[id] = ch h.mu.Unlock() return ch } func (h *WSHandler) unregister(id string) { h.mu.Lock() delete(h.clients, id) h.mu.Unlock() } // Hub upgrades the HTTP connection to WebSocket, registers the client, and // pumps NATS-sourced events to it until the connection closes. func (h *WSHandler) Hub(w http.ResponseWriter, r *http.Request) { conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ OriginPatterns: []string{"localhost:*"}, }) if err != nil { return } defer conn.CloseNow() // Use remote address + a monotonic component as a simple unique ID. connID := r.RemoteAddr + "|" + r.Header.Get("Sec-Websocket-Key") send := h.register(connID) defer h.unregister(connID) ctx := r.Context() // Write pump: drain the send channel and write to the WebSocket. go func() { for { select { case env, ok := <-send: if !ok { return } msg := map[string]any{ "subject": env.Subject, "payload": json.RawMessage(env.Payload), } if err := wsjson.Write(ctx, conn, msg); err != nil { return } case <-ctx.Done(): return } } }() // Read pump: discard incoming messages (clients send no data for now), // keeping the connection alive and detecting closes. for { if _, _, err := conn.Read(ctx); err != nil { break } } }