package ci

import (
	"context"
	"encoding/json"
	"log"

	"xorm.io/xorm"

	"github.com/forgeo/forgebucket/internal/config"
	"github.com/forgeo/forgebucket/internal/events"
)

// RunnerManager subscribes to job.queued events and dispatches them to the
// local Docker executor. A semaphore limits concurrent executions.
type RunnerManager struct {
	db  *xorm.Engine
	bus events.EventBus
	cfg *config.Config

	// sem is a counting semaphore: one token is held for every job that is
	// currently executing, and its capacity is the concurrency limit.
	sem chan struct{}
}

// NewRunnerManager returns a RunnerManager that runs at most maxConcurrent
// jobs at the same time. A maxConcurrent of zero or less falls back to 4.
func NewRunnerManager(db *xorm.Engine, bus events.EventBus, cfg *config.Config, maxConcurrent int) *RunnerManager {
	if maxConcurrent <= 0 {
		maxConcurrent = 4
	}
	return &RunnerManager{
		db:  db,
		bus: bus,
		cfg: cfg,
		sem: make(chan struct{}, maxConcurrent),
	}
}

// Start subscribes to job.queued and dispatches executions until ctx is
// cancelled. It blocks for the lifetime of ctx: when Docker is unavailable or
// the subscription fails it logs the reason and simply waits for cancellation.
// After cancellation it unsubscribes and then waits for every in-flight job
// goroutine to release its semaphore slot before returning.
func (m *RunnerManager) Start(ctx context.Context) {
	if !IsDockerAvailable() {
		log.Printf("runner: Docker not available — CI execution disabled")
		<-ctx.Done()
		return
	}

	log.Printf("runner: started (max concurrent jobs: %d)", cap(m.sem))
	wsDir := workspaceDir(m.cfg.ArtifactRoot)

	unsub, err := m.bus.Subscribe(events.SubjectJobQueued, func(_ string, data []byte) {
		// Refuse new work once shutdown has begun, before spending any
		// effort on decoding the event or building the job context.
		if ctx.Err() != nil {
			return
		}

		var evt events.JobEvent
		if err := json.Unmarshal(data, &evt); err != nil {
			log.Printf("runner: bad job.queued payload: %v", err)
			return
		}

		jc, ok := buildJobContext(m.db, evt.JobID, m.cfg.SessionSecret)
		if !ok {
			log.Printf("runner: could not build job context for job %d", evt.JobID)
			return
		}

		// Acquire semaphore slot — blocks if at capacity.
		select {
		case m.sem <- struct{}{}:
		case <-ctx.Done():
			return
		}

		// When both select cases are ready Go picks one at random, so the
		// slot may have been acquired even though ctx is already cancelled.
		// Hand the slot back instead of launching a job that cannot run.
		if ctx.Err() != nil {
			<-m.sem
			return
		}

		go func() {
			defer func() { <-m.sem }()

			// Sanitize the Docker image name before execution.
			jc.Job.Image = sanitizeImage(jc.Job.Image)
			ExecuteJob(ctx, m.db, m.bus, jc, wsDir)
		}()
	})
	if err != nil {
		log.Printf("runner: subscribe job.queued: %v", err)
		<-ctx.Done()
		return
	}

	<-ctx.Done()

	// Unsubscribe before draining so no further events are delivered while
	// the running jobs wind down.
	unsub()

	log.Printf("runner: stopping — draining %d active jobs", len(m.sem))

	// Wait for all running jobs to finish by filling the semaphore: each send
	// can only succeed once a job goroutine has released its slot. The
	// semaphore is intentionally left full afterwards, so a late handler
	// invocation can never acquire a slot.
	for i := 0; i < cap(m.sem); i++ {
		m.sem <- struct{}{}
	}
}