tenseleyflow/shithub / 104c4c4

Browse files

api/runners: accept workflow step status updates

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
104c4c4af292b0ac8778c85a47c8528b4d232077
Parents
765de35
Tree
5326bb2

2 changed files

StatusFile+-
M internal/web/handlers/api/runners.go 168 0
M internal/web/handlers/api/runners_test.go 80 0
internal/web/handlers/api/runners.gomodified
@@ -19,6 +19,7 @@ import (
1919
 	"github.com/jackc/pgx/v5"
2020
 	"github.com/jackc/pgx/v5/pgtype"
2121
 
22
+	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
2223
 	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
2324
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
2425
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
@@ -27,6 +28,7 @@ import (
2728
 	checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
2829
 	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
2930
 	"github.com/tenseleyFlow/shithub/internal/ratelimit"
31
+	"github.com/tenseleyFlow/shithub/internal/worker"
3032
 )
3133
 
3234
 var runnerHeartbeatLimit = ratelimit.Policy{
@@ -38,6 +40,7 @@ var runnerHeartbeatLimit = ratelimit.Policy{
3840
 func (h *Handlers) mountRunners(r chi.Router) {
3941
 	r.Post("/api/v1/runners/heartbeat", h.runnerHeartbeat)
4042
 	r.Post("/api/v1/jobs/{id}/logs", h.runnerJobLogs)
43
+	r.Post("/api/v1/jobs/{id}/steps/{step_id}/status", h.runnerStepStatus)
4144
 	r.Post("/api/v1/jobs/{id}/status", h.runnerJobStatus)
4245
 	r.Post("/api/v1/jobs/{id}/artifacts/upload", h.runnerJobArtifactUpload)
4346
 	r.Post("/api/v1/jobs/{id}/cancel-check", h.runnerJobCancelCheck)
@@ -413,6 +416,43 @@ func (h *Handlers) runnerJobStatus(w http.ResponseWriter, r *http.Request) {
413416
 	h.writeNextTokenResponse(w, r, http.StatusOK, auth, bodyMap)
414417
 }
415418
 
419
+func (h *Handlers) runnerStepStatus(w http.ResponseWriter, r *http.Request) {
420
+	auth, ok := h.authenticateRunnerJob(w, r)
421
+	if !ok {
422
+		return
423
+	}
424
+	stepID, err := strconv.ParseInt(chi.URLParam(r, "step_id"), 10, 64)
425
+	if err != nil || stepID <= 0 {
426
+		writeAPIError(w, http.StatusNotFound, "step not found")
427
+		return
428
+	}
429
+	q := actionsdb.New()
430
+	step, err := q.GetWorkflowStepByID(r.Context(), h.d.Pool, stepID)
431
+	if err != nil || step.JobID != auth.Job.ID {
432
+		writeAPIError(w, http.StatusNotFound, "step not found")
433
+		return
434
+	}
435
+	var body runnerStatusRequest
436
+	if err := decodeJSONBody(r.Body, &body); err != nil {
437
+		writeAPIError(w, http.StatusBadRequest, "invalid JSON: "+err.Error())
438
+		return
439
+	}
440
+	update, terminal, err := normalizeStepStatusUpdate(step, body)
441
+	if err != nil {
442
+		writeAPIError(w, http.StatusBadRequest, err.Error())
443
+		return
444
+	}
445
+	updated, err := h.applyStepStatus(r.Context(), step, update, terminal)
446
+	if err != nil {
447
+		writeAPIError(w, http.StatusInternalServerError, "step status update failed")
448
+		return
449
+	}
450
+	h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
451
+		"status":     string(updated.Status),
452
+		"conclusion": nullableConclusion(updated.Conclusion),
453
+	})
454
+}
455
+
416456
 type normalizedJobStatusUpdate struct {
417457
 	Status      actionsdb.WorkflowJobStatus
418458
 	Conclusion  actionsdb.NullCheckConclusion
@@ -488,6 +528,134 @@ func validWorkflowJobTransition(from, to actionsdb.WorkflowJobStatus) bool {
488528
 	}
489529
 }
490530
 
531
+type normalizedStepStatusUpdate struct {
532
+	Status      actionsdb.WorkflowStepStatus
533
+	Conclusion  actionsdb.NullCheckConclusion
534
+	StartedAt   pgtype.Timestamptz
535
+	CompletedAt pgtype.Timestamptz
536
+}
537
+
538
+func normalizeStepStatusUpdate(step actionsdb.WorkflowStep, body runnerStatusRequest) (normalizedStepStatusUpdate, bool, error) {
539
+	now := time.Now().UTC()
540
+	status := actionsdb.WorkflowStepStatus(strings.TrimSpace(body.Status))
541
+	if status == "" {
542
+		return normalizedStepStatusUpdate{}, false, errors.New("status is required")
543
+	}
544
+	if !validWorkflowStepTransition(step.Status, status) {
545
+		return normalizedStepStatusUpdate{}, false, fmt.Errorf("invalid status transition %s -> %s", step.Status, status)
546
+	}
547
+	startedAt := step.StartedAt
548
+	if body.StartedAt != "" {
549
+		t, err := parseTimeOptional(body.StartedAt)
550
+		if err != nil {
551
+			return normalizedStepStatusUpdate{}, false, fmt.Errorf("started_at: %w", err)
552
+		}
553
+		startedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
554
+	}
555
+	if !startedAt.Valid && (status == actionsdb.WorkflowStepStatusRunning ||
556
+		status == actionsdb.WorkflowStepStatusCompleted ||
557
+		status == actionsdb.WorkflowStepStatusCancelled) {
558
+		startedAt = pgtype.Timestamptz{Time: now, Valid: true}
559
+	}
560
+	completedAt := step.CompletedAt
561
+	terminal := status == actionsdb.WorkflowStepStatusCompleted ||
562
+		status == actionsdb.WorkflowStepStatusCancelled ||
563
+		status == actionsdb.WorkflowStepStatusSkipped
564
+	if body.CompletedAt != "" {
565
+		t, err := parseTimeOptional(body.CompletedAt)
566
+		if err != nil {
567
+			return normalizedStepStatusUpdate{}, false, fmt.Errorf("completed_at: %w", err)
568
+		}
569
+		completedAt = pgtype.Timestamptz{Time: t, Valid: !t.IsZero()}
570
+	}
571
+	if terminal && !completedAt.Valid {
572
+		completedAt = pgtype.Timestamptz{Time: now, Valid: true}
573
+	}
574
+	conclusion := actionsdb.NullCheckConclusion{}
575
+	if terminal {
576
+		c := strings.TrimSpace(body.Conclusion)
577
+		if c == "" && status == actionsdb.WorkflowStepStatusCancelled {
578
+			c = "cancelled"
579
+		}
580
+		if c == "" && status == actionsdb.WorkflowStepStatusSkipped {
581
+			c = "skipped"
582
+		}
583
+		if !validRunnerConclusion(c) {
584
+			return normalizedStepStatusUpdate{}, false, errors.New("invalid or missing conclusion")
585
+		}
586
+		conclusion = actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusion(c), Valid: true}
587
+	} else if strings.TrimSpace(body.Conclusion) != "" {
588
+		return normalizedStepStatusUpdate{}, false, errors.New("conclusion is only valid for terminal statuses")
589
+	}
590
+	return normalizedStepStatusUpdate{
591
+		Status:      status,
592
+		Conclusion:  conclusion,
593
+		StartedAt:   startedAt,
594
+		CompletedAt: completedAt,
595
+	}, terminal, nil
596
+}
597
+
598
+func validWorkflowStepTransition(from, to actionsdb.WorkflowStepStatus) bool {
599
+	switch to {
600
+	case actionsdb.WorkflowStepStatusRunning:
601
+		return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning
602
+	case actionsdb.WorkflowStepStatusCompleted:
603
+		return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusCompleted
604
+	case actionsdb.WorkflowStepStatusCancelled:
605
+		return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusCancelled
606
+	case actionsdb.WorkflowStepStatusSkipped:
607
+		return from == actionsdb.WorkflowStepStatusQueued || from == actionsdb.WorkflowStepStatusRunning || from == actionsdb.WorkflowStepStatusSkipped
608
+	default:
609
+		return false
610
+	}
611
+}
612
+
613
+func (h *Handlers) applyStepStatus(
614
+	ctx context.Context,
615
+	step actionsdb.WorkflowStep,
616
+	update normalizedStepStatusUpdate,
617
+	terminal bool,
618
+) (actionsdb.WorkflowStep, error) {
619
+	q := actionsdb.New()
620
+	tx, err := h.d.Pool.Begin(ctx)
621
+	if err != nil {
622
+		return actionsdb.WorkflowStep{}, err
623
+	}
624
+	committed := false
625
+	defer func() {
626
+		if !committed {
627
+			_ = tx.Rollback(ctx)
628
+		}
629
+	}()
630
+	updated, err := q.UpdateWorkflowStepStatus(ctx, tx, actionsdb.UpdateWorkflowStepStatusParams{
631
+		ID:          step.ID,
632
+		Status:      update.Status,
633
+		Conclusion:  update.Conclusion,
634
+		StartedAt:   update.StartedAt,
635
+		CompletedAt: update.CompletedAt,
636
+	})
637
+	if err != nil {
638
+		return actionsdb.WorkflowStep{}, err
639
+	}
640
+	shouldNotify := false
641
+	if terminal && h.d.ObjectStore != nil {
642
+		if _, err := worker.Enqueue(ctx, tx, finalize.KindWorkflowFinalizeStep, finalize.Payload{StepID: step.ID}, worker.EnqueueOptions{}); err != nil {
643
+			return actionsdb.WorkflowStep{}, err
644
+		}
645
+		shouldNotify = true
646
+	}
647
+	if err := tx.Commit(ctx); err != nil {
648
+		return actionsdb.WorkflowStep{}, err
649
+	}
650
+	committed = true
651
+	if shouldNotify {
652
+		if err := worker.Notify(ctx, h.d.Pool); err != nil && h.d.Logger != nil {
653
+			h.d.Logger.WarnContext(ctx, "runner step finalizer notify failed", "step_id", step.ID, "error", err)
654
+		}
655
+	}
656
+	return updated, nil
657
+}
658
+
491659
 func (h *Handlers) applyJobStatus(
492660
 	ctx context.Context,
493661
 	job actionsdb.WorkflowJob,
internal/web/handlers/api/runners_test.gomodified
@@ -20,6 +20,7 @@ import (
2020
 	"github.com/jackc/pgx/v5/pgtype"
2121
 	"github.com/jackc/pgx/v5/pgxpool"
2222
 
23
+	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
2324
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
2425
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
2526
 	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
@@ -30,6 +31,7 @@ import (
3031
 	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
3132
 	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
3233
 	apih "github.com/tenseleyFlow/shithub/internal/web/handlers/api"
34
+	workerdb "github.com/tenseleyFlow/shithub/internal/worker/sqlc"
3335
 )
3436
 
3537
 const runnerAPIFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
@@ -227,6 +229,84 @@ func TestRunnerArtifactUploadReturnsSignedURL(t *testing.T) {
227229
 	}
228230
 }
229231
 
232
+func TestRunnerStepStatusEnqueuesFinalizeWorker(t *testing.T) {
233
+	ctx := context.Background()
234
+	pool := dbtest.NewTestDB(t)
235
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
236
+	repoID, userID := setupRunnerAPIRepo(t, pool)
237
+	enqueueRunnerAPIRun(t, pool, logger, repoID, userID)
238
+	token, _ := registerRunnerForTest(t, pool, []string{"ubuntu-latest"}, 1)
239
+	router := newRunnerAPIRouter(t, pool, logger, runnerAPISigner(t, time.Now()), storage.NewMemoryStore())
240
+
241
+	req := httptest.NewRequest(http.MethodPost, "/api/v1/runners/heartbeat",
242
+		strings.NewReader(`{"labels":["ubuntu-latest"],"capacity":1}`))
243
+	req.Header.Set("Authorization", "Bearer "+token)
244
+	rr := httptest.NewRecorder()
245
+	router.ServeHTTP(rr, req)
246
+	if rr.Code != http.StatusOK {
247
+		t.Fatalf("heartbeat status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
248
+	}
249
+	var claim struct {
250
+		Token string `json:"token"`
251
+		Job   struct {
252
+			ID    int64 `json:"id"`
253
+			Steps []struct {
254
+				ID  int64  `json:"id"`
255
+				Run string `json:"run"`
256
+			} `json:"steps"`
257
+		} `json:"job"`
258
+	}
259
+	if err := json.Unmarshal(rr.Body.Bytes(), &claim); err != nil {
260
+		t.Fatalf("decode claim: %v", err)
261
+	}
262
+	if len(claim.Job.Steps) < 2 {
263
+		t.Fatalf("claim steps: %+v", claim.Job.Steps)
264
+	}
265
+	stepID := claim.Job.Steps[1].ID
266
+
267
+	req = httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/jobs/%d/steps/%d/status", claim.Job.ID, stepID),
268
+		strings.NewReader(`{"status":"completed","conclusion":"success"}`))
269
+	req.Header.Set("Authorization", "Bearer "+claim.Token)
270
+	rr = httptest.NewRecorder()
271
+	router.ServeHTTP(rr, req)
272
+	if rr.Code != http.StatusOK {
273
+		t.Fatalf("step status: got %d, want 200; body=%s", rr.Code, rr.Body.String())
274
+	}
275
+	var statusResp struct {
276
+		Status     string `json:"status"`
277
+		Conclusion string `json:"conclusion"`
278
+		NextToken  string `json:"next_token"`
279
+	}
280
+	if err := json.Unmarshal(rr.Body.Bytes(), &statusResp); err != nil {
281
+		t.Fatalf("decode status response: %v", err)
282
+	}
283
+	if statusResp.Status != "completed" || statusResp.Conclusion != "success" || statusResp.NextToken == "" {
284
+		t.Fatalf("unexpected step status response: %+v", statusResp)
285
+	}
286
+	step, err := actionsdb.New().GetWorkflowStepByID(ctx, pool, stepID)
287
+	if err != nil {
288
+		t.Fatalf("GetWorkflowStepByID: %v", err)
289
+	}
290
+	if step.Status != actionsdb.WorkflowStepStatusCompleted ||
291
+		!step.Conclusion.Valid || step.Conclusion.CheckConclusion != actionsdb.CheckConclusionSuccess {
292
+		t.Fatalf("step was not completed: %+v", step)
293
+	}
294
+	job, err := workerdb.New().ClaimJob(ctx, pool, workerdb.ClaimJobParams{
295
+		Kind:     string(finalize.KindWorkflowFinalizeStep),
296
+		LockedBy: pgtype.Text{String: "test", Valid: true},
297
+	})
298
+	if err != nil {
299
+		t.Fatalf("ClaimJob finalize: %v", err)
300
+	}
301
+	var payload finalize.Payload
302
+	if err := json.Unmarshal(job.Payload, &payload); err != nil {
303
+		t.Fatalf("decode worker payload: %v", err)
304
+	}
305
+	if payload.StepID != stepID {
306
+		t.Fatalf("worker payload: %+v, want step_id %d", payload, stepID)
307
+	}
308
+}
309
+
230310
 func newRunnerAPIRouter(
231311
 	t *testing.T,
232312
 	pool *pgxpool.Pool,