package ci

import (
	"context"
	"encoding/json"
	"log"
	"strings"
	"time"

	"xorm.io/xorm"

	"github.com/forgeo/forgebucket/internal/domain/sbom"
	"github.com/forgeo/forgebucket/internal/events"
	"github.com/forgeo/forgebucket/internal/models"
)

// Orchestrator listens for push events, creates pipeline run records, and
// advances the DAG as jobs complete. It does NOT execute jobs directly —
// that is the RunnerManager's responsibility.
type Orchestrator struct {
	// db is the engine used for every read and write of pipeline, run, job
	// and step records.
	db *xorm.Engine
	// bus is used both to subscribe to incoming events and to publish the
	// events the orchestrator emits.
	bus events.EventBus
	// sbomGen is optional; when nil, no SBOM generation is started after a
	// run succeeds.
	sbomGen *sbom.Generator
}

// NewOrchestrator returns an Orchestrator wired to the given database engine,
// event bus and SBOM generator. sbomGen may be nil.
func NewOrchestrator(db *xorm.Engine, bus events.EventBus, sbomGen *sbom.Generator) *Orchestrator {
	return &Orchestrator{db: db, bus: bus, sbomGen: sbomGen}
}

// Start subscribes to the push-received, job-completed and job-failed
// subjects and blocks until ctx is cancelled, at which point every
// subscription that was established is released.
//
// A subscription failure is logged and does not prevent the remaining
// subscriptions from being set up. Each decoded event is handled in its own
// goroutine so the subscription callback returns immediately; payloads that
// cannot be decoded are logged and dropped.
func (o *Orchestrator) Start(ctx context.Context) {
	// Fail anything still marked "running" before new events are processed.
	o.recoverStaleRuns()

	unsubPush, err := o.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) {
		var evt events.PushEvent
		if err := json.Unmarshal(data, &evt); err != nil {
			log.Printf("orchestrator: bad push event: %v", err)
			return
		}
		go o.handlePush(evt)
	})
	if err != nil {
		log.Printf("orchestrator: subscribe push.received: %v", err)
	} else {
		defer unsubPush()
	}

	unsubCompleted, err := o.bus.Subscribe(events.SubjectJobCompleted, func(_ string, data []byte) {
		var evt events.JobEvent
		if err := json.Unmarshal(data, &evt); err != nil {
			// Log instead of dropping silently, matching the push handler.
			log.Printf("orchestrator: bad job.completed event: %v", err)
			return
		}
		go o.advanceDAG(evt.RunID, evt.JobID, "succeeded")
	})
	if err != nil {
		log.Printf("orchestrator: subscribe job.completed: %v", err)
	} else {
		defer unsubCompleted()
	}

	unsubFailed, err := o.bus.Subscribe(events.SubjectJobFailed, func(_ string, data []byte) {
		var evt events.JobEvent
		if err := json.Unmarshal(data, &evt); err != nil {
			// Log instead of dropping silently, matching the push handler.
			log.Printf("orchestrator: bad job.failed event: %v", err)
			return
		}
		go o.advanceDAG(evt.RunID, evt.JobID, "failed")
	})
	if err != nil {
		log.Printf("orchestrator: subscribe job.failed: %v", err)
	} else {
		defer unsubFailed()
	}

	<-ctx.Done()
}

// handlePush is called for every successful git push.
It finds matching workflow // files, creates run records, and enqueues the first wave of jobs. func (o *Orchestrator) handlePush(evt events.PushEvent) { // Ignore branch deletions (new SHA = all zeros). if evt.After == "" || strings.Repeat("0", len(evt.After)) == evt.After { return } var repo models.Repository if found, _ := o.db.ID(evt.RepoID).Get(&repo); !found { return } workflowPaths, err := ListWorkflows(repo.DiskPath, evt.After) if err != nil || len(workflowPaths) == 0 { return } for _, path := range workflowPaths { wf, err := ParseWorkflow(repo.DiskPath, evt.After, path) if err != nil { log.Printf("orchestrator: parse workflow %s: %v", path, err) continue } if !MatchesPushTrigger(wf, evt.Ref) { continue } if err := o.createRun(repo, evt, wf, path); err != nil { log.Printf("orchestrator: create run for %s: %v", path, err) } } } func (o *Orchestrator) createRun(repo models.Repository, evt events.PushEvent, wf *WorkflowFile, filePath string) error { // Upsert the Pipeline definition record. pipeline := &models.Pipeline{RepoID: repo.ID, FilePath: filePath} has, _ := o.db.Where("repo_id = ? AND file_path = ?", repo.ID, filePath).Get(pipeline) pipeline.Name = wf.Name if pipeline.Name == "" { pipeline.Name = filePath } if has { o.db.ID(pipeline.ID).Cols("name").Update(pipeline) //nolint:errcheck } else { if _, err := o.db.Insert(pipeline); err != nil { return err } } // Validate DAG before writing anything. if _, err := TopoSort(wf.Jobs); err != nil { return err } now := time.Now().UTC() run := &models.PipelineRun{ PipelineID: pipeline.ID, RepoID: repo.ID, TriggerRef: evt.Ref, TriggerSHA: evt.After, TriggeredBy: evt.Pusher, Status: "queued", StartedAt: &now, } if _, err := o.db.Insert(run); err != nil { return err } // Create job + step records for every job in the workflow. 
for jobName, wfJob := range wf.Jobs { needs := []string(wfJob.Needs) if needs == nil { needs = []string{} } needsJSON, _ := json.Marshal(needs) job := &models.PipelineJob{ RunID: run.ID, Name: jobName, Image: wfJob.RunsOn, Needs: string(needsJSON), Status: "queued", } if _, err := o.db.Insert(job); err != nil { return err } for seq, step := range wfJob.Steps { s := &models.PipelineStep{ JobID: job.ID, Seq: seq, Name: step.Name, RunCmd: step.Run, UsesAction: step.Uses, Status: "queued", } if _, err := o.db.Insert(s); err != nil { return err } } } // Enqueue jobs with no dependencies. o.enqueueReadyJobs(run.ID, wf.Jobs) o.bus.Publish(events.SubjectPipelineTriggered, events.PipelineEvent{ //nolint:errcheck RunID: run.ID, RepoID: repo.ID, Status: "queued", At: now, }) log.Printf("orchestrator: created run %d for %s/%s (%s)", run.ID, repo.Name, filePath, evt.After[:7]) return nil } // advanceDAG is called when a job finishes. It marks the job, checks whether // all jobs are done (completing the run) or enqueues the next wave. func (o *Orchestrator) advanceDAG(runID, jobID int64, result string) { var job models.PipelineJob if found, _ := o.db.ID(jobID).Get(&job); !found { return } now := time.Now().UTC() job.Status = result job.FinishedAt = &now o.db.ID(job.ID).Cols("status", "finished_at").Update(&job) //nolint:errcheck var run models.PipelineRun if found, _ := o.db.ID(runID).Get(&run); !found { return } // Load all jobs for this run to check completion. var allJobs []models.PipelineJob o.db.Where("run_id = ?", runID).Find(&allJobs) // If any job failed, cancel remaining queued jobs and fail the run. 
if result == "failed" { for _, j := range allJobs { if j.Status == "queued" { j.Status = "skipped" o.db.ID(j.ID).Cols("status").Update(&j) //nolint:errcheck } } run.Status = "failed" run.FinishedAt = &now o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck o.bus.Publish(events.SubjectPipelineFailed, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "failed", At: now}) //nolint:errcheck return } // Check if all jobs are done. allDone := true for _, j := range allJobs { if j.Status != "succeeded" && j.Status != "failed" && j.Status != "skipped" && j.Status != "cancelled" { allDone = false break } } if allDone { run.Status = "succeeded" run.FinishedAt = &now o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck o.bus.Publish(events.SubjectPipelineCompleted, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "succeeded", At: now}) //nolint:errcheck if o.sbomGen != nil { go o.sbomGen.GenerateOnDemand(run.RepoID, run.ID, run.TriggerSHA) } return } // Reload the workflow to get the job dependency graph, then enqueue next wave. 
var pipeline models.Pipeline if found, _ := o.db.ID(run.PipelineID).Get(&pipeline); !found { return } var repo models.Repository if found, _ := o.db.ID(run.RepoID).Get(&repo); !found { return } wf, err := ParseWorkflow(repo.DiskPath, run.TriggerSHA, pipeline.FilePath) if err != nil { return } o.enqueueReadyJobs(runID, wf.Jobs) } func (o *Orchestrator) enqueueReadyJobs(runID int64, wfJobs map[string]WorkflowJob) { var dbJobs []models.PipelineJob o.db.Where("run_id = ?", runID).Find(&dbJobs) completedNames := make(map[string]bool) enqueuedNames := make(map[string]bool) for _, j := range dbJobs { if j.Status == "succeeded" { completedNames[j.Name] = true } if j.Status == "running" || j.Status == "succeeded" { enqueuedNames[j.Name] = true } } readyNames := ReadyJobs(wfJobs, completedNames, enqueuedNames) for _, name := range readyNames { for _, j := range dbJobs { if j.Name == name && j.Status == "queued" { o.bus.Publish(events.SubjectJobQueued, events.JobEvent{ //nolint:errcheck RunID: runID, JobID: j.ID, }) break } } } } // recoverStaleRuns marks any jobs/runs left in "running" state as failed // (they were interrupted by a previous server crash). func (o *Orchestrator) recoverStaleRuns() { now := time.Now().UTC() o.db.Where("status = 'running'").Cols("status", "finished_at"). Update(&models.PipelineRun{Status: "failed", FinishedAt: &now}) //nolint:errcheck o.db.Where("status = 'running'").Cols("status", "finished_at"). Update(&models.PipelineJob{Status: "failed", FinishedAt: &now}) //nolint:errcheck o.db.Where("status = 'running'").Cols("status", "finished_at"). Update(&models.PipelineStep{Status: "failed", FinishedAt: &now}) //nolint:errcheck }