package gitops import ( "context" "encoding/json" "log" "time" "xorm.io/xorm" "github.com/forgeo/forgebucket/internal/config" "github.com/forgeo/forgebucket/internal/events" "github.com/forgeo/forgebucket/internal/models" ) // Controller is the GitOps reconciliation engine. It subscribes to NATS events // and drives drift detection + auto-sync for every configured environment. type Controller struct { db *xorm.Engine bus events.EventBus cfg *config.Config } func NewController(db *xorm.Engine, bus events.EventBus, cfg *config.Config) *Controller { return &Controller{db: db, bus: bus, cfg: cfg} } // Start subscribes to relevant events and blocks until ctx is cancelled. func (c *Controller) Start(ctx context.Context) { c.recoverSyncingState() unsub1, err := c.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) { var evt events.PushEvent if err := json.Unmarshal(data, &evt); err != nil { log.Printf("gitops: bad push.received payload: %v", err) return } go c.handlePush(evt) }) if err != nil { log.Printf("gitops: subscribe push.received: %v", err) } else { defer unsub1() } unsub2, err := c.bus.Subscribe(events.SubjectDeploymentSucceeded, func(_ string, data []byte) { go c.handleDeploymentSucceeded(data) }) if err != nil { log.Printf("gitops: subscribe deployment.succeeded: %v", err) } else { defer unsub2() } unsub3, err := c.bus.Subscribe(events.SubjectDeploymentFailed, func(_ string, data []byte) { go c.handleDeploymentFailed(data) }) if err != nil { log.Printf("gitops: subscribe deployment.failed: %v", err) } else { defer unsub3() } if c.cfg.GitOpsReconcileInterval > 0 { go c.runTicker(ctx) } log.Printf("gitops: controller started (reconcile interval: %ds)", c.cfg.GitOpsReconcileInterval) <-ctx.Done() } func (c *Controller) runTicker(ctx context.Context) { interval := time.Duration(c.cfg.GitOpsReconcileInterval) * time.Second ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: c.periodicCheck() case <-ctx.Done(): return } } } // recoverSyncingState marks any configs left in "syncing" as "drifted" on startup // (they were in-flight when the server last stopped). func (c *Controller) recoverSyncingState() { affected, _ := c.db.Where("sync_status = 'syncing'"). Cols("sync_status"). Update(&models.GitOpsConfig{SyncStatus: "drifted"}) if affected > 0 { log.Printf("gitops: recovered %d stale syncing configs → drifted", affected) } }