302 lines
8.4 KiB
Go
302 lines
8.4 KiB
Go
package ci
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"xorm.io/xorm"
|
|
|
|
"github.com/forgeo/forgebucket/internal/domain/sbom"
|
|
"github.com/forgeo/forgebucket/internal/events"
|
|
"github.com/forgeo/forgebucket/internal/models"
|
|
)
|
|
|
|
// Orchestrator listens for push events, creates pipeline run records, and
|
|
// advances the DAG as jobs complete. It does NOT execute jobs directly —
|
|
// that is the RunnerManager's responsibility.
|
|
type Orchestrator struct {
|
|
db *xorm.Engine
|
|
bus events.EventBus
|
|
sbomGen *sbom.Generator
|
|
}
|
|
|
|
func NewOrchestrator(db *xorm.Engine, bus events.EventBus, sbomGen *sbom.Generator) *Orchestrator {
|
|
return &Orchestrator{db: db, bus: bus, sbomGen: sbomGen}
|
|
}
|
|
|
|
// Start subscribes to relevant NATS subjects and blocks until ctx is cancelled.
|
|
func (o *Orchestrator) Start(ctx context.Context) {
|
|
o.recoverStaleRuns()
|
|
|
|
unsub1, err := o.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) {
|
|
var evt events.PushEvent
|
|
if err := json.Unmarshal(data, &evt); err != nil {
|
|
log.Printf("orchestrator: bad push event: %v", err)
|
|
return
|
|
}
|
|
go o.handlePush(evt)
|
|
})
|
|
if err != nil {
|
|
log.Printf("orchestrator: subscribe push.received: %v", err)
|
|
} else {
|
|
defer unsub1()
|
|
}
|
|
|
|
unsub2, err := o.bus.Subscribe(events.SubjectJobCompleted, func(_ string, data []byte) {
|
|
var evt events.JobEvent
|
|
if err := json.Unmarshal(data, &evt); err != nil {
|
|
return
|
|
}
|
|
go o.advanceDAG(evt.RunID, evt.JobID, "succeeded")
|
|
})
|
|
if err != nil {
|
|
log.Printf("orchestrator: subscribe job.completed: %v", err)
|
|
} else {
|
|
defer unsub2()
|
|
}
|
|
|
|
unsub3, err := o.bus.Subscribe(events.SubjectJobFailed, func(_ string, data []byte) {
|
|
var evt events.JobEvent
|
|
if err := json.Unmarshal(data, &evt); err != nil {
|
|
return
|
|
}
|
|
go o.advanceDAG(evt.RunID, evt.JobID, "failed")
|
|
})
|
|
if err != nil {
|
|
log.Printf("orchestrator: subscribe job.failed: %v", err)
|
|
} else {
|
|
defer unsub3()
|
|
}
|
|
|
|
<-ctx.Done()
|
|
}
|
|
|
|
// handlePush is called for every successful git push. It finds matching workflow
|
|
// files, creates run records, and enqueues the first wave of jobs.
|
|
func (o *Orchestrator) handlePush(evt events.PushEvent) {
|
|
// Ignore branch deletions (new SHA = all zeros).
|
|
if evt.After == "" || strings.Repeat("0", len(evt.After)) == evt.After {
|
|
return
|
|
}
|
|
|
|
var repo models.Repository
|
|
if found, _ := o.db.ID(evt.RepoID).Get(&repo); !found {
|
|
return
|
|
}
|
|
|
|
workflowPaths, err := ListWorkflows(repo.DiskPath, evt.After)
|
|
if err != nil || len(workflowPaths) == 0 {
|
|
return
|
|
}
|
|
|
|
for _, path := range workflowPaths {
|
|
wf, err := ParseWorkflow(repo.DiskPath, evt.After, path)
|
|
if err != nil {
|
|
log.Printf("orchestrator: parse workflow %s: %v", path, err)
|
|
continue
|
|
}
|
|
if !MatchesPushTrigger(wf, evt.Ref) {
|
|
continue
|
|
}
|
|
if err := o.createRun(repo, evt, wf, path); err != nil {
|
|
log.Printf("orchestrator: create run for %s: %v", path, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *Orchestrator) createRun(repo models.Repository, evt events.PushEvent, wf *WorkflowFile, filePath string) error {
|
|
// Upsert the Pipeline definition record.
|
|
pipeline := &models.Pipeline{RepoID: repo.ID, FilePath: filePath}
|
|
has, _ := o.db.Where("repo_id = ? AND file_path = ?", repo.ID, filePath).Get(pipeline)
|
|
pipeline.Name = wf.Name
|
|
if pipeline.Name == "" {
|
|
pipeline.Name = filePath
|
|
}
|
|
if has {
|
|
o.db.ID(pipeline.ID).Cols("name").Update(pipeline) //nolint:errcheck
|
|
} else {
|
|
if _, err := o.db.Insert(pipeline); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Validate DAG before writing anything.
|
|
if _, err := TopoSort(wf.Jobs); err != nil {
|
|
return err
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
run := &models.PipelineRun{
|
|
PipelineID: pipeline.ID,
|
|
RepoID: repo.ID,
|
|
TriggerRef: evt.Ref,
|
|
TriggerSHA: evt.After,
|
|
TriggeredBy: evt.Pusher,
|
|
Status: "queued",
|
|
StartedAt: &now,
|
|
}
|
|
if _, err := o.db.Insert(run); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create job + step records for every job in the workflow.
|
|
for jobName, wfJob := range wf.Jobs {
|
|
needs := []string(wfJob.Needs)
|
|
if needs == nil {
|
|
needs = []string{}
|
|
}
|
|
needsJSON, _ := json.Marshal(needs)
|
|
job := &models.PipelineJob{
|
|
RunID: run.ID,
|
|
Name: jobName,
|
|
Image: wfJob.RunsOn,
|
|
Needs: string(needsJSON),
|
|
Status: "queued",
|
|
}
|
|
if _, err := o.db.Insert(job); err != nil {
|
|
return err
|
|
}
|
|
for seq, step := range wfJob.Steps {
|
|
s := &models.PipelineStep{
|
|
JobID: job.ID,
|
|
Seq: seq,
|
|
Name: step.Name,
|
|
RunCmd: step.Run,
|
|
UsesAction: step.Uses,
|
|
Status: "queued",
|
|
}
|
|
if _, err := o.db.Insert(s); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enqueue jobs with no dependencies.
|
|
o.enqueueReadyJobs(run.ID, wf.Jobs)
|
|
|
|
o.bus.Publish(events.SubjectPipelineTriggered, events.PipelineEvent{ //nolint:errcheck
|
|
RunID: run.ID,
|
|
RepoID: repo.ID,
|
|
Status: "queued",
|
|
At: now,
|
|
})
|
|
|
|
log.Printf("orchestrator: created run %d for %s/%s (%s)", run.ID, repo.Name, filePath, evt.After[:7])
|
|
return nil
|
|
}
|
|
|
|
// advanceDAG is called when a job finishes. It marks the job, checks whether
|
|
// all jobs are done (completing the run) or enqueues the next wave.
|
|
func (o *Orchestrator) advanceDAG(runID, jobID int64, result string) {
|
|
var job models.PipelineJob
|
|
if found, _ := o.db.ID(jobID).Get(&job); !found {
|
|
return
|
|
}
|
|
now := time.Now().UTC()
|
|
job.Status = result
|
|
job.FinishedAt = &now
|
|
o.db.ID(job.ID).Cols("status", "finished_at").Update(&job) //nolint:errcheck
|
|
|
|
var run models.PipelineRun
|
|
if found, _ := o.db.ID(runID).Get(&run); !found {
|
|
return
|
|
}
|
|
|
|
// Load all jobs for this run to check completion.
|
|
var allJobs []models.PipelineJob
|
|
o.db.Where("run_id = ?", runID).Find(&allJobs)
|
|
|
|
// If any job failed, cancel remaining queued jobs and fail the run.
|
|
if result == "failed" {
|
|
for _, j := range allJobs {
|
|
if j.Status == "queued" {
|
|
j.Status = "skipped"
|
|
o.db.ID(j.ID).Cols("status").Update(&j) //nolint:errcheck
|
|
}
|
|
}
|
|
run.Status = "failed"
|
|
run.FinishedAt = &now
|
|
o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck
|
|
o.bus.Publish(events.SubjectPipelineFailed, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "failed", At: now}) //nolint:errcheck
|
|
return
|
|
}
|
|
|
|
// Check if all jobs are done.
|
|
allDone := true
|
|
for _, j := range allJobs {
|
|
if j.Status != "succeeded" && j.Status != "failed" && j.Status != "skipped" && j.Status != "cancelled" {
|
|
allDone = false
|
|
break
|
|
}
|
|
}
|
|
if allDone {
|
|
run.Status = "succeeded"
|
|
run.FinishedAt = &now
|
|
o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck
|
|
o.bus.Publish(events.SubjectPipelineCompleted, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "succeeded", At: now}) //nolint:errcheck
|
|
if o.sbomGen != nil {
|
|
go o.sbomGen.GenerateOnDemand(run.RepoID, run.ID, run.TriggerSHA)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Reload the workflow to get the job dependency graph, then enqueue next wave.
|
|
var pipeline models.Pipeline
|
|
if found, _ := o.db.ID(run.PipelineID).Get(&pipeline); !found {
|
|
return
|
|
}
|
|
var repo models.Repository
|
|
if found, _ := o.db.ID(run.RepoID).Get(&repo); !found {
|
|
return
|
|
}
|
|
wf, err := ParseWorkflow(repo.DiskPath, run.TriggerSHA, pipeline.FilePath)
|
|
if err != nil {
|
|
return
|
|
}
|
|
o.enqueueReadyJobs(runID, wf.Jobs)
|
|
}
|
|
|
|
func (o *Orchestrator) enqueueReadyJobs(runID int64, wfJobs map[string]WorkflowJob) {
|
|
var dbJobs []models.PipelineJob
|
|
o.db.Where("run_id = ?", runID).Find(&dbJobs)
|
|
|
|
completedNames := make(map[string]bool)
|
|
enqueuedNames := make(map[string]bool)
|
|
for _, j := range dbJobs {
|
|
if j.Status == "succeeded" {
|
|
completedNames[j.Name] = true
|
|
}
|
|
if j.Status == "running" || j.Status == "succeeded" {
|
|
enqueuedNames[j.Name] = true
|
|
}
|
|
}
|
|
|
|
readyNames := ReadyJobs(wfJobs, completedNames, enqueuedNames)
|
|
for _, name := range readyNames {
|
|
for _, j := range dbJobs {
|
|
if j.Name == name && j.Status == "queued" {
|
|
o.bus.Publish(events.SubjectJobQueued, events.JobEvent{ //nolint:errcheck
|
|
RunID: runID,
|
|
JobID: j.ID,
|
|
})
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// recoverStaleRuns marks any jobs/runs left in "running" state as failed
|
|
// (they were interrupted by a previous server crash).
|
|
func (o *Orchestrator) recoverStaleRuns() {
|
|
now := time.Now().UTC()
|
|
o.db.Where("status = 'running'").Cols("status", "finished_at").
|
|
Update(&models.PipelineRun{Status: "failed", FinishedAt: &now}) //nolint:errcheck
|
|
o.db.Where("status = 'running'").Cols("status", "finished_at").
|
|
Update(&models.PipelineJob{Status: "failed", FinishedAt: &now}) //nolint:errcheck
|
|
o.db.Where("status = 'running'").Cols("status", "finished_at").
|
|
Update(&models.PipelineStep{Status: "failed", FinishedAt: &now}) //nolint:errcheck
|
|
}
|