completed phase 2b

This commit is contained in:
2026-05-11 20:10:45 +02:00
parent 83d96d0a1e
commit 4002a3b84d
20 changed files with 1566 additions and 50 deletions
+292
View File
@@ -0,0 +1,292 @@
package ci
import (
"context"
"encoding/json"
"log"
"strings"
"time"
"xorm.io/xorm"
"github.com/forgeo/forgebucket/internal/events"
"github.com/forgeo/forgebucket/internal/models"
)
// Orchestrator listens for push events, creates pipeline run records, and
// advances the DAG as jobs complete. It does NOT execute jobs directly —
// that is the RunnerManager's responsibility.
type Orchestrator struct {
db *xorm.Engine
bus events.EventBus
}
func NewOrchestrator(db *xorm.Engine, bus events.EventBus) *Orchestrator {
return &Orchestrator{db: db, bus: bus}
}
// Start subscribes to relevant NATS subjects and blocks until ctx is cancelled.
func (o *Orchestrator) Start(ctx context.Context) {
o.recoverStaleRuns()
unsub1, err := o.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) {
var evt events.PushEvent
if err := json.Unmarshal(data, &evt); err != nil {
log.Printf("orchestrator: bad push event: %v", err)
return
}
go o.handlePush(evt)
})
if err != nil {
log.Printf("orchestrator: subscribe push.received: %v", err)
} else {
defer unsub1()
}
unsub2, err := o.bus.Subscribe(events.SubjectJobCompleted, func(_ string, data []byte) {
var evt events.JobEvent
if err := json.Unmarshal(data, &evt); err != nil {
return
}
go o.advanceDAG(evt.RunID, evt.JobID, "succeeded")
})
if err != nil {
log.Printf("orchestrator: subscribe job.completed: %v", err)
} else {
defer unsub2()
}
unsub3, err := o.bus.Subscribe(events.SubjectJobFailed, func(_ string, data []byte) {
var evt events.JobEvent
if err := json.Unmarshal(data, &evt); err != nil {
return
}
go o.advanceDAG(evt.RunID, evt.JobID, "failed")
})
if err != nil {
log.Printf("orchestrator: subscribe job.failed: %v", err)
} else {
defer unsub3()
}
<-ctx.Done()
}
// handlePush is called for every successful git push. It finds matching workflow
// files, creates run records, and enqueues the first wave of jobs.
func (o *Orchestrator) handlePush(evt events.PushEvent) {
// Ignore branch deletions (new SHA = all zeros).
if evt.After == "" || strings.Repeat("0", len(evt.After)) == evt.After {
return
}
var repo models.Repository
if found, _ := o.db.ID(evt.RepoID).Get(&repo); !found {
return
}
workflowPaths, err := ListWorkflows(repo.DiskPath, evt.After)
if err != nil || len(workflowPaths) == 0 {
return
}
for _, path := range workflowPaths {
wf, err := ParseWorkflow(repo.DiskPath, evt.After, path)
if err != nil {
log.Printf("orchestrator: parse workflow %s: %v", path, err)
continue
}
if !MatchesPushTrigger(wf, evt.Ref) {
continue
}
if err := o.createRun(repo, evt, wf, path); err != nil {
log.Printf("orchestrator: create run for %s: %v", path, err)
}
}
}
func (o *Orchestrator) createRun(repo models.Repository, evt events.PushEvent, wf *WorkflowFile, filePath string) error {
// Upsert the Pipeline definition record.
pipeline := &models.Pipeline{RepoID: repo.ID, FilePath: filePath}
has, _ := o.db.Where("repo_id = ? AND file_path = ?", repo.ID, filePath).Get(pipeline)
pipeline.Name = wf.Name
if pipeline.Name == "" {
pipeline.Name = filePath
}
if has {
o.db.ID(pipeline.ID).Cols("name").Update(pipeline) //nolint:errcheck
} else {
if _, err := o.db.Insert(pipeline); err != nil {
return err
}
}
// Validate DAG before writing anything.
if _, err := TopoSort(wf.Jobs); err != nil {
return err
}
now := time.Now().UTC()
run := &models.PipelineRun{
PipelineID: pipeline.ID,
RepoID: repo.ID,
TriggerRef: evt.Ref,
TriggerSHA: evt.After,
TriggeredBy: evt.Pusher,
Status: "queued",
StartedAt: &now,
}
if _, err := o.db.Insert(run); err != nil {
return err
}
// Create job + step records for every job in the workflow.
for jobName, wfJob := range wf.Jobs {
needsJSON, _ := json.Marshal([]string(wfJob.Needs))
job := &models.PipelineJob{
RunID: run.ID,
Name: jobName,
Image: wfJob.RunsOn,
Needs: string(needsJSON),
Status: "queued",
}
if _, err := o.db.Insert(job); err != nil {
return err
}
for seq, step := range wfJob.Steps {
s := &models.PipelineStep{
JobID: job.ID,
Seq: seq,
Name: step.Name,
RunCmd: step.Run,
UsesAction: step.Uses,
Status: "queued",
}
if _, err := o.db.Insert(s); err != nil {
return err
}
}
}
// Enqueue jobs with no dependencies.
o.enqueueReadyJobs(run.ID, wf.Jobs)
o.bus.Publish(events.SubjectPipelineTriggered, events.PipelineEvent{ //nolint:errcheck
RunID: run.ID,
RepoID: repo.ID,
Status: "queued",
At: now,
})
log.Printf("orchestrator: created run %d for %s/%s (%s)", run.ID, repo.Name, filePath, evt.After[:7])
return nil
}
// advanceDAG is called when a job finishes. It marks the job, checks whether
// all jobs are done (completing the run) or enqueues the next wave.
func (o *Orchestrator) advanceDAG(runID, jobID int64, result string) {
var job models.PipelineJob
if found, _ := o.db.ID(jobID).Get(&job); !found {
return
}
now := time.Now().UTC()
job.Status = result
job.FinishedAt = &now
o.db.ID(job.ID).Cols("status", "finished_at").Update(&job) //nolint:errcheck
var run models.PipelineRun
if found, _ := o.db.ID(runID).Get(&run); !found {
return
}
// Load all jobs for this run to check completion.
var allJobs []models.PipelineJob
o.db.Where("run_id = ?", runID).Find(&allJobs)
// If any job failed, cancel remaining queued jobs and fail the run.
if result == "failed" {
for _, j := range allJobs {
if j.Status == "queued" {
j.Status = "skipped"
o.db.ID(j.ID).Cols("status").Update(&j) //nolint:errcheck
}
}
run.Status = "failed"
run.FinishedAt = &now
o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck
o.bus.Publish(events.SubjectPipelineFailed, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "failed", At: now}) //nolint:errcheck
return
}
// Check if all jobs are done.
allDone := true
for _, j := range allJobs {
if j.Status != "succeeded" && j.Status != "failed" && j.Status != "skipped" && j.Status != "cancelled" {
allDone = false
break
}
}
if allDone {
run.Status = "succeeded"
run.FinishedAt = &now
o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck
o.bus.Publish(events.SubjectPipelineCompleted, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "succeeded", At: now}) //nolint:errcheck
return
}
// Reload the workflow to get the job dependency graph, then enqueue next wave.
var pipeline models.Pipeline
if found, _ := o.db.ID(run.PipelineID).Get(&pipeline); !found {
return
}
var repo models.Repository
if found, _ := o.db.ID(run.RepoID).Get(&repo); !found {
return
}
wf, err := ParseWorkflow(repo.DiskPath, run.TriggerSHA, pipeline.FilePath)
if err != nil {
return
}
o.enqueueReadyJobs(runID, wf.Jobs)
}
func (o *Orchestrator) enqueueReadyJobs(runID int64, wfJobs map[string]WorkflowJob) {
var dbJobs []models.PipelineJob
o.db.Where("run_id = ?", runID).Find(&dbJobs)
completedNames := make(map[string]bool)
enqueuedNames := make(map[string]bool)
for _, j := range dbJobs {
if j.Status == "succeeded" {
completedNames[j.Name] = true
}
if j.Status == "running" || j.Status == "succeeded" {
enqueuedNames[j.Name] = true
}
}
readyNames := ReadyJobs(wfJobs, completedNames, enqueuedNames)
for _, name := range readyNames {
for _, j := range dbJobs {
if j.Name == name && j.Status == "queued" {
o.bus.Publish(events.SubjectJobQueued, events.JobEvent{ //nolint:errcheck
RunID: runID,
JobID: j.ID,
})
break
}
}
}
}
// recoverStaleRuns marks any jobs/runs left in "running" state as failed
// (they were interrupted by a previous server crash).
func (o *Orchestrator) recoverStaleRuns() {
now := time.Now().UTC()
o.db.Where("status = 'running'").Cols("status", "finished_at").
Update(&models.PipelineRun{Status: "failed", FinishedAt: &now}) //nolint:errcheck
o.db.Where("status = 'running'").Cols("status", "finished_at").
Update(&models.PipelineJob{Status: "failed", FinishedAt: &now}) //nolint:errcheck
o.db.Where("status = 'running'").Cols("status", "finished_at").
Update(&models.PipelineStep{Status: "failed", FinishedAt: &now}) //nolint:errcheck
}