96 lines
2.5 KiB
Go
96 lines
2.5 KiB
Go
package gitops
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log"
|
|
"time"
|
|
|
|
"xorm.io/xorm"
|
|
|
|
"github.com/forgeo/forgebucket/internal/config"
|
|
"github.com/forgeo/forgebucket/internal/events"
|
|
"github.com/forgeo/forgebucket/internal/models"
|
|
)
|
|
|
|
// Controller is the GitOps reconciliation engine. It subscribes to NATS events
|
|
// and drives drift detection + auto-sync for every configured environment.
|
|
type Controller struct {
|
|
db *xorm.Engine
|
|
bus events.EventBus
|
|
cfg *config.Config
|
|
}
|
|
|
|
func NewController(db *xorm.Engine, bus events.EventBus, cfg *config.Config) *Controller {
|
|
return &Controller{db: db, bus: bus, cfg: cfg}
|
|
}
|
|
|
|
// Start subscribes to relevant events and blocks until ctx is cancelled.
|
|
func (c *Controller) Start(ctx context.Context) {
|
|
c.recoverSyncingState()
|
|
|
|
unsub1, err := c.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) {
|
|
var evt events.PushEvent
|
|
if err := json.Unmarshal(data, &evt); err != nil {
|
|
log.Printf("gitops: bad push.received payload: %v", err)
|
|
return
|
|
}
|
|
go c.handlePush(evt)
|
|
})
|
|
if err != nil {
|
|
log.Printf("gitops: subscribe push.received: %v", err)
|
|
} else {
|
|
defer unsub1()
|
|
}
|
|
|
|
unsub2, err := c.bus.Subscribe(events.SubjectDeploymentSucceeded, func(_ string, data []byte) {
|
|
go c.handleDeploymentSucceeded(data)
|
|
})
|
|
if err != nil {
|
|
log.Printf("gitops: subscribe deployment.succeeded: %v", err)
|
|
} else {
|
|
defer unsub2()
|
|
}
|
|
|
|
unsub3, err := c.bus.Subscribe(events.SubjectDeploymentFailed, func(_ string, data []byte) {
|
|
go c.handleDeploymentFailed(data)
|
|
})
|
|
if err != nil {
|
|
log.Printf("gitops: subscribe deployment.failed: %v", err)
|
|
} else {
|
|
defer unsub3()
|
|
}
|
|
|
|
if c.cfg.GitOpsReconcileInterval > 0 {
|
|
go c.runTicker(ctx)
|
|
}
|
|
|
|
log.Printf("gitops: controller started (reconcile interval: %ds)", c.cfg.GitOpsReconcileInterval)
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (c *Controller) runTicker(ctx context.Context) {
|
|
interval := time.Duration(c.cfg.GitOpsReconcileInterval) * time.Second
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
c.periodicCheck()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// recoverSyncingState marks any configs left in "syncing" as "drifted" on startup
|
|
// (they were in-flight when the server last stopped).
|
|
func (c *Controller) recoverSyncingState() {
|
|
affected, _ := c.db.Where("sync_status = 'syncing'").
|
|
Cols("sync_status").
|
|
Update(&models.GitOpsConfig{SyncStatus: "drifted"})
|
|
if affected > 0 {
|
|
log.Printf("gitops: recovered %d stale syncing configs → drifted", affected)
|
|
}
|
|
}
|