Files
ForgeBucket/internal/api/handlers/pipelines.go
T

282 lines
8.1 KiB
Go

package handlers
import (
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"xorm.io/xorm"
"github.com/forgeo/forgebucket/internal/api/middleware"
"github.com/forgeo/forgebucket/internal/models"
)
type PipelineHandler struct {
db *xorm.Engine
}
func NewPipelineHandler(db *xorm.Engine) *PipelineHandler {
return &PipelineHandler{db: db}
}
// recentRunResponse extends PipelineRun with repo context for the global feed.
type recentRunResponse struct {
models.PipelineRun
RepoName string `json:"repoName"`
OwnerName string `json:"ownerName"`
}
// ListRecentRuns returns recent runs across all repos owned by the current user.
// GET /api/v1/pipelines/runs
func (h *PipelineHandler) ListRecentRuns(w http.ResponseWriter, r *http.Request) {
userID, _ := middleware.UserIDFromContext(r.Context())
limit := 30
if l, err := strconv.Atoi(r.URL.Query().Get("limit")); err == nil && l > 0 && l <= 100 {
limit = l
}
// Repos owned by this user.
var repos []models.Repository
h.db.Where("owner_id = ?", userID).Cols("id", "name").Find(&repos)
if len(repos) == 0 {
jsonOK(w, []recentRunResponse{})
return
}
repoIDs := make([]int64, len(repos))
repoNameByID := make(map[int64]string, len(repos))
for i, rp := range repos {
repoIDs[i] = rp.ID
repoNameByID[rp.ID] = rp.Name
}
// Owner username.
var owner models.User
h.db.ID(userID).Cols("username").Get(&owner)
var runs []models.PipelineRun
h.db.In("repo_id", repoIDs).Desc("id").Limit(limit).Find(&runs)
if runs == nil {
runs = []models.PipelineRun{}
}
result := make([]recentRunResponse, len(runs))
for i, run := range runs {
result[i] = recentRunResponse{
PipelineRun: run,
RepoName: repoNameByID[run.RepoID],
OwnerName: owner.Username,
}
}
jsonOK(w, result)
}
// ListPipelines returns all pipeline definitions for a repository.
func (h *PipelineHandler) ListPipelines(w http.ResponseWriter, r *http.Request) {
repoID, ok := h.repoID(w, r)
if !ok {
return
}
var pipelines []models.Pipeline
if err := h.db.Where("repo_id = ?", repoID).Find(&pipelines); err != nil {
jsonError(w, "could not list pipelines", http.StatusInternalServerError)
return
}
if pipelines == nil {
pipelines = []models.Pipeline{}
}
jsonOK(w, pipelines)
}
// ListRuns returns pipeline runs for a repository, most recent first.
func (h *PipelineHandler) ListRuns(w http.ResponseWriter, r *http.Request) {
repoID, ok := h.repoID(w, r)
if !ok {
return
}
limit := 30
if l, err := strconv.Atoi(r.URL.Query().Get("limit")); err == nil && l > 0 && l <= 100 {
limit = l
}
var runs []models.PipelineRun
if err := h.db.Where("repo_id = ?", repoID).Desc("id").Limit(limit).Find(&runs); err != nil {
jsonError(w, "could not list runs", http.StatusInternalServerError)
return
}
if runs == nil {
runs = []models.PipelineRun{}
}
jsonOK(w, runs)
}
type runDetailResponse struct {
models.PipelineRun
Jobs []jobDetailResponse `json:"jobs"`
}
type jobDetailResponse struct {
models.PipelineJob
Steps []models.PipelineStep `json:"steps"`
}
// GetRun returns a run with its full job + step tree.
func (h *PipelineHandler) GetRun(w http.ResponseWriter, r *http.Request) {
run, ok := h.lookupRun(w, r)
if !ok {
return
}
var jobs []models.PipelineJob
h.db.Where("run_id = ?", run.ID).Asc("id").Find(&jobs)
jobDetails := make([]jobDetailResponse, len(jobs))
for i, job := range jobs {
var steps []models.PipelineStep
h.db.Where("job_id = ?", job.ID).Asc("seq").Find(&steps)
if steps == nil {
steps = []models.PipelineStep{}
}
jobDetails[i] = jobDetailResponse{PipelineJob: job, Steps: steps}
}
jsonOK(w, runDetailResponse{PipelineRun: *run, Jobs: jobDetails})
}
// GetJobLogs returns all log chunks for a job, ordered by step seq and chunk index.
func (h *PipelineHandler) GetJobLogs(w http.ResponseWriter, r *http.Request) {
_, ok := h.lookupRun(w, r)
if !ok {
return
}
jobID, err := strconv.ParseInt(chi.URLParam(r, "jobID"), 10, 64)
if err != nil {
jsonError(w, "invalid job ID", http.StatusBadRequest)
return
}
// Verify job belongs to this run.
var job models.PipelineJob
runID, _ := strconv.ParseInt(chi.URLParam(r, "runID"), 10, 64)
if found, _ := h.db.Where("id = ? AND run_id = ?", jobID, runID).Get(&job); !found {
jsonError(w, "job not found", http.StatusNotFound)
return
}
var steps []models.PipelineStep
h.db.Where("job_id = ?", jobID).Asc("seq").Find(&steps)
type stepLogs struct {
models.PipelineStep
Logs []models.PipelineStepLog `json:"logs"`
}
result := make([]stepLogs, len(steps))
for i, step := range steps {
var logs []models.PipelineStepLog
h.db.Where("step_id = ?", step.ID).Asc("chunk_index").Find(&logs)
if logs == nil {
logs = []models.PipelineStepLog{}
}
result[i] = stepLogs{PipelineStep: step, Logs: logs}
}
jsonOK(w, result)
}
// CancelRun marks a queued or running run as cancelled.
func (h *PipelineHandler) CancelRun(w http.ResponseWriter, r *http.Request) {
run, ok := h.lookupRun(w, r)
if !ok {
return
}
if run.Status != "queued" && run.Status != "running" {
jsonError(w, "run is not cancellable in its current state", http.StatusConflict)
return
}
now := time.Now().UTC()
run.Status = "cancelled"
run.FinishedAt = &now
if _, err := h.db.ID(run.ID).Cols("status", "finished_at").Update(run); err != nil {
jsonError(w, "could not cancel run", http.StatusInternalServerError)
return
}
// Cancel any queued jobs.
h.db.Where("run_id = ? AND status = 'queued'", run.ID). //nolint:errcheck
Cols("status").Update(&models.PipelineJob{Status: "cancelled"})
jsonOK(w, run)
}
// RetryJob re-queues a failed job by resetting its status and re-publishing job.queued.
func (h *PipelineHandler) RetryJob(w http.ResponseWriter, r *http.Request) {
run, ok := h.lookupRun(w, r)
if !ok {
return
}
jobID, err := strconv.ParseInt(chi.URLParam(r, "jobID"), 10, 64)
if err != nil {
jsonError(w, "invalid job ID", http.StatusBadRequest)
return
}
var job models.PipelineJob
if found, _ := h.db.Where("id = ? AND run_id = ?", jobID, run.ID).Get(&job); !found {
jsonError(w, "job not found", http.StatusNotFound)
return
}
if job.Status != "failed" && job.Status != "cancelled" {
jsonError(w, "only failed or cancelled jobs can be retried", http.StatusConflict)
return
}
job.Status = "queued"
job.StartedAt = nil
job.FinishedAt = nil
h.db.ID(job.ID).Cols("status", "started_at", "finished_at").Update(&job) //nolint:errcheck
// Reset step statuses.
h.db.Where("job_id = ?", job.ID).Cols("status", "exit_code", "started_at", "finished_at"). //nolint:errcheck
Update(&models.PipelineStep{Status: "queued"})
// Also reset the run status if it was failed/cancelled.
if run.Status == "failed" || run.Status == "cancelled" {
run.Status = "running"
run.FinishedAt = nil
h.db.ID(run.ID).Cols("status", "finished_at").Update(run) //nolint:errcheck
}
jsonOK(w, job)
}
// ── Helpers ───────────────────────────────────────────────────────────────────
func (h *PipelineHandler) repoID(w http.ResponseWriter, r *http.Request) (int64, bool) {
owner := chi.URLParam(r, "owner")
repoName := chi.URLParam(r, "repo")
var u models.User
if found, _ := h.db.Where("username = ?", owner).Get(&u); !found {
jsonError(w, "repository not found", http.StatusNotFound)
return 0, false
}
var repo models.Repository
if found, _ := h.db.Where("owner_id = ? AND name = ?", u.ID, repoName).Get(&repo); !found {
jsonError(w, "repository not found", http.StatusNotFound)
return 0, false
}
return repo.ID, true
}
func (h *PipelineHandler) lookupRun(w http.ResponseWriter, r *http.Request) (*models.PipelineRun, bool) {
repoID, ok := h.repoID(w, r)
if !ok {
return nil, false
}
runID, err := strconv.ParseInt(chi.URLParam(r, "runID"), 10, 64)
if err != nil {
jsonError(w, "invalid run ID", http.StatusBadRequest)
return nil, false
}
var run models.PipelineRun
if found, _ := h.db.Where("id = ? AND repo_id = ?", runID, repoID).Get(&run); !found {
jsonError(w, "run not found", http.StatusNotFound)
return nil, false
}
return &run, true
}