Files

96 lines
2.5 KiB
Go

package gitops
import (
"context"
"encoding/json"
"log"
"time"
"xorm.io/xorm"
"github.com/forgeo/forgebucket/internal/config"
"github.com/forgeo/forgebucket/internal/events"
"github.com/forgeo/forgebucket/internal/models"
)
// Controller is the GitOps reconciliation engine. It subscribes to NATS events
// and drives drift detection + auto-sync for every configured environment.
type Controller struct {
db *xorm.Engine
bus events.EventBus
cfg *config.Config
}
func NewController(db *xorm.Engine, bus events.EventBus, cfg *config.Config) *Controller {
return &Controller{db: db, bus: bus, cfg: cfg}
}
// Start subscribes to relevant events and blocks until ctx is cancelled.
func (c *Controller) Start(ctx context.Context) {
c.recoverSyncingState()
unsub1, err := c.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) {
var evt events.PushEvent
if err := json.Unmarshal(data, &evt); err != nil {
log.Printf("gitops: bad push.received payload: %v", err)
return
}
go c.handlePush(evt)
})
if err != nil {
log.Printf("gitops: subscribe push.received: %v", err)
} else {
defer unsub1()
}
unsub2, err := c.bus.Subscribe(events.SubjectDeploymentSucceeded, func(_ string, data []byte) {
go c.handleDeploymentSucceeded(data)
})
if err != nil {
log.Printf("gitops: subscribe deployment.succeeded: %v", err)
} else {
defer unsub2()
}
unsub3, err := c.bus.Subscribe(events.SubjectDeploymentFailed, func(_ string, data []byte) {
go c.handleDeploymentFailed(data)
})
if err != nil {
log.Printf("gitops: subscribe deployment.failed: %v", err)
} else {
defer unsub3()
}
if c.cfg.GitOpsReconcileInterval > 0 {
go c.runTicker(ctx)
}
log.Printf("gitops: controller started (reconcile interval: %ds)", c.cfg.GitOpsReconcileInterval)
<-ctx.Done()
}
func (c *Controller) runTicker(ctx context.Context) {
interval := time.Duration(c.cfg.GitOpsReconcileInterval) * time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.periodicCheck()
case <-ctx.Done():
return
}
}
}
// recoverSyncingState marks any configs left in "syncing" as "drifted" on startup
// (they were in-flight when the server last stopped).
func (c *Controller) recoverSyncingState() {
affected, _ := c.db.Where("sync_status = 'syncing'").
Cols("sync_status").
Update(&models.GitOpsConfig{SyncStatus: "drifted"})
if affected > 0 {
log.Printf("gitops: recovered %d stale syncing configs → drifted", affected)
}
}