package ci

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	"xorm.io/xorm"

	"github.com/forgeo/forgebucket/internal/events"
	"github.com/forgeo/forgebucket/internal/models"
)

// JobContext holds everything needed to execute a single pipeline job.
type JobContext struct {
	Run     models.PipelineRun
	Job     models.PipelineJob
	Steps   []models.PipelineStep
	Repo    models.Repository
	Secrets map[string]string // resolved secret key→value map (Env > Repo > Workspace > Global)
}

// ExecuteJob runs the steps of a job in order, each in its own Docker
// container, streaming log output to the DB and the event bus. It finishes by
// publishing either a job-completed or (via failJob) a job-failed event.
//
// The job is marked "running" first. The repository snapshot at
// jc.Run.TriggerSHA is extracted under workspaceRoot and removed again when
// the function returns. Execution stops at the first step that fails.
//
// DB update and publish errors are deliberately ignored (best effort); the
// function reports nothing to its caller.
func ExecuteJob(ctx context.Context, db *xorm.Engine, bus events.EventBus, jc JobContext, workspaceRoot string) {
	now := time.Now().UTC()
	jc.Job.Status = "running"
	jc.Job.StartedAt = &now
	db.ID(jc.Job.ID).Cols("status", "started_at").Update(&jc.Job) //nolint:errcheck

	image := jc.Job.Image
	if image == "" {
		image = "ubuntu:22.04"
	}
	// The image name is handed to the docker CLI as a positional argument. A
	// value starting with '-' would be parsed as a docker option instead of an
	// image, so refuse it before running anything.
	if strings.HasPrefix(image, "-") {
		failJob(db, bus, jc, fmt.Sprintf("invalid image %q", image))
		return
	}

	// Extract repo snapshot into a workspace directory.
	workDir, err := extractWorkspace(jc.Repo.DiskPath, jc.Run.TriggerSHA, workspaceRoot, jc.Run.ID)
	if err != nil {
		failJob(db, bus, jc, fmt.Sprintf("workspace setup failed: %v", err))
		return
	}
	defer os.RemoveAll(workDir)

	// Pull image once per job (non-fatal if pull fails and image exists locally).
	pullCmd := exec.CommandContext(ctx, "docker", "pull", image)
	pullCmd.Run() //nolint:errcheck

	for i := range jc.Steps {
		// Take a pointer into the slice so status changes are recorded on
		// jc.Steps itself rather than on a copy.
		step := &jc.Steps[i]

		if step.UsesAction == "checkout" {
			// Built-in checkout: workspace is already set up by extractWorkspace.
			markStep(db, step, "succeeded", 0)
			continue
		}
		if step.RunCmd == "" {
			markStep(db, step, "skipped", 0)
			continue
		}

		exitCode, err := runStep(ctx, db, bus, jc.Run.ID, jc.Job.ID, step, image, workDir, jc.Secrets)
		if err != nil || exitCode != 0 {
			if exitCode == 0 {
				// An error with a zero exit code must still be recorded as a failure.
				exitCode = 1
			}
			markStep(db, step, "failed", exitCode)

			reason := fmt.Sprintf("step %q exited %d", step.Name, exitCode)
			if err != nil {
				// The container never ran (or could not be started); surface
				// the cause instead of a bare exit code.
				reason = fmt.Sprintf("step %q could not run: %v", step.Name, err)
			}
			failJob(db, bus, jc, reason)
			return
		}
		markStep(db, step, "succeeded", 0)
	}

	fin := time.Now().UTC()
	jc.Job.Status = "succeeded"
	jc.Job.FinishedAt = &fin
	db.ID(jc.Job.ID).Cols("status", "finished_at").Update(&jc.Job) //nolint:errcheck
	bus.Publish(events.SubjectJobCompleted, events.JobEvent{       //nolint:errcheck
		RunID:  jc.Run.ID,
		JobID:  jc.Job.ID,
		Status: "succeeded",
		At:     fin,
	})
}

// runStep runs a single shell-command step inside a Docker container.
//
// The step is marked "running", then step.RunCmd is executed with
// `/bin/sh -ec` in a container of the given image. workDir is mounted at
// /workspace (also the working directory) and networking is disabled. Combined
// stdout/stderr is forwarded line by line through writeLogChunk.
//
// It returns the container's exit code. A non-nil error is returned only when
// the command could not be set up or started; in that case the exit code is 1.
func runStep(ctx context.Context, db *xorm.Engine, bus events.EventBus, runID, jobID int64, step *models.PipelineStep, image, workDir string, secrets map[string]string) (int, error) {
	// Upper bound for a single log line. bufio.Scanner's default limit is
	// 64 KiB, which build tools can exceed (minified output, progress bars).
	const maxLogLineBytes = 1 << 20

	now := time.Now().UTC()
	step.Status = "running"
	step.StartedAt = &now
	db.ID(step.ID).Cols("status", "started_at").Update(step) //nolint:errcheck

	// Build docker args: base flags + one --env per secret.
	// NOTE(review): secret values travel on the docker command line and are
	// therefore visible in the host's process list while the step runs.
	args := []string{
		"run", "--rm",
		"-v", workDir + ":/workspace",
		"-w", "/workspace",
		"--network=none",
	}
	for k, v := range secrets {
		args = append(args, "--env", k+"="+v)
	}
	args = append(args, image, "/bin/sh", "-ec", step.RunCmd)

	cmd := exec.CommandContext(ctx, "docker", args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return 1, fmt.Errorf("docker run stdout pipe: %w", err)
	}
	cmd.Stderr = cmd.Stdout // merge stderr into stdout

	if err := cmd.Start(); err != nil {
		return 1, fmt.Errorf("docker run: %w", err)
	}

	chunk := 0
	scanner := bufio.NewScanner(stdout)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
	for scanner.Scan() {
		line := scanner.Text() + "\n"
		writeLogChunk(db, bus, runID, jobID, step.ID, chunk, line)
		chunk++
	}
	if scanErr := scanner.Err(); scanErr != nil {
		// The scanner gave up (e.g. a line longer than maxLogLineBytes). Record
		// that in the log, then keep reading and discarding output: if the pipe
		// were left unread the container could block on write and cmd.Wait
		// below would not return until ctx is cancelled.
		writeLogChunk(db, bus, runID, jobID, step.ID, chunk,
			fmt.Sprintf("[ForgeBucket] log streaming stopped: %v\n", scanErr))
		discard := make([]byte, 32*1024)
		for {
			if _, readErr := stdout.Read(discard); readErr != nil {
				break
			}
		}
	}

	// Wait is called only after all reads from the pipe have completed.
	exitCode := 0
	if err := cmd.Wait(); err != nil {
		if exitErr, ok := err.(*exec.ExitError); ok {
			exitCode = exitErr.ExitCode()
		} else {
			exitCode = 1
		}
	}
	return exitCode, nil
}

// extractWorkspace uses git-archive to export the repo at a given SHA into a
// directory named run-<runID> under workspaceRoot, piping the tar stream into
// `tar -x`.
//
// It returns the directory path on success. On any failure after the directory
// has been created, the directory is removed and an error is returned. The
// caller owns the returned directory and is responsible for deleting it.
func extractWorkspace(repoPath, sha, workspaceRoot string, runID int64) (string, error) {
	// sha is passed to git as a positional argument; a leading '-' would be
	// parsed as an option to git-archive, so reject it up front.
	if sha == "" || strings.HasPrefix(sha, "-") {
		return "", fmt.Errorf("invalid revision %q", sha)
	}

	dir := filepath.Join(workspaceRoot, fmt.Sprintf("run-%d", runID))
	if err := os.MkdirAll(dir, 0755); err != nil {
		return "", err
	}

	archive := exec.Command("git", "archive", "--format=tar", sha)
	archive.Dir = repoPath
	archive.Env = []string{"GIT_TERMINAL_PROMPT=0", "HOME=/tmp"}

	archiveOut, err := archive.StdoutPipe()
	if err != nil {
		os.RemoveAll(dir)
		return "", fmt.Errorf("git archive pipe: %w", err)
	}

	tar := exec.Command("tar", "-x", "-C", dir)
	tar.Stdin = archiveOut

	if err := archive.Start(); err != nil {
		os.RemoveAll(dir)
		return "", fmt.Errorf("git archive: %w", err)
	}
	if err := tar.Start(); err != nil {
		// Stop the producer and reap it so no zombie process is left behind.
		archive.Process.Kill() //nolint:errcheck
		archive.Wait()         //nolint:errcheck
		os.RemoveAll(dir)
		return "", fmt.Errorf("tar: %w", err)
	}

	// Wait for both processes before inspecting either result so neither is
	// left unreaped; the git-archive error takes precedence when both fail.
	archiveErr := archive.Wait()
	tarErr := tar.Wait()
	if archiveErr != nil {
		os.RemoveAll(dir)
		return "", fmt.Errorf("git archive wait: %w", archiveErr)
	}
	if tarErr != nil {
		os.RemoveAll(dir)
		return "", fmt.Errorf("tar wait: %w", tarErr)
	}
	return dir, nil
}

// writeLogChunk stores one chunk of step output as a PipelineStepLog row and
// publishes the same content on the pipeline-log subject. idx is the chunk's
// position within the step's log. Insert and publish errors are ignored.
func writeLogChunk(db *xorm.Engine, bus events.EventBus, runID, jobID, stepID int64, idx int, content string) {
	entry := &models.PipelineStepLog{
		StepID:     stepID,
		ChunkIndex: idx,
		Content:    content,
	}
	db.Insert(entry)                                                   //nolint:errcheck
	bus.Publish(events.SubjectPipelineLog, events.LogChunkEvent{ //nolint:errcheck
		RunID:      runID,
		JobID:      jobID,
		StepID:     stepID,
		ChunkIndex: idx,
		Content:    content,
	})
}
func markStep(db *xorm.Engine, step *models.PipelineStep, status string, exitCode int) { now := time.Now().UTC() step.Status = status step.ExitCode = exitCode step.FinishedAt = &now db.ID(step.ID).Cols("status", "exit_code", "finished_at").Update(step) //nolint:errcheck } func failJob(db *xorm.Engine, bus events.EventBus, jc JobContext, reason string) { now := time.Now().UTC() jc.Job.Status = "failed" jc.Job.FinishedAt = &now db.ID(jc.Job.ID).Cols("status", "finished_at").Update(&jc.Job) //nolint:errcheck // Write the failure reason as a synthetic log line. var lastStep models.PipelineStep if found, _ := db.Where("job_id = ?", jc.Job.ID).Desc("seq").Get(&lastStep); found { writeLogChunk(db, bus, jc.Run.ID, jc.Job.ID, lastStep.ID, 0, "\n[ForgeBucket] Job failed: "+reason+"\n") } bus.Publish(events.SubjectJobFailed, events.JobEvent{ //nolint:errcheck RunID: jc.Run.ID, JobID: jc.Job.ID, Status: "failed", At: now, }) } // workspaceDir returns the scratch directory root for CI job workspaces. func workspaceDir(artifactRoot string) string { return filepath.Join(filepath.Dir(artifactRoot), "ci-workspaces") } // IsDockerAvailable checks whether the docker CLI is reachable. func IsDockerAvailable() bool { cmd := exec.Command("docker", "info") cmd.Env = []string{"HOME=/tmp"} return cmd.Run() == nil } // stepsForJob loads PipelineStep rows for a job ordered by seq. func stepsForJob(db *xorm.Engine, jobID int64) ([]models.PipelineStep, error) { var steps []models.PipelineStep err := db.Where("job_id = ?", jobID).Asc("seq").Find(&steps) return steps, err } // repoForRun loads the Repository for a given run. 
func repoForRun(db *xorm.Engine, runID int64) (models.Repository, models.PipelineRun, bool) { var run models.PipelineRun if found, _ := db.ID(runID).Get(&run); !found { return models.Repository{}, run, false } var repo models.Repository if found, _ := db.ID(run.RepoID).Get(&repo); !found { return models.Repository{}, run, false } return repo, run, true } // buildJobContext assembles a JobContext from DB rows and resolves the secret // hierarchy (Env > Repo > Workspace > Global) for injection into docker run. func buildJobContext(db *xorm.Engine, jobID int64, sessionSecret string) (JobContext, bool) { var job models.PipelineJob if found, _ := db.ID(jobID).Get(&job); !found { return JobContext{}, false } repo, run, ok := repoForRun(db, job.RunID) if !ok { return JobContext{}, false } steps, err := stepsForJob(db, jobID) if err != nil { return JobContext{}, false } // Determine workspace ID (0 if user-owned repo). var wsID int64 if repo.WorkspaceID != nil { wsID = *repo.WorkspaceID } secrets := resolveSecrets(db, repo.ID, wsID, 0, sessionSecret) return JobContext{Run: run, Job: job, Steps: steps, Repo: repo, Secrets: secrets}, true } // resolveSecrets builds a merged key→plaintext map respecting hierarchy: // Global < Workspace < Repo < Env (last writer wins per key). func resolveSecrets(db *xorm.Engine, repoID, workspaceID, envID int64, sessionSecret string) map[string]string { out := map[string]string{} load := func(scope models.SecretScope, scopeID int64) { var secrets []models.Secret db.Where("scope = ? AND scope_id = ?", scope, scopeID).Find(&secrets) for _, s := range secrets { // Higher-priority scopes loaded later — simply overwrite. 
if pt, err := models.DecryptSecret(s.EncryptedValue, sessionSecret); err == nil { out[s.Name] = pt } } } load(models.SecretScopeGlobal, 0) if workspaceID != 0 { load(models.SecretScopeWorkspace, workspaceID) } load(models.SecretScopeRepo, repoID) if envID != 0 { load(models.SecretScopeEnv, envID) } return out } // pipeForRun returns the longest-matching step label for an image. // Phase 2B: unused placeholder for future label matching. func pipeForRun(_ string) string { return "" } // sanitizeImage prevents injection in docker image names. func sanitizeImage(image string) string { // Allow only characters valid in Docker image references. var b strings.Builder for _, c := range image { if (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '.' || c == '-' || c == '_' || c == '/' || c == ':' || c == '@' { b.WriteRune(c) } } return b.String() }