From 4002a3b84dcadb715cef896405f40ae35792fc0c Mon Sep 17 00:00:00 2001 From: erangel1 Date: Mon, 11 May 2026 20:10:45 +0200 Subject: [PATCH] completed phase 2b --- .env | 2 +- cmd/forgebucket/main.go | 17 +- go.mod | 1 + internal/api/handlers/artifacts.go | 186 ++++++++++++++++ internal/api/handlers/githttp.go | 70 ++++-- internal/api/handlers/pipelines.go | 210 ++++++++++++++++-- internal/api/handlers/runners.go | 94 ++++++++ internal/api/router.go | 28 ++- internal/config/config.go | 12 +- internal/domain/ci/dag.go | 78 +++++++ internal/domain/ci/executor.go | 271 +++++++++++++++++++++++ internal/domain/ci/orchestrator.go | 292 +++++++++++++++++++++++++ internal/domain/ci/parser.go | 79 +++++++ internal/domain/ci/runner_manager.go | 86 ++++++++ internal/domain/ci/types.go | 58 +++++ internal/events/subjects.go | 1 + internal/events/types.go | 22 ++ internal/models/migrations/001_init.go | 5 +- internal/models/migrations/009_ci.go | 18 ++ internal/models/pipeline.go | 86 ++++++++ 20 files changed, 1566 insertions(+), 50 deletions(-) create mode 100644 internal/api/handlers/artifacts.go create mode 100644 internal/api/handlers/runners.go create mode 100644 internal/domain/ci/dag.go create mode 100644 internal/domain/ci/executor.go create mode 100644 internal/domain/ci/orchestrator.go create mode 100644 internal/domain/ci/parser.go create mode 100644 internal/domain/ci/runner_manager.go create mode 100644 internal/domain/ci/types.go create mode 100644 internal/models/migrations/009_ci.go create mode 100644 internal/models/pipeline.go diff --git a/.env b/.env index db03ffd..16ff265 100644 --- a/.env +++ b/.env @@ -1,6 +1,6 @@ # ─── Required ─────────────────────────────────────────────────────────────── # PostgreSQL connection string -DATABASE_URL=postgres://forgebucket:password@localhost:5432/forgebucket?sslmode=disable +DATABASE_URL=postgres://forgebucket:password@postgres:5432/forgebucket?sslmode=disable # Session cookie signing key — must be at least 32 
characters # Generate: openssl rand -hex 32 diff --git a/cmd/forgebucket/main.go b/cmd/forgebucket/main.go index fb79515..1c2a21b 100644 --- a/cmd/forgebucket/main.go +++ b/cmd/forgebucket/main.go @@ -17,6 +17,7 @@ import ( "github.com/forgeo/forgebucket/internal/api" "github.com/forgeo/forgebucket/internal/config" "github.com/forgeo/forgebucket/internal/db" + "github.com/forgeo/forgebucket/internal/domain/ci" gitdomain "github.com/forgeo/forgebucket/internal/domain/git" "github.com/forgeo/forgebucket/internal/events" "github.com/forgeo/forgebucket/internal/models/migrations" @@ -43,6 +44,10 @@ func main() { gitdomain.SetRepoRoot(cfg.RepoRoot) + if err := os.MkdirAll(cfg.ArtifactRoot, 0755); err != nil { + log.Fatalf("artifact root: %v", err) + } + bus, err := events.New(cfg.NATSUrl) if err != nil { log.Fatalf("events: %v", err) @@ -58,7 +63,17 @@ func main() { SameSite: http.SameSiteLaxMode, } - handler := api.New(cfg, engine, store, bus, web.FS()) + // Start CI orchestrator and runner manager in background goroutines. 
+ ciCtx, ciCancel := context.WithCancel(context.Background()) + defer ciCancel() + + orchestrator := ci.NewOrchestrator(engine, bus) + go orchestrator.Start(ciCtx) + + runnerMgr := ci.NewRunnerManager(engine, bus, cfg, 4) + go runnerMgr.Start(ciCtx) + + handler := api.New(cfg, engine, store, bus, cfg.ArtifactRoot, web.FS()) srv := &http.Server{ Addr: fmt.Sprintf(":%s", cfg.Port), diff --git a/go.mod b/go.mod index 8043ce5..ee0c757 100644 --- a/go.mod +++ b/go.mod @@ -24,5 +24,6 @@ require ( github.com/syndtr/goleveldb v1.0.0 // indirect golang.org/x/sys v0.43.0 // indirect golang.org/x/tools v0.43.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect xorm.io/builder v0.3.13 // indirect ) diff --git a/internal/api/handlers/artifacts.go b/internal/api/handlers/artifacts.go new file mode 100644 index 0000000..8e7c4ee --- /dev/null +++ b/internal/api/handlers/artifacts.go @@ -0,0 +1,186 @@ +package handlers + +import ( + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strconv" + + "github.com/go-chi/chi/v5" + "xorm.io/xorm" + + "github.com/forgeo/forgebucket/internal/models" +) + +type ArtifactHandler struct { + db *xorm.Engine + artifactRoot string +} + +func NewArtifactHandler(db *xorm.Engine, artifactRoot string) *ArtifactHandler { + return &ArtifactHandler{db: db, artifactRoot: artifactRoot} +} + +// ListArtifacts returns all artifacts for a pipeline run. +func (h *ArtifactHandler) List(w http.ResponseWriter, r *http.Request) { + repoID, runID, ok := h.resolveRunIDs(w, r) + if !ok { + return + } + var artifacts []models.Artifact + if err := h.db.Where("run_id = ? AND repo_id = ?", runID, repoID).Find(&artifacts); err != nil { + jsonError(w, "could not list artifacts", http.StatusInternalServerError) + return + } + if artifacts == nil { + artifacts = []models.Artifact{} + } + jsonOK(w, artifacts) +} + +// Upload accepts a multipart file upload and stores it as an artifact. +// Callers must provide a valid Bearer access token with write scope (runner auth). 
+func (h *ArtifactHandler) Upload(w http.ResponseWriter, r *http.Request) { + repoID, runID, ok := h.resolveRunIDs(w, r) + if !ok { + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 512<<20) // 512 MB max + if err := r.ParseMultipartForm(32 << 20); err != nil { + jsonError(w, "multipart parse failed", http.StatusBadRequest) + return + } + file, header, err := r.FormFile("file") + if err != nil { + jsonError(w, "file field is required", http.StatusBadRequest) + return + } + defer file.Close() + + name := r.FormValue("name") + if name == "" { + name = header.Filename + } + + // Sanitize name: no path separators. + for _, c := range []byte(name) { + if c == '/' || c == '\\' || c == 0 { + jsonError(w, "artifact name must not contain path separators", http.StatusBadRequest) + return + } + } + + dir := filepath.Join(h.artifactRoot, fmt.Sprintf("%d", runID)) + if err := os.MkdirAll(dir, 0755); err != nil { + jsonError(w, "could not create storage directory", http.StatusInternalServerError) + return + } + + storagePath := filepath.Join(dir, name) + dst, err := os.Create(storagePath) + if err != nil { + jsonError(w, "could not create file", http.StatusInternalServerError) + return + } + defer dst.Close() + + size, err := io.Copy(dst, file) + if err != nil { + jsonError(w, "could not write file", http.StatusInternalServerError) + return + } + + ct := header.Header.Get("Content-Type") + if ct == "" { + ct = "application/octet-stream" + } + + // Store path relative to artifactRoot for portability. + relPath := fmt.Sprintf("%d/%s", runID, name) + + artifact := &models.Artifact{ + RunID: runID, + RepoID: repoID, + Name: name, + StoragePath: relPath, + Size: size, + ContentType: ct, + } + if _, err := h.db.Insert(artifact); err != nil { + jsonError(w, "could not record artifact", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusCreated) + jsonOK(w, artifact) +} + +// Download streams the artifact file to the client. 
+func (h *ArtifactHandler) Download(w http.ResponseWriter, r *http.Request) { + artifactID, err := strconv.ParseInt(chi.URLParam(r, "artifactID"), 10, 64) + if err != nil { + jsonError(w, "invalid artifact ID", http.StatusBadRequest) + return + } + var artifact models.Artifact + if found, _ := h.db.ID(artifactID).Get(&artifact); !found { + jsonError(w, "artifact not found", http.StatusNotFound) + return + } + + fullPath := filepath.Join(h.artifactRoot, filepath.FromSlash(artifact.StoragePath)) + // Ensure the resolved path stays within artifactRoot (traversal guard). + if !isUnder(h.artifactRoot, fullPath) { + jsonError(w, "forbidden", http.StatusForbidden) + return + } + + f, err := os.Open(fullPath) + if err != nil { + jsonError(w, "artifact file not found", http.StatusNotFound) + return + } + defer f.Close() + + ct := artifact.ContentType + if ct == "" { + ct = "application/octet-stream" + } + w.Header().Set("Content-Type", ct) + w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename=%q`, artifact.Name)) + w.Header().Set("Content-Length", strconv.FormatInt(artifact.Size, 10)) + io.Copy(w, f) //nolint:errcheck +} + +func (h *ArtifactHandler) resolveRunIDs(w http.ResponseWriter, r *http.Request) (repoID, runID int64, ok bool) { + owner := chi.URLParam(r, "owner") + repoName := chi.URLParam(r, "repo") + var u models.User + if found, _ := h.db.Where("username = ?", owner).Get(&u); !found { + jsonError(w, "repository not found", http.StatusNotFound) + return 0, 0, false + } + var repo models.Repository + if found, _ := h.db.Where("owner_id = ? 
AND name = ?", u.ID, repoName).Get(&repo); !found { + jsonError(w, "repository not found", http.StatusNotFound) + return 0, 0, false + } + runID, err := strconv.ParseInt(chi.URLParam(r, "runID"), 10, 64) + if err != nil { + jsonError(w, "invalid run ID", http.StatusBadRequest) + return 0, 0, false + } + return repo.ID, runID, true +} + +func isUnder(root, path string) bool { + root = filepath.Clean(root) + path = filepath.Clean(path) + if len(path) <= len(root) { + return false + } + return path[:len(root)] == root && path[len(root)] == filepath.Separator +} diff --git a/internal/api/handlers/githttp.go b/internal/api/handlers/githttp.go index 7c671e1..bee41de 100644 --- a/internal/api/handlers/githttp.go +++ b/internal/api/handlers/githttp.go @@ -11,22 +11,32 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/go-chi/chi/v5" "golang.org/x/crypto/bcrypt" "xorm.io/xorm" "github.com/forgeo/forgebucket/internal/config" + "github.com/forgeo/forgebucket/internal/events" "github.com/forgeo/forgebucket/internal/models" ) type GitHTTPHandler struct { db *xorm.Engine cfg *config.Config + bus events.EventBus } -func NewGitHTTPHandler(db *xorm.Engine, cfg *config.Config) *GitHTTPHandler { - return &GitHTTPHandler{db: db, cfg: cfg} +func NewGitHTTPHandler(db *xorm.Engine, cfg *config.Config, bus events.EventBus) *GitHTTPHandler { + return &GitHTTPHandler{db: db, cfg: cfg, bus: bus} +} + +// refUpdate captures one ref-update line from a git-receive-pack request. +type refUpdate struct { + OldRev string + NewRev string + Ref string } // ServeGit is the entry point for all git smart-HTTP requests. @@ -107,13 +117,15 @@ func (h *GitHTTPHandler) ServeGit(w http.ResponseWriter, r *http.Request) { // Branch protection check: parse pkt-lines from the receive-pack body, // check each ref against stored protection rules, then restore the body. 
+ var pushedRefs []refUpdate if service == "git-receive-pack" { - if reason, newBody := checkProtectionsFromBody(h.db, repo.ID, authedUser, r.Body); reason != "" { + reason, refs, newBody := parseAndCheckBody(h.db, repo.ID, authedUser, r.Body) + if reason != "" { http.Error(w, reason, http.StatusForbidden) return - } else { - r.Body = io.NopCloser(newBody) } + pushedRefs = refs + r.Body = io.NopCloser(newBody) } // Build PATH_INFO: /{reponame}.git/{suffix} @@ -157,6 +169,27 @@ func (h *GitHTTPHandler) ServeGit(w http.ResponseWriter, r *http.Request) { if err := runGitBackend(r.Context(), w, r.Body, gitExec, env); err != nil { http.Error(w, fmt.Sprintf("git http-backend: %v", err), http.StatusInternalServerError) + return + } + + // Publish push.received for each ref pushed so the CI orchestrator can react. + if service == "git-receive-pack" { + zeroOID := strings.Repeat("0", 40) + for _, ref := range pushedRefs { + if ref.NewRev == zeroOID { + continue // branch deletion — skip CI trigger + } + go h.bus.Publish(events.SubjectPushReceived, events.PushEvent{ //nolint:errcheck + RepoID: repo.ID, + RepoName: repoName, + OwnerName: owner, + Ref: ref.Ref, + Before: ref.OldRev, + After: ref.NewRev, + Pusher: authedUser, + At: time.Now().UTC(), + }) + } } } @@ -239,15 +272,14 @@ func runGitBackend(ctx context.Context, w http.ResponseWriter, body io.Reader, g return waitErr } -// checkProtectionsFromBody parses git pkt-line ref updates from a receive-pack body, -// checks each ref against stored branch protection rules, and returns a denial reason -// (or "") plus a restored reader so the body can still be passed to http-backend. 
-func checkProtectionsFromBody(db *xorm.Engine, repoID int64, pusher string, body io.Reader) (reason string, restored io.Reader) { +// parseAndCheckBody parses git pkt-line ref updates from a receive-pack body, +// checks each ref against stored branch protection rules, and returns a denial +// reason (or ""), the list of parsed ref updates, and a restored reader. +func parseAndCheckBody(db *xorm.Engine, repoID int64, pusher string, body io.Reader) (reason string, refs []refUpdate, restored io.Reader) { var buf bytes.Buffer zeroOID := strings.Repeat("0", 40) for { - // Every pkt-line starts with a 4-hex-digit length that includes itself. lenBuf := make([]byte, 4) if _, err := io.ReadFull(body, lenBuf); err != nil { break @@ -259,8 +291,7 @@ func checkProtectionsFromBody(db *xorm.Engine, repoID int64, pusher string, body break } if pktLen64 == 0 { - // Flush packet — end of ref-update list. - break + break // flush packet — end of ref-update list } dataLen := int(pktLen64) - 4 if dataLen <= 0 { @@ -280,16 +311,15 @@ func checkProtectionsFromBody(db *xorm.Engine, repoID int64, pusher string, body } oldRev, newRev, refname := parts[0], parts[1], parts[2] - // New branches (oldRev all zeros) are not subject to protection. - if oldRev == zeroOID { - continue - } - // Detect force push: if newRev is all zeros it's a branch deletion. 
- isForcePush := newRev == zeroOID + refs = append(refs, refUpdate{OldRev: oldRev, NewRev: newRev, Ref: refname}) + if oldRev == zeroOID { + continue // new branch — not subject to protection + } + isForcePush := newRev == zeroOID if msg := CheckBranchProtection(db, repoID, pusher, refname, isForcePush); msg != "" { - return msg, io.MultiReader(&buf, body) + return msg, refs, io.MultiReader(&buf, body) } } - return "", io.MultiReader(&buf, body) + return "", refs, io.MultiReader(&buf, body) } diff --git a/internal/api/handlers/pipelines.go b/internal/api/handlers/pipelines.go index 84713bd..6677e5d 100644 --- a/internal/api/handlers/pipelines.go +++ b/internal/api/handlers/pipelines.go @@ -2,6 +2,8 @@ package handlers import ( "net/http" + "strconv" + "time" "github.com/go-chi/chi/v5" "xorm.io/xorm" @@ -17,37 +19,209 @@ func NewPipelineHandler(db *xorm.Engine) *PipelineHandler { return &PipelineHandler{db: db} } -func (h *PipelineHandler) List(w http.ResponseWriter, r *http.Request) { - ownerName := chi.URLParam(r, "owner") - repoName := chi.URLParam(r, "repo") - - repoID, ok := h.repoID(w, ownerName, repoName) +// ListPipelines returns all pipeline definitions for a repository. +func (h *PipelineHandler) ListPipelines(w http.ResponseWriter, r *http.Request) { + repoID, ok := h.repoID(w, r) if !ok { return } - _ = repoID - // Pipeline records will be added in Phase 3 (CI integration). - // Return empty list so the client doesn't break. - jsonOK(w, []any{}) + var pipelines []models.Pipeline + if err := h.db.Where("repo_id = ?", repoID).Find(&pipelines); err != nil { + jsonError(w, "could not list pipelines", http.StatusInternalServerError) + return + } + if pipelines == nil { + pipelines = []models.Pipeline{} + } + jsonOK(w, pipelines) } -func (h *PipelineHandler) Get(w http.ResponseWriter, r *http.Request) { - jsonError(w, "not implemented", http.StatusNotImplemented) +// ListRuns returns pipeline runs for a repository, most recent first. 
+func (h *PipelineHandler) ListRuns(w http.ResponseWriter, r *http.Request) { + repoID, ok := h.repoID(w, r) + if !ok { + return + } + limit := 30 + if l, err := strconv.Atoi(r.URL.Query().Get("limit")); err == nil && l > 0 && l <= 100 { + limit = l + } + var runs []models.PipelineRun + if err := h.db.Where("repo_id = ?", repoID).Desc("id").Limit(limit).Find(&runs); err != nil { + jsonError(w, "could not list runs", http.StatusInternalServerError) + return + } + if runs == nil { + runs = []models.PipelineRun{} + } + jsonOK(w, runs) } -func (h *PipelineHandler) repoID(w http.ResponseWriter, ownerName, repoName string) (int64, bool) { - var owner models.User - found, err := h.db.Where("username = ?", ownerName).Get(&owner) - if err != nil || !found { +type runDetailResponse struct { + models.PipelineRun + Jobs []jobDetailResponse `json:"jobs"` +} + +type jobDetailResponse struct { + models.PipelineJob + Steps []models.PipelineStep `json:"steps"` +} + +// GetRun returns a run with its full job + step tree. +func (h *PipelineHandler) GetRun(w http.ResponseWriter, r *http.Request) { + run, ok := h.lookupRun(w, r) + if !ok { + return + } + var jobs []models.PipelineJob + h.db.Where("run_id = ?", run.ID).Asc("id").Find(&jobs) + + jobDetails := make([]jobDetailResponse, len(jobs)) + for i, job := range jobs { + var steps []models.PipelineStep + h.db.Where("job_id = ?", job.ID).Asc("seq").Find(&steps) + if steps == nil { + steps = []models.PipelineStep{} + } + jobDetails[i] = jobDetailResponse{PipelineJob: job, Steps: steps} + } + jsonOK(w, runDetailResponse{PipelineRun: *run, Jobs: jobDetails}) +} + +// GetJobLogs returns all log chunks for a job, ordered by step seq and chunk index. 
+func (h *PipelineHandler) GetJobLogs(w http.ResponseWriter, r *http.Request) { + _, ok := h.lookupRun(w, r) + if !ok { + return + } + jobID, err := strconv.ParseInt(chi.URLParam(r, "jobID"), 10, 64) + if err != nil { + jsonError(w, "invalid job ID", http.StatusBadRequest) + return + } + + // Verify job belongs to this run. + var job models.PipelineJob + runID, _ := strconv.ParseInt(chi.URLParam(r, "runID"), 10, 64) + if found, _ := h.db.Where("id = ? AND run_id = ?", jobID, runID).Get(&job); !found { + jsonError(w, "job not found", http.StatusNotFound) + return + } + + var steps []models.PipelineStep + h.db.Where("job_id = ?", jobID).Asc("seq").Find(&steps) + + type stepLogs struct { + models.PipelineStep + Logs []models.PipelineStepLog `json:"logs"` + } + result := make([]stepLogs, len(steps)) + for i, step := range steps { + var logs []models.PipelineStepLog + h.db.Where("step_id = ?", step.ID).Asc("chunk_index").Find(&logs) + if logs == nil { + logs = []models.PipelineStepLog{} + } + result[i] = stepLogs{PipelineStep: step, Logs: logs} + } + jsonOK(w, result) +} + +// CancelRun marks a queued or running run as cancelled. +func (h *PipelineHandler) CancelRun(w http.ResponseWriter, r *http.Request) { + run, ok := h.lookupRun(w, r) + if !ok { + return + } + if run.Status != "queued" && run.Status != "running" { + jsonError(w, "run is not cancellable in its current state", http.StatusConflict) + return + } + now := time.Now().UTC() + run.Status = "cancelled" + run.FinishedAt = &now + if _, err := h.db.ID(run.ID).Cols("status", "finished_at").Update(run); err != nil { + jsonError(w, "could not cancel run", http.StatusInternalServerError) + return + } + // Cancel any queued jobs. + h.db.Where("run_id = ? AND status = 'queued'", run.ID). //nolint:errcheck + Cols("status").Update(&models.PipelineJob{Status: "cancelled"}) + jsonOK(w, run) +} + +// RetryJob re-queues a failed job by resetting its status and re-publishing job.queued. 
+func (h *PipelineHandler) RetryJob(w http.ResponseWriter, r *http.Request) { + run, ok := h.lookupRun(w, r) + if !ok { + return + } + jobID, err := strconv.ParseInt(chi.URLParam(r, "jobID"), 10, 64) + if err != nil { + jsonError(w, "invalid job ID", http.StatusBadRequest) + return + } + var job models.PipelineJob + if found, _ := h.db.Where("id = ? AND run_id = ?", jobID, run.ID).Get(&job); !found { + jsonError(w, "job not found", http.StatusNotFound) + return + } + if job.Status != "failed" && job.Status != "cancelled" { + jsonError(w, "only failed or cancelled jobs can be retried", http.StatusConflict) + return + } + + job.Status = "queued" + job.StartedAt = nil + job.FinishedAt = nil + h.db.ID(job.ID).Cols("status", "started_at", "finished_at").Update(&job) //nolint:errcheck + + // Reset step statuses. + h.db.Where("job_id = ?", job.ID).Cols("status", "exit_code", "started_at", "finished_at"). //nolint:errcheck + Update(&models.PipelineStep{Status: "queued"}) + + // Also reset the run status if it was failed/cancelled. + if run.Status == "failed" || run.Status == "cancelled" { + run.Status = "running" + run.FinishedAt = nil + h.db.ID(run.ID).Cols("status", "finished_at").Update(run) //nolint:errcheck + } + + jsonOK(w, job) +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +func (h *PipelineHandler) repoID(w http.ResponseWriter, r *http.Request) (int64, bool) { + owner := chi.URLParam(r, "owner") + repoName := chi.URLParam(r, "repo") + var u models.User + if found, _ := h.db.Where("username = ?", owner).Get(&u); !found { jsonError(w, "repository not found", http.StatusNotFound) return 0, false } - var repo models.Repository - found, err = h.db.Where("owner_id = ? AND name = ?", owner.ID, repoName).Get(&repo) - if err != nil || !found { + if found, _ := h.db.Where("owner_id = ? 
AND name = ?", u.ID, repoName).Get(&repo); !found { jsonError(w, "repository not found", http.StatusNotFound) return 0, false } return repo.ID, true } + +func (h *PipelineHandler) lookupRun(w http.ResponseWriter, r *http.Request) (*models.PipelineRun, bool) { + repoID, ok := h.repoID(w, r) + if !ok { + return nil, false + } + runID, err := strconv.ParseInt(chi.URLParam(r, "runID"), 10, 64) + if err != nil { + jsonError(w, "invalid run ID", http.StatusBadRequest) + return nil, false + } + var run models.PipelineRun + if found, _ := h.db.Where("id = ? AND repo_id = ?", runID, repoID).Get(&run); !found { + jsonError(w, "run not found", http.StatusNotFound) + return nil, false + } + return &run, true +} diff --git a/internal/api/handlers/runners.go b/internal/api/handlers/runners.go new file mode 100644 index 0000000..bfd9634 --- /dev/null +++ b/internal/api/handlers/runners.go @@ -0,0 +1,94 @@ +package handlers + +import ( + "crypto/rand" + "encoding/base64" + "encoding/json" + "net/http" + + "golang.org/x/crypto/bcrypt" + "xorm.io/xorm" + + "github.com/forgeo/forgebucket/internal/api/middleware" + "github.com/forgeo/forgebucket/internal/models" +) + +type RunnerHandler struct{ db *xorm.Engine } + +func NewRunnerHandler(db *xorm.Engine) *RunnerHandler { return &RunnerHandler{db: db} } + +// List returns all registered runners. Admin-only. +func (h *RunnerHandler) List(w http.ResponseWriter, r *http.Request) { + if !isAdmin(r) { + jsonError(w, "admin access required", http.StatusForbidden) + return + } + var runners []models.Runner + if err := h.db.Find(&runners); err != nil { + jsonError(w, "could not list runners", http.StatusInternalServerError) + return + } + if runners == nil { + runners = []models.Runner{} + } + jsonOK(w, runners) +} + +// Register creates a new runner record and returns the plaintext registration token +// (shown once; the server stores only the bcrypt hash). 
+func (h *RunnerHandler) Register(w http.ResponseWriter, r *http.Request) { + if !isAdmin(r) { + jsonError(w, "admin access required", http.StatusForbidden) + return + } + + var body struct { + Name string `json:"name"` + Labels []string `json:"labels"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + jsonError(w, "invalid request body", http.StatusBadRequest) + return + } + if body.Name == "" { + jsonError(w, "name is required", http.StatusBadRequest) + return + } + + raw := make([]byte, 32) + if _, err := rand.Read(raw); err != nil { + jsonError(w, "could not generate token", http.StatusInternalServerError) + return + } + token := base64.RawURLEncoding.EncodeToString(raw) + + hash, err := bcrypt.GenerateFromPassword([]byte(token), bcrypt.DefaultCost) + if err != nil { + jsonError(w, "could not hash token", http.StatusInternalServerError) + return + } + + labelsJSON, _ := json.Marshal(body.Labels) + runner := &models.Runner{ + Name: body.Name, + Labels: string(labelsJSON), + Status: "idle", + TokenHash: string(hash), + } + if _, err := h.db.Insert(runner); err != nil { + jsonError(w, "runner name already taken", http.StatusConflict) + return + } + + w.WriteHeader(http.StatusCreated) + jsonOK(w, map[string]any{ + "id": runner.ID, + "name": runner.Name, + "token": token, // shown once — store it securely + }) +} + +func isAdmin(r *http.Request) bool { + v, _ := r.Context().Value(middleware.ContextKeyIsAdmin).(bool) + return v +} diff --git a/internal/api/router.go b/internal/api/router.go index 72cdc0b..fa56bee 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -20,7 +20,7 @@ import ( "github.com/forgeo/forgebucket/internal/events" ) -func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus events.EventBus, staticFiles fs.FS) http.Handler { +func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus events.EventBus, artifactRoot string, staticFiles fs.FS) http.Handler { r := chi.NewRouter() 
r.Use(chimiddleware.Logger) @@ -43,7 +43,7 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even prH := handlers.NewPRHandler(engine) pipeH := handlers.NewPipelineHandler(engine) wsH := handlers.NewWSHandler(bus) - gitH := handlers.NewGitHTTPHandler(engine, cfg) + gitH := handlers.NewGitHTTPHandler(engine, cfg, bus) issueH := handlers.NewIssueHandler(engine) sshKeyH := handlers.NewSSHKeyHandler(engine) memberH := handlers.NewMemberHandler(engine) @@ -56,6 +56,8 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even exploreH := handlers.NewExploreHandler(engine) dashH := handlers.NewDashboardHandler(engine) auditH := handlers.NewAuditHandler(engine) + artifactH := handlers.NewArtifactHandler(engine, artifactRoot) + runnerH := handlers.NewRunnerHandler(engine) // ── Git smart-HTTP transport ─────────────────────────────────────────────── // Regex constraint ensures only *.git paths match, so asset/SPA URLs @@ -103,6 +105,11 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even r.Get("/dashboard", dashH.Get) r.Get("/audit", auditH.List) + r.Route("/admin", func(r chi.Router) { + r.Get("/runners", runnerH.List) + r.With(csrf).Post("/runners/register", runnerH.Register) + }) + // SSH key management r.Get("/user/keys", sshKeyH.List) r.With(csrf).Post("/user/keys", sshKeyH.Add) @@ -140,10 +147,21 @@ func New(cfg *config.Config, engine *xorm.Engine, store sessions.Store, bus even r.With(csrf).Post("/{issueNum}/close", issueH.Close) r.With(csrf).Post("/{issueNum}/reopen", issueH.Reopen) }) - r.Route("/pipelines", func(r chi.Router) { - r.Get("/", pipeH.List) - r.Get("/{runID}", pipeH.Get) + r.Get("/pipelines", pipeH.ListPipelines) + r.Route("/runs", func(r chi.Router) { + r.Get("/", pipeH.ListRuns) + r.Route("/{runID}", func(r chi.Router) { + r.Get("/", pipeH.GetRun) + r.With(csrf).Post("/cancel", pipeH.CancelRun) + r.Route("/jobs/{jobID}", func(r chi.Router) { + r.Get("/logs", 
pipeH.GetJobLogs) + r.With(csrf).Post("/retry", pipeH.RetryJob) + }) + r.Get("/artifacts", artifactH.List) + r.With(csrf).Post("/artifacts", artifactH.Upload) + }) }) + r.Get("/artifacts/{artifactID}/download", artifactH.Download) r.Route("/members", func(r chi.Router) { r.Get("/", memberH.List) r.With(csrf).Post("/", memberH.Add) diff --git a/internal/config/config.go b/internal/config/config.go index 945b9f1..313b9a5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "os" + "path/filepath" "strconv" ) @@ -14,7 +15,8 @@ type Config struct { DatabaseURL string // Storage - RepoRoot string + RepoRoot string + ArtifactRoot string // Security SessionSecret string // must be 32 or 64 bytes for AES-GCM @@ -37,10 +39,12 @@ type Config struct { } func Load() (*Config, error) { + repoRoot := getEnv("REPO_ROOT", "/var/lib/forgebucket/repos") cfg := &Config{ - Port: getEnv("PORT", "8080"), - RepoRoot: getEnv("REPO_ROOT", "/var/lib/forgebucket/repos"), - Debug: getEnvBool("DEBUG", false), + Port: getEnv("PORT", "8080"), + RepoRoot: repoRoot, + ArtifactRoot: getEnv("ARTIFACT_ROOT", filepath.Join(filepath.Dir(repoRoot), "artifacts")), + Debug: getEnvBool("DEBUG", false), NATSUrl: getEnv("NATS_URL", ""), InstanceURL: getEnv("INSTANCE_URL", ""), diff --git a/internal/domain/ci/dag.go b/internal/domain/ci/dag.go new file mode 100644 index 0000000..a8a70a0 --- /dev/null +++ b/internal/domain/ci/dag.go @@ -0,0 +1,78 @@ +package ci + +import "fmt" + +// dagNode holds a job name and its resolved dependencies. +type dagNode struct { + name string + needs []string +} + +// TopoSort returns the job names in a valid topological execution order. +// Returns an error if the dependency graph has cycles or references unknown jobs. 
+func TopoSort(jobs map[string]WorkflowJob) ([]string, error) { + nodes := make(map[string]*dagNode, len(jobs)) + for name, job := range jobs { + nodes[name] = &dagNode{name: name, needs: []string(job.Needs)} + } + // Validate all dependencies exist. + for _, node := range nodes { + for _, dep := range node.needs { + if _, ok := nodes[dep]; !ok { + return nil, fmt.Errorf("job %q depends on unknown job %q", node.name, dep) + } + } + } + + var order []string + visited := make(map[string]bool, len(nodes)) + inStack := make(map[string]bool, len(nodes)) + + var visit func(name string) error + visit = func(name string) error { + if inStack[name] { + return fmt.Errorf("cycle detected at job %q", name) + } + if visited[name] { + return nil + } + inStack[name] = true + for _, dep := range nodes[name].needs { + if err := visit(dep); err != nil { + return err + } + } + inStack[name] = false + visited[name] = true + order = append(order, name) + return nil + } + + for name := range nodes { + if err := visit(name); err != nil { + return nil, err + } + } + return order, nil +} + +// ReadyJobs returns the names of jobs whose dependencies are all in completedJobs. 
+func ReadyJobs(jobs map[string]WorkflowJob, completedJobs map[string]bool, enqueuedJobs map[string]bool) []string { + var ready []string + for name, job := range jobs { + if enqueuedJobs[name] { + continue + } + allDone := true + for _, dep := range job.Needs { + if !completedJobs[dep] { + allDone = false + break + } + } + if allDone { + ready = append(ready, name) + } + } + return ready +} diff --git a/internal/domain/ci/executor.go b/internal/domain/ci/executor.go new file mode 100644 index 0000000..4c17a06 --- /dev/null +++ b/internal/domain/ci/executor.go @@ -0,0 +1,271 @@ +package ci + +import ( + "bufio" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "xorm.io/xorm" + + "github.com/forgeo/forgebucket/internal/events" + "github.com/forgeo/forgebucket/internal/models" +) + +// JobContext holds everything needed to execute a single pipeline job. +type JobContext struct { + Run models.PipelineRun + Job models.PipelineJob + Steps []models.PipelineStep + Repo models.Repository +} + +// ExecuteJob runs all steps of a job inside isolated Docker containers, +// streams log output to NATS and the DB, then publishes job.completed or job.failed. +func ExecuteJob(ctx context.Context, db *xorm.Engine, bus events.EventBus, jc JobContext, workspaceRoot string) { + now := time.Now().UTC() + jc.Job.Status = "running" + jc.Job.StartedAt = &now + db.ID(jc.Job.ID).Cols("status", "started_at").Update(&jc.Job) //nolint:errcheck + + // Extract repo snapshot into a workspace directory. + workDir, err := extractWorkspace(jc.Repo.DiskPath, jc.Run.TriggerSHA, workspaceRoot, jc.Run.ID) + if err != nil { + failJob(db, bus, jc, fmt.Sprintf("workspace setup failed: %v", err)) + return + } + defer os.RemoveAll(workDir) + + image := jc.Job.Image + if image == "" { + image = "ubuntu:22.04" + } + + // Pull image once per job (non-fatal if pull fails and image exists locally). 
+ pullCmd := exec.CommandContext(ctx, "docker", "pull", image) + pullCmd.Run() //nolint:errcheck + + for i := range jc.Steps { + step := &jc.Steps[i] + if step.UsesAction == "checkout" { + // Built-in checkout: workspace is already set up by extractWorkspace. + markStep(db, step, "succeeded", 0) + continue + } + if step.RunCmd == "" { + markStep(db, step, "skipped", 0) + continue + } + exitCode, err := runStep(ctx, db, bus, jc.Run.ID, jc.Job.ID, step, image, workDir) + if err != nil || exitCode != 0 { + if exitCode == 0 { + exitCode = 1 + } + markStep(db, step, "failed", exitCode) + failJob(db, bus, jc, fmt.Sprintf("step %q exited %d", step.Name, exitCode)) + return + } + markStep(db, step, "succeeded", 0) + } + + fin := time.Now().UTC() + jc.Job.Status = "succeeded" + jc.Job.FinishedAt = &fin + db.ID(jc.Job.ID).Cols("status", "finished_at").Update(&jc.Job) //nolint:errcheck + bus.Publish(events.SubjectJobCompleted, events.JobEvent{ //nolint:errcheck + RunID: jc.Run.ID, JobID: jc.Job.ID, Status: "succeeded", At: fin, + }) +} + +// runStep runs a single shell-command step inside a Docker container. 
+func runStep(ctx context.Context, db *xorm.Engine, bus events.EventBus,
+	runID, jobID int64, step *models.PipelineStep, image, workDir string) (int, error) {
+
+	now := time.Now().UTC()
+	step.Status = "running"
+	step.StartedAt = &now
+	db.ID(step.ID).Cols("status", "started_at").Update(step) //nolint:errcheck
+
+	cmd := exec.CommandContext(ctx, "docker", "run", "--rm",
+		"-v", workDir+":/workspace",
+		"-w", "/workspace",
+		"--network=none", // no network by default; Phase 2C will add network scopes
+		image,
+		"/bin/sh", "-ec", step.RunCmd,
+	)
+
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return 1, err
+	}
+	cmd.Stderr = cmd.Stdout // merge stderr into stdout
+
+	if err := cmd.Start(); err != nil {
+		return 1, fmt.Errorf("docker run: %w", err)
+	}
+
+	// Read with a fixed-size buffer instead of bufio.Scanner. Scanner stops
+	// at the first line longer than its 64 KiB token limit and leaves the
+	// pipe undrained, which blocks the container on write and makes
+	// cmd.Wait hang. ReadSlice hands back over-long lines in buffer-sized
+	// fragments, so the pipe is always read to EOF and memory stays bounded.
+	chunk := 0
+	reader := bufio.NewReaderSize(stdout, 64*1024)
+	for {
+		fragment, readErr := reader.ReadSlice('\n')
+		if len(fragment) > 0 {
+			text := string(fragment) // copy: the slice is only valid until the next read
+			if readErr != bufio.ErrBufferFull {
+				// Complete line, or the final unterminated one: normalise
+				// the ending to a single "\n", as the Scanner-based code did.
+				text = strings.TrimSuffix(text, "\n")
+				text = strings.TrimSuffix(text, "\r")
+				text += "\n"
+			}
+			writeLogChunk(db, bus, runID, jobID, step.ID, chunk, text)
+			chunk++
+		}
+		if readErr != nil && readErr != bufio.ErrBufferFull {
+			break // EOF or a read failure; either way there is nothing more to read
+		}
+	}
+
+	// Any Wait failure that is not a plain non-zero exit is reported as exit code 1.
+	exitCode := 0
+	if err := cmd.Wait(); err != nil {
+		if exitErr, ok := err.(*exec.ExitError); ok {
+			exitCode = exitErr.ExitCode()
+		} else {
+			exitCode = 1
+		}
+	}
+	return exitCode, nil
+}
+
+// extractWorkspace uses git-archive to export the repo at a given SHA into a
+// temporary directory under workspaceRoot.
+func extractWorkspace(repoPath, sha, workspaceRoot string, runID int64) (string, error) {
+	dir := filepath.Join(workspaceRoot, fmt.Sprintf("run-%d", runID))
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return "", err
+	}
+
+	archive := exec.Command("git", "archive", "--format=tar", sha)
+	archive.Dir = repoPath
+	archive.Env = []string{"GIT_TERMINAL_PROMPT=0", "HOME=/tmp"}
+
+	pipe, err := archive.StdoutPipe()
+	if err != nil {
+		os.RemoveAll(dir)
+		return "", fmt.Errorf("git archive pipe: %w", err)
+	}
+	tar := exec.Command("tar", "-x", "-C", dir)
+	tar.Stdin = pipe
+
+	if err := archive.Start(); err != nil {
+		os.RemoveAll(dir)
+		return "", fmt.Errorf("git archive: %w", err)
+	}
+	if err := tar.Start(); err != nil {
+		archive.Process.Kill() //nolint:errcheck
+		archive.Wait()         //nolint:errcheck // reap the killed process
+		os.RemoveAll(dir)
+		return "", fmt.Errorf("tar: %w", err)
+	}
+	// tar now holds its own copy of the pipe's read end. Close ours so that,
+	// if tar exits early, git archive gets a broken pipe instead of blocking
+	// forever on a pipe that this process still keeps open.
+	pipe.Close() //nolint:errcheck
+
+	archiveErr := archive.Wait()
+	tarErr := tar.Wait()
+	if archiveErr != nil {
+		os.RemoveAll(dir)
+		return "", fmt.Errorf("git archive wait: %w", archiveErr)
+	}
+	if tarErr != nil {
+		os.RemoveAll(dir)
+		return "", fmt.Errorf("tar wait: %w", tarErr)
+	}
+	return dir, nil
+}
+
+// writeLogChunk stores one piece of step output as a PipelineStepLog row and
+// publishes the same content on the pipeline.log subject. Both writes are
+// best-effort: errors are ignored.
+func writeLogChunk(db *xorm.Engine, bus events.EventBus, runID, jobID, stepID int64, idx int, content string) {
+	entry := &models.PipelineStepLog{
+		StepID:     stepID,
+		ChunkIndex: idx,
+		Content:    content,
+	}
+	db.Insert(entry) //nolint:errcheck
+	bus.Publish(events.SubjectPipelineLog, events.LogChunkEvent{ //nolint:errcheck
+		RunID: runID, JobID: jobID, StepID: stepID, ChunkIndex: idx, Content: content,
+	})
+}
+
+// markStep records a step's final status, exit code and finish time, both on
+// the in-memory step and in its database row.
+func markStep(db *xorm.Engine, step *models.PipelineStep, status string, exitCode int) {
+	now := time.Now().UTC()
+	step.Status = status
+	step.ExitCode = exitCode
+	step.FinishedAt = &now
+	db.ID(step.ID).Cols("status", "exit_code", "finished_at").Update(step) //nolint:errcheck
+}
+
+// failJob marks the job as failed, appends the reason to the log of the
+// job's last step (highest seq), and publishes job.failed.
+func failJob(db *xorm.Engine, bus events.EventBus, jc JobContext, reason string) {
+	now := time.Now().UTC()
+	jc.Job.Status = "failed"
+	jc.Job.FinishedAt = &now
+	db.ID(jc.Job.ID).Cols("status", "finished_at").Update(&jc.Job) //nolint:errcheck
+
+	// Write the failure reason as a synthetic log line.
+	var lastStep models.PipelineStep
+	if found, _ := db.Where("job_id = ?", jc.Job.ID).Desc("seq").Get(&lastStep); found {
+		// Use the next free chunk index so the synthetic line does not
+		// collide with chunk 0 of the step's real output. If the count
+		// query fails, the index falls back to 0.
+		existing, _ := db.Where("step_id = ?", lastStep.ID).Count(new(models.PipelineStepLog))
+		writeLogChunk(db, bus, jc.Run.ID, jc.Job.ID, lastStep.ID, int(existing),
+			"\n[ForgeBucket] Job failed: "+reason+"\n")
+	}
+
+	bus.Publish(events.SubjectJobFailed, events.JobEvent{ //nolint:errcheck
+		RunID: jc.Run.ID, JobID: jc.Job.ID, Status: "failed", At: now,
+	})
+}
+
+// workspaceDir returns the scratch directory root for CI job workspaces.
+// It is a "ci-workspaces" directory placed next to artifactRoot.
+func workspaceDir(artifactRoot string) string {
+	return filepath.Join(filepath.Dir(artifactRoot), "ci-workspaces")
+}
+
+// IsDockerAvailable checks whether the docker CLI is reachable.
+func IsDockerAvailable() bool {
+	cmd := exec.Command("docker", "info")
+	// Inherit the process environment so the probe reaches the same daemon
+	// as the later "docker run" calls (DOCKER_HOST, DOCKER_CONFIG, PATH).
+	// HOME is still forced to /tmp; os/exec uses the last value of a
+	// duplicated key.
+	cmd.Env = append(os.Environ(), "HOME=/tmp")
+	return cmd.Run() == nil
+}
+
+// stepsForJob loads PipelineStep rows for a job ordered by seq.
+func stepsForJob(db *xorm.Engine, jobID int64) ([]models.PipelineStep, error) {
+	var steps []models.PipelineStep
+	err := db.Where("job_id = ?", jobID).Asc("seq").Find(&steps)
+	return steps, err
+}
+
+// repoForRun loads the Repository for a given run.
+// The boolean is false when either the run or its repository is not found;
+// lookup errors are treated the same as "not found".
+func repoForRun(db *xorm.Engine, runID int64) (models.Repository, models.PipelineRun, bool) {
+	var run models.PipelineRun
+	if found, _ := db.ID(runID).Get(&run); !found {
+		return models.Repository{}, run, false
+	}
+	var repo models.Repository
+	if found, _ := db.ID(run.RepoID).Get(&repo); !found {
+		return models.Repository{}, run, false
+	}
+	return repo, run, true
+}
+
+// buildJobContext assembles a JobContext from DB rows.
+func buildJobContext(db *xorm.Engine, jobID int64) (JobContext, bool) {
+	// Each lookup failure (missing row or query error) yields an empty
+	// context and false.
+	var job models.PipelineJob
+	if found, _ := db.ID(jobID).Get(&job); !found {
+		return JobContext{}, false
+	}
+	repo, run, ok := repoForRun(db, job.RunID)
+	if !ok {
+		return JobContext{}, false
+	}
+	steps, err := stepsForJob(db, jobID)
+	if err != nil {
+		return JobContext{}, false
+	}
+	return JobContext{Run: run, Job: job, Steps: steps, Repo: repo}, true
+}
+
+// pipeForRun is an unused Phase 2B placeholder reserved for future label
+// matching. It ignores its argument and always returns "".
+func pipeForRun(_ string) string { return "" }
+
+// sanitizeImage prevents injection in docker image names.
+// It drops every character outside [A-Za-z0-9._/:@-] and then strips any
+// leading '-', so the result can never be read by the docker CLI as a flag.
+// The result may be empty.
+func sanitizeImage(image string) string {
+	// Allow only characters valid in Docker image references.
+	var b strings.Builder
+	for _, c := range image {
+		if (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
+			(c >= '0' && c <= '9') || c == '.' || c == '-' || c == '_' ||
+			c == '/' || c == ':' || c == '@' {
+			b.WriteRune(c)
+		}
+	}
+	// The image is passed to docker as a positional argument; a value such
+	// as "--privileged" would otherwise be parsed as an option.
+	return strings.TrimLeft(b.String(), "-")
+}
diff --git a/internal/domain/ci/orchestrator.go b/internal/domain/ci/orchestrator.go
new file mode 100644
index 0000000..24505c3
--- /dev/null
+++ b/internal/domain/ci/orchestrator.go
@@ -0,0 +1,292 @@
+package ci
+
+import (
+	"context"
+	"encoding/json"
+	"log"
+	"strings"
+	"time"
+
+	"xorm.io/xorm"
+
+	"github.com/forgeo/forgebucket/internal/events"
+	"github.com/forgeo/forgebucket/internal/models"
+)
+
+// Orchestrator listens for push events, creates pipeline run records, and
+// advances the DAG as jobs complete. It does NOT execute jobs directly —
+// that is the RunnerManager's responsibility.
+type Orchestrator struct {
+	db  *xorm.Engine
+	bus events.EventBus
+}
+
+// NewOrchestrator returns an Orchestrator that uses db for persistence and
+// bus for event subscription and publication.
+func NewOrchestrator(db *xorm.Engine, bus events.EventBus) *Orchestrator {
+	return &Orchestrator{db: db, bus: bus}
+}
+
+// Start subscribes to relevant NATS subjects and blocks until ctx is cancelled.
+func (o *Orchestrator) Start(ctx context.Context) {
+	o.recoverStaleRuns()
+
+	// Each handler only decodes the payload and hands the work to a new
+	// goroutine. A failed subscription is logged and the others still run.
+	unsub1, err := o.bus.Subscribe(events.SubjectPushReceived, func(_ string, data []byte) {
+		var evt events.PushEvent
+		if err := json.Unmarshal(data, &evt); err != nil {
+			log.Printf("orchestrator: bad push event: %v", err)
+			return
+		}
+		go o.handlePush(evt)
+	})
+	if err != nil {
+		log.Printf("orchestrator: subscribe push.received: %v", err)
+	} else {
+		defer unsub1()
+	}
+
+	unsub2, err := o.bus.Subscribe(events.SubjectJobCompleted, func(_ string, data []byte) {
+		var evt events.JobEvent
+		if err := json.Unmarshal(data, &evt); err != nil {
+			return
+		}
+		go o.advanceDAG(evt.RunID, evt.JobID, "succeeded")
+	})
+	if err != nil {
+		log.Printf("orchestrator: subscribe job.completed: %v", err)
+	} else {
+		defer unsub2()
+	}
+
+	unsub3, err := o.bus.Subscribe(events.SubjectJobFailed, func(_ string, data []byte) {
+		var evt events.JobEvent
+		if err := json.Unmarshal(data, &evt); err != nil {
+			return
+		}
+		go o.advanceDAG(evt.RunID, evt.JobID, "failed")
+	})
+	if err != nil {
+		log.Printf("orchestrator: subscribe job.failed: %v", err)
+	} else {
+		defer unsub3()
+	}
+
+	<-ctx.Done()
+}
+
+// handlePush is called for every successful git push. It finds matching workflow
+// files, creates run records, and enqueues the first wave of jobs.
+func (o *Orchestrator) handlePush(evt events.PushEvent) {
+	// Ignore branch deletions (new SHA = all zeros).
+	if evt.After == "" || strings.Repeat("0", len(evt.After)) == evt.After {
+		return
+	}
+
+	var repo models.Repository
+	if found, _ := o.db.ID(evt.RepoID).Get(&repo); !found {
+		return
+	}
+
+	workflowPaths, err := ListWorkflows(repo.DiskPath, evt.After)
+	if err != nil || len(workflowPaths) == 0 {
+		return
+	}
+
+	// A workflow that fails to parse or to start is logged and skipped; the
+	// remaining workflows are still processed.
+	for _, path := range workflowPaths {
+		wf, err := ParseWorkflow(repo.DiskPath, evt.After, path)
+		if err != nil {
+			log.Printf("orchestrator: parse workflow %s: %v", path, err)
+			continue
+		}
+		if !MatchesPushTrigger(wf, evt.Ref) {
+			continue
+		}
+		if err := o.createRun(repo, evt, wf, path); err != nil {
+			log.Printf("orchestrator: create run for %s: %v", path, err)
+		}
+	}
+}
+
+// createRun validates the workflow's job graph, upserts the Pipeline row,
+// inserts the run with all of its jobs and steps, publishes the jobs that
+// have no unmet dependencies, and emits a pipeline-triggered event.
+// The inserts are not wrapped in a transaction, so an error part-way through
+// leaves the rows written so far in place.
+func (o *Orchestrator) createRun(repo models.Repository, evt events.PushEvent, wf *WorkflowFile, filePath string) error {
+	// Validate DAG before writing anything.
+	if _, err := TopoSort(wf.Jobs); err != nil {
+		return err
+	}
+
+	// Upsert the Pipeline definition record.
+	pipeline := &models.Pipeline{RepoID: repo.ID, FilePath: filePath}
+	has, err := o.db.Where("repo_id = ? AND file_path = ?", repo.ID, filePath).Get(pipeline)
+	if err != nil {
+		// Treating a failed lookup as "not found" would insert a duplicate
+		// pipeline row.
+		return err
+	}
+	pipeline.Name = wf.Name
+	if pipeline.Name == "" {
+		pipeline.Name = filePath
+	}
+	if has {
+		o.db.ID(pipeline.ID).Cols("name").Update(pipeline) //nolint:errcheck
+	} else {
+		if _, err := o.db.Insert(pipeline); err != nil {
+			return err
+		}
+	}
+
+	now := time.Now().UTC()
+	run := &models.PipelineRun{
+		PipelineID:  pipeline.ID,
+		RepoID:      repo.ID,
+		TriggerRef:  evt.Ref,
+		TriggerSHA:  evt.After,
+		TriggeredBy: evt.Pusher,
+		Status:      "queued",
+		StartedAt:   &now,
+	}
+	if _, err := o.db.Insert(run); err != nil {
+		return err
+	}
+
+	// Create job + step records for every job in the workflow.
+	for jobName, wfJob := range wf.Jobs {
+		needsJSON, _ := json.Marshal([]string(wfJob.Needs))
+		job := &models.PipelineJob{
+			RunID:  run.ID,
+			Name:   jobName,
+			Image:  wfJob.RunsOn,
+			Needs:  string(needsJSON),
+			Status: "queued",
+		}
+		if _, err := o.db.Insert(job); err != nil {
+			return err
+		}
+		for seq, step := range wfJob.Steps {
+			s := &models.PipelineStep{
+				JobID:      job.ID,
+				Seq:        seq,
+				Name:       step.Name,
+				RunCmd:     step.Run,
+				UsesAction: step.Uses,
+				Status:     "queued",
+			}
+			if _, err := o.db.Insert(s); err != nil {
+				return err
+			}
+		}
+	}
+
+	// Enqueue jobs with no dependencies.
+	o.enqueueReadyJobs(run.ID, wf.Jobs)
+
+	o.bus.Publish(events.SubjectPipelineTriggered, events.PipelineEvent{ //nolint:errcheck
+		RunID:  run.ID,
+		RepoID: repo.ID,
+		Status: "queued",
+		At:     now,
+	})
+
+	// Abbreviate the SHA for the log line without slicing past its end.
+	shortSHA := evt.After
+	if len(shortSHA) > 7 {
+		shortSHA = shortSHA[:7]
+	}
+	log.Printf("orchestrator: created run %d for %s/%s (%s)", run.ID, repo.Name, filePath, shortSHA)
+	return nil
+}
+
+// advanceDAG is called when a job finishes. It marks the job, checks whether
+// all jobs are done (completing the run) or enqueues the next wave.
+func (o *Orchestrator) advanceDAG(runID, jobID int64, result string) {
+	var job models.PipelineJob
+	if found, _ := o.db.ID(jobID).Get(&job); !found {
+		return
+	}
+	now := time.Now().UTC()
+	job.Status = result
+	job.FinishedAt = &now
+	o.db.ID(job.ID).Cols("status", "finished_at").Update(&job) //nolint:errcheck
+
+	var run models.PipelineRun
+	if found, _ := o.db.ID(runID).Get(&run); !found {
+		return
+	}
+
+	// Load all jobs for this run to check completion. An empty list caused
+	// by a failed query must not be read as "every job is done".
+	var allJobs []models.PipelineJob
+	if err := o.db.Where("run_id = ?", runID).Find(&allJobs); err != nil {
+		log.Printf("orchestrator: load jobs for run %d: %v", runID, err)
+		return
+	}
+
+	// If any job failed, cancel remaining queued jobs and fail the run.
+	if result == "failed" {
+		for _, j := range allJobs {
+			if j.Status == "queued" {
+				j.Status = "skipped"
+				o.db.ID(j.ID).Cols("status").Update(&j) //nolint:errcheck
+			}
+		}
+		run.Status = "failed"
+		run.FinishedAt = &now
+		o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck
+		o.bus.Publish(events.SubjectPipelineFailed, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "failed", At: now}) //nolint:errcheck
+		return
+	}
+
+	// Check if all jobs are done.
+	allDone := true
+	for _, j := range allJobs {
+		switch j.Status {
+		case "failed":
+			// A sibling job failed. Its own job.failed event fails the run,
+			// so this success must neither mark the run succeeded nor
+			// enqueue further jobs.
+			return
+		case "succeeded", "skipped", "cancelled":
+			// terminal, nothing to do
+		default:
+			allDone = false
+		}
+	}
+	if allDone {
+		run.Status = "succeeded"
+		run.FinishedAt = &now
+		o.db.ID(run.ID).Cols("status", "finished_at").Update(&run) //nolint:errcheck
+		o.bus.Publish(events.SubjectPipelineCompleted, events.PipelineEvent{RunID: run.ID, RepoID: run.RepoID, Status: "succeeded", At: now}) //nolint:errcheck
+		return
+	}
+
+	// Reload the workflow to get the job dependency graph, then enqueue next wave.
+	var pipeline models.Pipeline
+	if found, _ := o.db.ID(run.PipelineID).Get(&pipeline); !found {
+		return
+	}
+	var repo models.Repository
+	if found, _ := o.db.ID(run.RepoID).Get(&repo); !found {
+		return
+	}
+	wf, err := ParseWorkflow(repo.DiskPath, run.TriggerSHA, pipeline.FilePath)
+	if err != nil {
+		return
+	}
+	o.enqueueReadyJobs(runID, wf.Jobs)
+}
+
+// enqueueReadyJobs publishes a job.queued event for every job of the run
+// that is still "queued" in the database and whose dependencies have all
+// succeeded.
+//
+// NOTE(review): a job that was published earlier but has not yet been set to
+// "running" still reads as "queued" here and is published again on the next
+// call. Confirm whether the runner tolerates duplicates; a dedicated
+// "dispatched" status would close the gap.
+func (o *Orchestrator) enqueueReadyJobs(runID int64, wfJobs map[string]WorkflowJob) {
+	var dbJobs []models.PipelineJob
+	if err := o.db.Where("run_id = ?", runID).Find(&dbJobs); err != nil {
+		log.Printf("orchestrator: load jobs for run %d: %v", runID, err)
+		return
+	}
+
+	completedNames := make(map[string]bool)
+	enqueuedNames := make(map[string]bool)
+	for _, j := range dbJobs {
+		if j.Status == "succeeded" {
+			completedNames[j.Name] = true
+		}
+		if j.Status == "running" || j.Status == "succeeded" {
+			enqueuedNames[j.Name] = true
+		}
+	}
+
+	readyNames := ReadyJobs(wfJobs, completedNames, enqueuedNames)
+	for _, name := range readyNames {
+		for _, j := range dbJobs {
+			if j.Name == name && j.Status == "queued" {
+				o.bus.Publish(events.SubjectJobQueued, events.JobEvent{ //nolint:errcheck
+					RunID: runID,
+					JobID: j.ID,
+				})
+				break
+			}
+		}
+	}
+}
+
+// recoverStaleRuns marks any jobs/runs left in "running" state as failed
+// (they were interrupted by a previous server crash).
+func (o *Orchestrator) recoverStaleRuns() {
+	now := time.Now().UTC()
+	o.db.Where("status = 'running'").Cols("status", "finished_at").
+		Update(&models.PipelineRun{Status: "failed", FinishedAt: &now}) //nolint:errcheck
+	o.db.Where("status = 'running'").Cols("status", "finished_at").
+		Update(&models.PipelineJob{Status: "failed", FinishedAt: &now}) //nolint:errcheck
+	o.db.Where("status = 'running'").Cols("status", "finished_at").
+		Update(&models.PipelineStep{Status: "failed", FinishedAt: &now}) //nolint:errcheck
+}
diff --git a/internal/domain/ci/parser.go b/internal/domain/ci/parser.go
new file mode 100644
index 0000000..2485e37
--- /dev/null
+++ b/internal/domain/ci/parser.go
@@ -0,0 +1,79 @@
+package ci
+
+import (
+	"fmt"
+	"strings"
+
+	gitdomain "github.com/forgeo/forgebucket/internal/domain/git"
+	"gopkg.in/yaml.v3"
+)
+
+const workflowDir = ".forgebucket/workflows"
+
+// ListWorkflows returns the file paths of all workflow YAML files in a repo at a
+// given ref. Returns nil (no error) when the workflows directory doesn't exist.
+// Every error from the tree listing is treated as "directory missing", so the
+// returned error is currently always nil.
+func ListWorkflows(repoPath, ref string) ([]string, error) {
+	entries, err := gitdomain.TreeLS(repoPath, ref, workflowDir)
+	if err != nil {
+		// Directory does not exist at this ref — no workflows, not an error.
+		return nil, nil
+	}
+	var paths []string
+	for _, e := range entries {
+		if e.Type == "blob" && (strings.HasSuffix(e.Name, ".yml") || strings.HasSuffix(e.Name, ".yaml")) {
+			paths = append(paths, workflowDir+"/"+e.Name)
+		}
+	}
+	return paths, nil
+}
+
+// ParseWorkflow reads and parses a single workflow YAML file from the repo at ref.
+func ParseWorkflow(repoPath, ref, filePath string) (*WorkflowFile, error) {
+	data, err := gitdomain.BlobCat(repoPath, ref, filePath)
+	if err != nil {
+		return nil, fmt.Errorf("read %s: %w", filePath, err)
+	}
+	var wf WorkflowFile
+	if err := yaml.Unmarshal(data, &wf); err != nil {
+		return nil, fmt.Errorf("parse %s: %w", filePath, err)
+	}
+	return &wf, nil
+}
+
+// MatchesPushTrigger reports whether a workflow should run for a push to ref.
+// ref is the full ref name, e.g. "refs/heads/main".
+//
+// Branch patterns are tested only against refs/heads/* refs and tag patterns
+// only against refs/tags/* refs. A ref without a "refs/" prefix is tested
+// against both lists.
+func MatchesPushTrigger(wf *WorkflowFile, ref string) bool {
+	if wf.On.Push == nil {
+		return false
+	}
+	trigger := wf.On.Push
+	// No branch filter means "all branches".
+	if len(trigger.Branches) == 0 && len(trigger.Tags) == 0 {
+		return true
+	}
+
+	// Classify the ref first. Trimming a prefix that is absent leaves the ref
+	// unchanged, so without this check a "*" branch pattern would also match
+	// "refs/tags/v1" (and vice versa).
+	isBranch := strings.HasPrefix(ref, "refs/heads/")
+	isTag := strings.HasPrefix(ref, "refs/tags/")
+	if !strings.HasPrefix(ref, "refs/") {
+		isBranch, isTag = true, true
+	}
+
+	if isBranch {
+		branch := strings.TrimPrefix(ref, "refs/heads/")
+		for _, pattern := range trigger.Branches {
+			if matchGlob(pattern, branch) {
+				return true
+			}
+		}
+	}
+	if isTag {
+		tag := strings.TrimPrefix(ref, "refs/tags/")
+		for _, pattern := range trigger.Tags {
+			if matchGlob(pattern, tag) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// matchGlob supports simple "*" wildcards (not full glob).
+// Only the first "*" is a wildcard; any later "*" is matched literally as
+// part of the suffix.
+func matchGlob(pattern, s string) bool {
+	if pattern == "*" {
+		return true
+	}
+	if !strings.Contains(pattern, "*") {
+		return pattern == s
+	}
+	parts := strings.SplitN(pattern, "*", 2)
+	// The length check stops prefix and suffix from overlapping: "a*a" must
+	// not match "a".
+	return len(s) >= len(parts[0])+len(parts[1]) &&
+		strings.HasPrefix(s, parts[0]) && strings.HasSuffix(s, parts[1])
+}
diff --git a/internal/domain/ci/runner_manager.go b/internal/domain/ci/runner_manager.go
new file mode 100644
index 0000000..9fd91ce
--- /dev/null
+++ b/internal/domain/ci/runner_manager.go
@@ -0,0 +1,86 @@
+package ci
+
+import (
+	"context"
+	"encoding/json"
+	"log"
+
+	"xorm.io/xorm"
+
+	"github.com/forgeo/forgebucket/internal/config"
+	"github.com/forgeo/forgebucket/internal/events"
+)
+
+// RunnerManager subscribes to job.queued events and dispatches them to the
+// local Docker executor. A semaphore limits concurrent executions.
+type RunnerManager struct {
+	db  *xorm.Engine
+	bus events.EventBus
+	cfg *config.Config
+	sem chan struct{}
+}
+
+// NewRunnerManager builds a RunnerManager that runs at most maxConcurrent
+// jobs at once. A value of zero or less selects the default of 4.
+func NewRunnerManager(db *xorm.Engine, bus events.EventBus, cfg *config.Config, maxConcurrent int) *RunnerManager {
+	if maxConcurrent <= 0 {
+		maxConcurrent = 4
+	}
+	return &RunnerManager{
+		db:  db,
+		bus: bus,
+		cfg: cfg,
+		sem: make(chan struct{}, maxConcurrent),
+	}
+}
+
+// Start subscribes to job.queued and dispatches executions until ctx is cancelled.
+func (m *RunnerManager) Start(ctx context.Context) {
+	if !IsDockerAvailable() {
+		log.Printf("runner: Docker not available — CI execution disabled")
+		<-ctx.Done()
+		return
+	}
+	log.Printf("runner: started (max concurrent jobs: %d)", cap(m.sem))
+
+	wsDir := workspaceDir(m.cfg.ArtifactRoot)
+
+	unsub, err := m.bus.Subscribe(events.SubjectJobQueued, func(_ string, data []byte) {
+		// Do not pick up new work once shutdown has begun.
+		if ctx.Err() != nil {
+			return
+		}
+
+		var evt events.JobEvent
+		if err := json.Unmarshal(data, &evt); err != nil {
+			log.Printf("runner: bad job.queued payload: %v", err)
+			return
+		}
+
+		jc, ok := buildJobContext(m.db, evt.JobID)
+		if !ok {
+			log.Printf("runner: could not build job context for job %d", evt.JobID)
+			return
+		}
+
+		// Acquire semaphore slot — blocks if at capacity.
+		select {
+		case m.sem <- struct{}{}:
+		case <-ctx.Done():
+			return
+		}
+		// select chooses at random when both cases are ready, so a slot may
+		// have been taken after cancellation. Give it back instead of
+		// starting a job under a cancelled context.
+		if ctx.Err() != nil {
+			<-m.sem
+			return
+		}
+
+		go func() {
+			defer func() { <-m.sem }()
+			// Sanitize the Docker image name before execution.
+			jc.Job.Image = sanitizeImage(jc.Job.Image)
+			ExecuteJob(ctx, m.db, m.bus, jc, wsDir)
+		}()
+	})
+	if err != nil {
+		log.Printf("runner: subscribe job.queued: %v", err)
+		<-ctx.Done()
+		return
+	}
+
+	<-ctx.Done()
+	// Unsubscribe before draining so late callbacks cannot compete with the
+	// drain loop for semaphore slots.
+	unsub()
+	log.Printf("runner: stopping — draining %d active jobs", len(m.sem))
+	// Wait for all running jobs to finish by filling the semaphore.
+	for i := 0; i < cap(m.sem); i++ {
+		m.sem <- struct{}{}
+	}
+}
diff --git a/internal/domain/ci/types.go b/internal/domain/ci/types.go
new file mode 100644
index 0000000..0373a59
--- /dev/null
+++ b/internal/domain/ci/types.go
@@ -0,0 +1,58 @@
+package ci
+
+import "gopkg.in/yaml.v3"
+
+// WorkflowFile is the parsed representation of a .forgebucket/workflows/*.yml file.
+type WorkflowFile struct {
+	Name string                 `yaml:"name"`
+	On   WorkflowTrigger        `yaml:"on"`
+	Jobs map[string]WorkflowJob `yaml:"jobs"` // keyed by job name
+}
+
+// WorkflowTrigger is the "on:" section of a workflow. A nil pointer means the
+// corresponding key was absent from the YAML.
+type WorkflowTrigger struct {
+	Push        *PushTrigger `yaml:"push"`
+	PullRequest *PRTrigger   `yaml:"pull_request"`
+}
+
+// PushTrigger lists the branch and tag patterns of an "on.push" section.
+type PushTrigger struct {
+	Branches []string `yaml:"branches"`
+	Tags     []string `yaml:"tags"`
+}
+
+// PRTrigger lists the branch patterns of an "on.pull_request" section.
+type PRTrigger struct {
+	Branches []string `yaml:"branches"`
+}
+
+// WorkflowJob is one entry of the workflow's "jobs" map.
+type WorkflowJob struct {
+	Name   string         `yaml:"name"`
+	RunsOn string         `yaml:"runs-on"`
+	Needs  StringOrSlice  `yaml:"needs"` // names of jobs this job depends on
+	Steps  []WorkflowStep `yaml:"steps"`
+}
+
+// WorkflowStep is one entry of a job's "steps" list.
+type WorkflowStep struct {
+	Name string            `yaml:"name"`
+	Uses string            `yaml:"uses"`
+	Run  string            `yaml:"run"`
+	Env  map[string]string `yaml:"env"`
+}
+
+// StringOrSlice unmarshals a YAML value that may be either a single string
+// ("needs: test") or a list ("needs: [test, build]").
+type StringOrSlice []string
+
+// UnmarshalYAML implements the scalar-or-sequence decoding. An empty scalar
+// and any other node kind leave the receiver unchanged and return nil.
+func (s *StringOrSlice) UnmarshalYAML(value *yaml.Node) error {
+	switch value.Kind {
+	case yaml.ScalarNode:
+		if value.Value != "" {
+			*s = []string{value.Value}
+		}
+	case yaml.SequenceNode:
+		var items []string
+		if err := value.Decode(&items); err != nil {
+			return err
+		}
+		*s = items
+	}
+	return nil
+}
diff --git a/internal/events/subjects.go b/internal/events/subjects.go
index 5d326cd..7055751 100644
--- a/internal/events/subjects.go
+++ b/internal/events/subjects.go
@@ -33,6 +33,7 @@ const (
 	SubjectJobCompleted = "job.completed"
 	SubjectJobFailed = "job.failed"
 	SubjectArtifactPublished = "artifact.published"
+	SubjectPipelineLog = "pipeline.log"
 
 	// Deployments (Phase 3A)
 	SubjectDeploymentStarted = "deployment.started"
diff --git a/internal/events/types.go b/internal/events/types.go
index 921e93f..77260ad 100644
--- a/internal/events/types.go
+++ b/internal/events/types.go
@@ -57,6 +57,28 @@ type AuditEvent struct {
 	At time.Time `json:"at"`
 }
 
+// PipelineEvent is the payload for run-level events.
+type PipelineEvent struct {
+	RunID  int64     `json:"runId"`
+	RepoID int64     `json:"repoId"`
+	Status string    `json:"status"`
+	At     time.Time `json:"at"`
+}
+
+// JobEvent is the payload for job-level events.
+type JobEvent struct {
+	RunID  int64     `json:"runId"`
+	JobID  int64     `json:"jobId"`
+	Status string    `json:"status"`
+	At     time.Time `json:"at"`
+}
+
+// LogChunkEvent carries one chunk of step log output.
+type LogChunkEvent struct {
+	RunID      int64  `json:"runId"`
+	JobID      int64  `json:"jobId"`
+	StepID     int64  `json:"stepId"`
+	ChunkIndex int    `json:"chunkIndex"`
+	Content    string `json:"content"`
+}
+
 // WSEnvelope wraps any event for delivery over the WebSocket connection.
 type WSEnvelope struct {
 	Subject string `json:"subject"`
diff --git a/internal/models/migrations/001_init.go b/internal/models/migrations/001_init.go
index 4816613..92e01bb 100644
--- a/internal/models/migrations/001_init.go
+++ b/internal/models/migrations/001_init.go
@@ -34,5 +34,8 @@ func Run(engine *xorm.Engine) error {
 	if err := Run007(engine); err != nil {
 		return err
 	}
-	return Run008(engine)
+	if err := Run008(engine); err != nil {
+		return err
+	}
+	return Run009(engine)
 }
diff --git a/internal/models/migrations/009_ci.go b/internal/models/migrations/009_ci.go
new file mode 100644
index 0000000..a2d64a0
--- /dev/null
+++ b/internal/models/migrations/009_ci.go
@@ -0,0 +1,18 @@
+package migrations
+
+import (
+	"github.com/forgeo/forgebucket/internal/models"
+	"xorm.io/xorm"
+)
+
+// Run009 syncs the schema for the CI models listed below.
+func Run009(engine *xorm.Engine) error {
+	return engine.Sync2(
+		&models.Pipeline{},
+		&models.PipelineRun{},
+		&models.PipelineJob{},
+		&models.PipelineStep{},
+		&models.PipelineStepLog{},
+		&models.Runner{},
+		&models.Artifact{},
+	)
+}
diff --git a/internal/models/pipeline.go b/internal/models/pipeline.go
new file mode 100644
index 0000000..e4a9e27
--- /dev/null
+++ b/internal/models/pipeline.go
@@ -0,0 +1,86 @@
+package models
+
+import "time"
+
+// Pipeline represents a workflow definition file stored in the repository.
+type Pipeline struct {
+	ID        int64     `xorm:"'id' pk autoincr" json:"id"`
+	RepoID    int64     `xorm:"'repo_id' notnull index" json:"repoId"`
+	Name      string    `xorm:"'name' varchar(255)" json:"name"`
+	FilePath  string    `xorm:"'file_path' varchar(500)" json:"filePath"` // path of the workflow file inside the repo
+	CreatedAt time.Time `xorm:"'created_at' created" json:"createdAt"`
+	UpdatedAt time.Time `xorm:"'updated_at' updated" json:"updatedAt"`
+}
+
+// PipelineRun is a single execution of a Pipeline.
+type PipelineRun struct {
+	ID          int64      `xorm:"'id' pk autoincr" json:"id"`
+	PipelineID  int64      `xorm:"'pipeline_id' notnull index" json:"pipelineId"`
+	RepoID      int64      `xorm:"'repo_id' notnull index" json:"repoId"`
+	TriggerRef  string     `xorm:"'trigger_ref' varchar(255)" json:"triggerRef"` // refs/heads/main
+	TriggerSHA  string     `xorm:"'trigger_sha' varchar(40)" json:"triggerSha"`
+	TriggeredBy string     `xorm:"'triggered_by' varchar(64)" json:"triggeredBy"`
+	Status      string     `xorm:"'status' varchar(20)" json:"status"` // queued/running/succeeded/failed/cancelled
+	StartedAt   *time.Time `xorm:"'started_at'" json:"startedAt"`
+	FinishedAt  *time.Time `xorm:"'finished_at'" json:"finishedAt"`
+	CreatedAt   time.Time  `xorm:"'created_at' created" json:"createdAt"`
+}
+
+// PipelineJob is a single node in the DAG for a PipelineRun.
+type PipelineJob struct {
+	ID         int64      `xorm:"'id' pk autoincr" json:"id"`
+	RunID      int64      `xorm:"'run_id' notnull index" json:"runId"`
+	Name       string     `xorm:"'name' varchar(255)" json:"name"`
+	Image      string     `xorm:"'image' varchar(500)" json:"image"` // Docker image
+	Needs      string     `xorm:"'needs' text" json:"needs"` // JSON array of dependency job names
+	Status     string     `xorm:"'status' varchar(20)" json:"status"`
+	StartedAt  *time.Time `xorm:"'started_at'" json:"startedAt"`
+	FinishedAt *time.Time `xorm:"'finished_at'" json:"finishedAt"`
+	CreatedAt  time.Time  `xorm:"'created_at' created" json:"createdAt"`
+}
+
+// PipelineStep is a single command within a PipelineJob.
+type PipelineStep struct {
+	ID         int64      `xorm:"'id' pk autoincr" json:"id"`
+	JobID      int64      `xorm:"'job_id' notnull index" json:"jobId"`
+	Seq        int        `xorm:"'seq'" json:"seq"` // execution order within the job
+	Name       string     `xorm:"'name' varchar(255)" json:"name"`
+	RunCmd     string     `xorm:"'run_cmd' text" json:"runCmd"` // shell command (run:)
+	UsesAction string     `xorm:"'uses_action' varchar(255)" json:"usesAction"` // built-in action (uses:)
+	Status     string     `xorm:"'status' varchar(20)" json:"status"`
+	ExitCode   int        `xorm:"'exit_code'" json:"exitCode"`
+	StartedAt  *time.Time `xorm:"'started_at'" json:"startedAt"`
+	FinishedAt *time.Time `xorm:"'finished_at'" json:"finishedAt"`
+}
+
+// PipelineStepLog stores append-only log output for a step.
+type PipelineStepLog struct {
+	ID         int64     `xorm:"'id' pk autoincr" json:"id"`
+	StepID     int64     `xorm:"'step_id' notnull index" json:"stepId"`
+	ChunkIndex int       `xorm:"'chunk_index'" json:"chunkIndex"` // position of this chunk within the step's output
+	Content    string    `xorm:"'content' text" json:"content"`
+	CreatedAt  time.Time `xorm:"'created_at' created" json:"createdAt"`
+}
+
+// Runner is a registered execution backend.
+type Runner struct {
+	ID         int64     `xorm:"'id' pk autoincr" json:"id"`
+	Name       string    `xorm:"'name' unique varchar(100)" json:"name"`
+	Labels     string    `xorm:"'labels' text" json:"labels"` // JSON array of capability labels
+	Status     string    `xorm:"'status' varchar(20)" json:"status"` // idle/busy/offline
+	TokenHash  string    `xorm:"'token_hash' varchar(64)" json:"-"` // never serialized to JSON
+	LastSeenAt time.Time `xorm:"'last_seen_at'" json:"lastSeenAt"`
+	CreatedAt  time.Time `xorm:"'created_at' created" json:"createdAt"`
+}
+
+// Artifact is a file produced by a PipelineRun and stored for later download.
+type Artifact struct {
+	ID          int64     `xorm:"'id' pk autoincr" json:"id"`
+	RunID       int64     `xorm:"'run_id' notnull index" json:"runId"`
+	RepoID      int64     `xorm:"'repo_id' notnull index" json:"repoId"`
+	Name        string    `xorm:"'name' varchar(255)" json:"name"`
+	StoragePath string    `xorm:"'storage_path' varchar(500)" json:"-"` // never serialized to JSON
+	Size        int64     `xorm:"'size'" json:"size"` // presumably bytes — TODO confirm against the upload handler
+	ContentType string    `xorm:"'content_type' varchar(100)" json:"contentType"`
+	CreatedAt   time.Time `xorm:"'created_at' created" json:"createdAt"`
+}