tenseleyflow/shithub / 740d0b4

Browse files

actions/lifecycle: enqueue workflow reruns

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
740d0b4a5f0dc76f079cdf5bc98bbcc2dec1c9ad
Parents
0d477a4
Tree
073839b

4 changed files

StatusFile+-
M internal/actions/lifecycle/cancel.go 1 9
A internal/actions/lifecycle/deps.go 18 0
A internal/actions/lifecycle/rerun.go 181 0
A internal/actions/lifecycle/rerun_test.go 180 0
internal/actions/lifecycle/cancel.gomodified
@@ -1,20 +1,18 @@
11
 // SPDX-License-Identifier: AGPL-3.0-or-later
22
 
33
 // Package lifecycle owns user-visible Actions run/job lifecycle mutations:
4
-// cancellation now, with re-runs and retention following in later S41g slices.
4
+// cancellation, re-runs, and retention as the S41g slices land.
55
 package lifecycle
66
 
77
 import (
88
 	"context"
99
 	"errors"
1010
 	"fmt"
11
-	"log/slog"
1211
 	"strings"
1312
 	"time"
1413
 
1514
 	"github.com/jackc/pgx/v5"
1615
 	"github.com/jackc/pgx/v5/pgtype"
17
-	"github.com/jackc/pgx/v5/pgxpool"
1816
 
1917
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
2018
 	"github.com/tenseleyFlow/shithub/internal/checks"
@@ -28,12 +26,6 @@ const (
2826
 	CancelReasonTimeout     = "timeout"
2927
 )
3028
 
31
-// Deps wires lifecycle operations to postgres and optional warning logs.
32
-type Deps struct {
33
-	Pool   *pgxpool.Pool
34
-	Logger *slog.Logger
35
-}
36
-
3729
 // CancelResult summarizes the durable state changes from a cancel request.
3830
 type CancelResult struct {
3931
 	RunID         int64
internal/actions/lifecycle/deps.goadded
@@ -0,0 +1,18 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lifecycle
4
+
5
+import (
6
+	"log/slog"
7
+
8
+	"github.com/jackc/pgx/v5/pgxpool"
9
+
10
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
11
+)
12
+
13
// Deps wires lifecycle operations to postgres and optional runtime services.
type Deps struct {
	// Pool is the postgres connection pool used for every query. RerunRun
	// returns an error when it is nil.
	Pool *pgxpool.Pool
	// RepoFS resolves the on-disk path of a repository. RerunRun returns an
	// error when it is nil.
	RepoFS *storage.RepoFS
	// Logger is optional; RerunRun substitutes a logger that discards all
	// output when it is nil.
	Logger *slog.Logger
}
internal/actions/lifecycle/rerun.goadded
@@ -0,0 +1,181 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lifecycle
4
+
5
+import (
6
+	"context"
7
+	"crypto/rand"
8
+	"encoding/hex"
9
+	"encoding/json"
10
+	"errors"
11
+	"fmt"
12
+	"io"
13
+	"log/slog"
14
+
15
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
16
+	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
17
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
18
+	orgsdb "github.com/tenseleyFlow/shithub/internal/orgs/sqlc"
19
+	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
20
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
21
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
22
+)
23
+
24
// Exported sentinel errors. RerunRun returns them either directly or wrapped,
// so callers should match with errors.Is.
var (
	// ErrActorRequired is returned when the acting user ID is zero.
	ErrActorRequired = errors.New("actions lifecycle: actor required")
	// ErrRunNotRerunnable is returned when the source run is neither completed
	// nor cancelled.
	ErrRunNotRerunnable = errors.New("actions lifecycle: run is not rerunnable")
	// ErrWorkflowSourceUnavailable wraps a failure to read the workflow file
	// at the source run's commit.
	ErrWorkflowSourceUnavailable = errors.New("actions lifecycle: workflow source unavailable")
	// ErrWorkflowSourceInvalid wraps an undecodable event payload, an empty
	// workflow file, a parse failure, or error-level diagnostics.
	ErrWorkflowSourceInvalid = errors.New("actions lifecycle: workflow source invalid")
)

// Unexported causes that are wrapped together with ErrWorkflowSourceInvalid to
// say which validation step rejected the source run.
var (
	errWorkflowPayloadNotJSON     = errors.New("actions lifecycle: event payload is not a JSON object")
	errWorkflowSourceEmpty        = errors.New("actions lifecycle: workflow source is empty")
	errWorkflowSourceHasDiagError = errors.New("actions lifecycle: workflow source has error diagnostics")
)
33
+
34
// RerunResult summarizes the newly queued run produced from a terminal source
// run.
type RerunResult struct {
	// ParentRunID is the ID of the source run that was re-run.
	ParentRunID int64
	// RunID is the ID of the newly enqueued run, as reported by trigger.Enqueue.
	RunID int64
	// RunIndex is the run index reported by trigger.Enqueue for the new run.
	RunIndex int64
	// CheckRunIDs are the check run IDs reported by trigger.Enqueue for the
	// new run.
	CheckRunIDs []int64
}
42
+
43
+// RerunRun queues a fresh workflow run from a terminal source run. It reads the
44
+// workflow YAML from the source run's original head_sha, not from the current
45
+// default branch, so reruns are reproducible even after workflow edits.
46
+func RerunRun(ctx context.Context, deps Deps, runID, actorUserID int64) (RerunResult, error) {
47
+	if deps.Pool == nil {
48
+		return RerunResult{}, errors.New("actions lifecycle: nil Pool")
49
+	}
50
+	if deps.RepoFS == nil {
51
+		return RerunResult{}, errors.New("actions lifecycle: nil RepoFS")
52
+	}
53
+	if actorUserID == 0 {
54
+		return RerunResult{}, ErrActorRequired
55
+	}
56
+	logger := deps.Logger
57
+	if logger == nil {
58
+		logger = slog.New(slog.NewTextHandler(io.Discard, nil))
59
+	}
60
+
61
+	q := actionsdb.New()
62
+	run, err := q.GetWorkflowRunByID(ctx, deps.Pool, runID)
63
+	if err != nil {
64
+		return RerunResult{}, err
65
+	}
66
+	if !workflowRunRerunnable(run.Status) {
67
+		return RerunResult{}, ErrRunNotRerunnable
68
+	}
69
+	payload, err := decodeRunEventPayload(run.EventPayload)
70
+	if err != nil {
71
+		return RerunResult{}, fmt.Errorf("%w: %w", ErrWorkflowSourceInvalid, err)
72
+	}
73
+	repo, err := reposdb.New().GetRepoByID(ctx, deps.Pool, run.RepoID)
74
+	if err != nil {
75
+		return RerunResult{}, err
76
+	}
77
+	owner, err := lifecycleRepoOwnerLogin(ctx, deps.Pool, repo)
78
+	if err != nil {
79
+		return RerunResult{}, err
80
+	}
81
+	gitDir, err := deps.RepoFS.RepoPath(owner, repo.Name)
82
+	if err != nil {
83
+		return RerunResult{}, err
84
+	}
85
+	body, err := repogit.ReadBlobBytes(ctx, gitDir, run.HeadSha, run.WorkflowFile, int64(workflow.MaxWorkflowFileBytes))
86
+	if err != nil {
87
+		return RerunResult{}, fmt.Errorf("%w: %w", ErrWorkflowSourceUnavailable, err)
88
+	}
89
+	if len(body) == 0 {
90
+		return RerunResult{}, fmt.Errorf("%w: %w", ErrWorkflowSourceInvalid, errWorkflowSourceEmpty)
91
+	}
92
+	wf, diags, err := workflow.Parse(body)
93
+	if err != nil {
94
+		return RerunResult{}, fmt.Errorf("%w: %w", ErrWorkflowSourceInvalid, err)
95
+	}
96
+	if workflowHasErrorDiagnostics(diags) {
97
+		return RerunResult{}, fmt.Errorf("%w: %w", ErrWorkflowSourceInvalid, errWorkflowSourceHasDiagError)
98
+	}
99
+	triggerID, err := rerunTriggerEventID(run.ID)
100
+	if err != nil {
101
+		return RerunResult{}, err
102
+	}
103
+	res, err := trigger.Enqueue(ctx, trigger.Deps{Pool: deps.Pool, Logger: logger}, trigger.EnqueueParams{
104
+		RepoID:         run.RepoID,
105
+		WorkflowFile:   run.WorkflowFile,
106
+		HeadSHA:        run.HeadSha,
107
+		HeadRef:        run.HeadRef,
108
+		EventKind:      trigger.EventKind(run.Event),
109
+		EventPayload:   payload,
110
+		ActorUserID:    actorUserID,
111
+		ParentRunID:    run.ID,
112
+		TriggerEventID: triggerID,
113
+		Workflow:       wf,
114
+	})
115
+	if err != nil {
116
+		return RerunResult{}, err
117
+	}
118
+	return RerunResult{
119
+		ParentRunID: run.ID,
120
+		RunID:       res.RunID,
121
+		RunIndex:    res.RunIndex,
122
+		CheckRunIDs: res.CheckRunIDs,
123
+	}, nil
124
+}
125
+
126
+func workflowRunRerunnable(status actionsdb.WorkflowRunStatus) bool {
127
+	return status == actionsdb.WorkflowRunStatusCompleted ||
128
+		status == actionsdb.WorkflowRunStatusCancelled
129
+}
130
+
131
+func decodeRunEventPayload(raw []byte) (map[string]any, error) {
132
+	if len(raw) == 0 {
133
+		return map[string]any{}, nil
134
+	}
135
+	var out map[string]any
136
+	if err := json.Unmarshal(raw, &out); err != nil {
137
+		return nil, err
138
+	}
139
+	if out == nil {
140
+		return nil, errWorkflowPayloadNotJSON
141
+	}
142
+	return out, nil
143
+}
144
+
145
+func workflowHasErrorDiagnostics(diags []workflow.Diagnostic) bool {
146
+	for _, d := range diags {
147
+		if d.Severity == workflow.Error {
148
+			return true
149
+		}
150
+	}
151
+	return false
152
+}
153
+
154
// rerunTriggerEventID builds a trigger event ID of the form
// "rerun:<parentRunID>:<16 hex chars>". The suffix is 8 bytes read from
// crypto/rand, so successive calls for the same parent produce different IDs.
// It returns an error only if reading random bytes fails.
func rerunTriggerEventID(parentRunID int64) (string, error) {
	var nonce [8]byte
	if _, err := rand.Read(nonce[:]); err != nil {
		return "", fmt.Errorf("actions lifecycle: rerun id entropy: %w", err)
	}
	suffix := hex.EncodeToString(nonce[:])
	return fmt.Sprintf("rerun:%d:%s", parentRunID, suffix), nil
}
161
+
162
+func lifecycleRepoOwnerLogin(ctx context.Context, db interface {
163
+	usersdb.DBTX
164
+	orgsdb.DBTX
165
+}, repo reposdb.Repo) (string, error) {
166
+	if repo.OwnerUserID.Valid {
167
+		u, err := usersdb.New().GetUserByID(ctx, db, repo.OwnerUserID.Int64)
168
+		if err != nil {
169
+			return "", err
170
+		}
171
+		return u.Username, nil
172
+	}
173
+	if repo.OwnerOrgID.Valid {
174
+		o, err := orgsdb.New().GetOrgByID(ctx, db, repo.OwnerOrgID.Int64)
175
+		if err != nil {
176
+			return "", err
177
+		}
178
+		return o.Slug, nil
179
+	}
180
+	return "", errors.New("repo has neither owner_user_id nor owner_org_id")
181
+}
internal/actions/lifecycle/rerun_test.goadded
@@ -0,0 +1,180 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package lifecycle
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"io"
9
+	"log/slog"
10
+	"strings"
11
+	"testing"
12
+	"time"
13
+
14
+	"github.com/jackc/pgx/v5/pgxpool"
15
+
16
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
17
+	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
18
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
19
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
20
+	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
21
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
22
+)
23
+
24
// Workflow fixtures committed to the test repository. They differ only in the
// job key, job name and echoed text, which lets a test tell which revision of
// the workflow file a rerun was built from.
const (
	// oldWorkflow is the revision with the single job "old_job".
	oldWorkflow = `name: CI
on: push
jobs:
  old_job:
    name: Old job
    runs-on: ubuntu-latest
    steps:
      - run: echo old
`

	// newWorkflow is the revision with the single job "new_job".
	newWorkflow = `name: CI
on: push
jobs:
  new_job:
    name: New job
    runs-on: ubuntu-latest
    steps:
      - run: echo new
`
)
43
+
44
+func TestRerunRunUsesOriginalCommitWorkflowAndParentsNewRun(t *testing.T) {
45
+	ctx := context.Background()
46
+	pool := dbtestPool(t)
47
+	repoID, userID := setupLifecycleRepo(t, pool)
48
+	rfs := lifecycleRepoFS(t)
49
+	gitDir := lifecycleGitDir(t, rfs)
50
+	oldSHA := lifecycleCommitWorkflow(t, gitDir, oldWorkflow, time.Date(2026, 5, 11, 12, 0, 0, 0, time.UTC))
51
+	wf := parseLifecycleWorkflow(t, oldWorkflow)
52
+
53
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
54
+	original, err := trigger.Enqueue(ctx, trigger.Deps{Pool: pool, Logger: logger}, trigger.EnqueueParams{
55
+		RepoID:         repoID,
56
+		WorkflowFile:   ".shithub/workflows/ci.yml",
57
+		HeadSHA:        oldSHA,
58
+		HeadRef:        "refs/heads/trunk",
59
+		EventKind:      trigger.EventPush,
60
+		EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
61
+		ActorUserID:    userID,
62
+		TriggerEventID: "push:original",
63
+		Workflow:       wf,
64
+	})
65
+	if err != nil {
66
+		t.Fatalf("trigger.Enqueue original: %v", err)
67
+	}
68
+	if _, err := actionsdb.New().CompleteWorkflowRun(ctx, pool, actionsdb.CompleteWorkflowRunParams{
69
+		ID:         original.RunID,
70
+		Conclusion: actionsdb.CheckConclusionFailure,
71
+	}); err != nil {
72
+		t.Fatalf("CompleteWorkflowRun: %v", err)
73
+	}
74
+	_ = lifecycleCommitWorkflow(t, gitDir, newWorkflow, time.Date(2026, 5, 11, 12, 5, 0, 0, time.UTC))
75
+
76
+	result, err := RerunRun(ctx, Deps{Pool: pool, RepoFS: rfs, Logger: logger}, original.RunID, userID)
77
+	if err != nil {
78
+		t.Fatalf("RerunRun: %v", err)
79
+	}
80
+	if result.ParentRunID != original.RunID || result.RunID == 0 || result.RunID == original.RunID {
81
+		t.Fatalf("unexpected result: %+v", result)
82
+	}
83
+
84
+	q := actionsdb.New()
85
+	rerun, err := q.GetWorkflowRunByID(ctx, pool, result.RunID)
86
+	if err != nil {
87
+		t.Fatalf("GetWorkflowRunByID rerun: %v", err)
88
+	}
89
+	if !rerun.ParentRunID.Valid || rerun.ParentRunID.Int64 != original.RunID {
90
+		t.Fatalf("parent_run_id: %+v want %d", rerun.ParentRunID, original.RunID)
91
+	}
92
+	if rerun.HeadSha != oldSHA {
93
+		t.Fatalf("rerun head_sha = %q want original %q", rerun.HeadSha, oldSHA)
94
+	}
95
+	if !strings.HasPrefix(rerun.TriggerEventID, "rerun:") {
96
+		t.Fatalf("rerun trigger_event_id = %q", rerun.TriggerEventID)
97
+	}
98
+	if !rerun.ActorUserID.Valid || rerun.ActorUserID.Int64 != userID {
99
+		t.Fatalf("rerun actor_user_id: %+v want %d", rerun.ActorUserID, userID)
100
+	}
101
+	jobs, err := q.ListJobsForRun(ctx, pool, result.RunID)
102
+	if err != nil {
103
+		t.Fatalf("ListJobsForRun: %v", err)
104
+	}
105
+	if len(jobs) != 1 || jobs[0].JobKey != "old_job" || jobs[0].JobName != "Old job" {
106
+		t.Fatalf("rerun used wrong workflow jobs: %+v", jobs)
107
+	}
108
+}
109
+
110
+func TestRerunRunRejectsNonTerminalRun(t *testing.T) {
111
+	ctx := context.Background()
112
+	pool := dbtestPool(t)
113
+	repoID, userID := setupLifecycleRepo(t, pool)
114
+	rfs := lifecycleRepoFS(t)
115
+	run := insertLifecycleRun(t, pool, repoID, userID, 1)
116
+
117
+	_, err := RerunRun(ctx, Deps{Pool: pool, RepoFS: rfs}, run.ID, userID)
118
+	if !errors.Is(err, ErrRunNotRerunnable) {
119
+		t.Fatalf("RerunRun error = %v, want ErrRunNotRerunnable", err)
120
+	}
121
+}
122
+
123
+func dbtestPool(t *testing.T) *pgxpool.Pool {
124
+	t.Helper()
125
+	return dbtest.NewTestDB(t)
126
+}
127
+
128
+func lifecycleRepoFS(t *testing.T) *storage.RepoFS {
129
+	t.Helper()
130
+	rfs, err := storage.NewRepoFS(t.TempDir())
131
+	if err != nil {
132
+		t.Fatalf("NewRepoFS: %v", err)
133
+	}
134
+	return rfs
135
+}
136
+
137
+func lifecycleGitDir(t *testing.T, rfs *storage.RepoFS) string {
138
+	t.Helper()
139
+	gitDir, err := rfs.RepoPath("alice", "demo")
140
+	if err != nil {
141
+		t.Fatalf("RepoPath: %v", err)
142
+	}
143
+	if err := rfs.InitBare(context.Background(), gitDir); err != nil {
144
+		t.Fatalf("InitBare: %v", err)
145
+	}
146
+	return gitDir
147
+}
148
+
149
+func lifecycleCommitWorkflow(t *testing.T, gitDir, body string, when time.Time) string {
150
+	t.Helper()
151
+	commit, err := (repogit.InitialCommit{
152
+		GitDir:      gitDir,
153
+		AuthorName:  "Alice",
154
+		AuthorEmail: "alice@example.test",
155
+		Branch:      "trunk",
156
+		Message:     "Update workflow",
157
+		When:        when,
158
+		Files: []repogit.FileEntry{
159
+			{Path: ".shithub/workflows/ci.yml", Body: []byte(body)},
160
+		},
161
+	}).Build(context.Background())
162
+	if err != nil {
163
+		t.Fatalf("InitialCommit.Build: %v", err)
164
+	}
165
+	return commit
166
+}
167
+
168
+func parseLifecycleWorkflow(t *testing.T, body string) *workflow.Workflow {
169
+	t.Helper()
170
+	wf, diags, err := workflow.Parse([]byte(body))
171
+	if err != nil {
172
+		t.Fatalf("workflow.Parse: %v", err)
173
+	}
174
+	for _, d := range diags {
175
+		if d.Severity == workflow.Error {
176
+			t.Fatalf("workflow diagnostic: %v", d)
177
+		}
178
+	}
179
+	return wf
180
+}