tenseleyflow/shithub / 18171f0

Browse files

actions/lifecycle: share cancel rollup and check sync

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
18171f0b0b59a28207b18c97be153f6bd1e6f25e
Parents
a6a06e3
Tree
b8b7445

3 changed files

StatusFile+-
A internal/actions/checksync/checksync.go 102 0
M internal/actions/lifecycle/cancel.go 6 128
A internal/actions/runstate/cancel.go 72 0
internal/actions/checksync/checksync.goadded
@@ -0,0 +1,102 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package checksync mirrors Actions workflow job state into check_run rows.
4
+package checksync
5
+
6
+import (
7
+	"context"
8
+	"errors"
9
+	"fmt"
10
+	"log/slog"
11
+	"strings"
12
+	"time"
13
+
14
+	"github.com/jackc/pgx/v5"
15
+	"github.com/jackc/pgx/v5/pgtype"
16
+	"github.com/jackc/pgx/v5/pgxpool"
17
+
18
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
19
+	"github.com/tenseleyFlow/shithub/internal/checks"
20
+	checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
21
+)
22
+
23
+// Deps wires check synchronization to postgres and logging.
24
+type Deps struct {
25
+	Pool   *pgxpool.Pool
26
+	Logger *slog.Logger
27
+}
28
+
29
+// ChangedJobs best-effort syncs every job state that has a visible check_run
30
+// status representation. Missing check rows are intentionally non-fatal.
31
+func ChangedJobs(ctx context.Context, deps Deps, jobs []actionsdb.WorkflowJob) {
32
+	for _, job := range jobs {
33
+		if job.Status != actionsdb.WorkflowJobStatusRunning &&
34
+			job.Status != actionsdb.WorkflowJobStatusCompleted &&
35
+			job.Status != actionsdb.WorkflowJobStatusCancelled {
36
+			continue
37
+		}
38
+		if err := Job(ctx, deps, job); err != nil && deps.Logger != nil {
39
+			deps.Logger.WarnContext(ctx, "actions checksync: sync check_run", "job_id", job.ID, "error", err)
40
+		}
41
+	}
42
+}
43
+
44
+// Job mirrors one Actions workflow_job row into its check_run row. Missing check
45
+// rows are non-fatal because check creation is already best-effort in the
46
+// trigger path and can be reconciled independently.
47
+func Job(ctx context.Context, deps Deps, job actionsdb.WorkflowJob) error {
48
+	if deps.Pool == nil {
49
+		return errors.New("actions checksync: nil Pool")
50
+	}
51
+	run, err := actionsdb.New().GetWorkflowRunByID(ctx, deps.Pool, job.RunID)
52
+	if err != nil {
53
+		return err
54
+	}
55
+	name := strings.TrimSpace(job.JobName)
56
+	if name == "" {
57
+		name = job.JobKey
58
+	}
59
+	checkRun, err := checksdb.New().GetCheckRunByExternalID(ctx, deps.Pool, checksdb.GetCheckRunByExternalIDParams{
60
+		RepoID:     run.RepoID,
61
+		HeadSha:    run.HeadSha,
62
+		Name:       name,
63
+		ExternalID: pgtype.Text{String: fmt.Sprintf("workflow_run:%d:job:%s", job.RunID, job.JobKey), Valid: true},
64
+	})
65
+	if err != nil {
66
+		if errors.Is(err, pgx.ErrNoRows) {
67
+			return nil
68
+		}
69
+		return err
70
+	}
71
+	params := checks.UpdateParams{
72
+		RunID:        checkRun.ID,
73
+		HasStatus:    true,
74
+		HasStartedAt: true,
75
+		StartedAt:    timeFromPg(job.StartedAt),
76
+	}
77
+	switch job.Status {
78
+	case actionsdb.WorkflowJobStatusRunning:
79
+		params.Status = "in_progress"
80
+	case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled:
81
+		params.Status = "completed"
82
+		params.HasConclusion = true
83
+		if job.Conclusion.Valid {
84
+			params.Conclusion = string(job.Conclusion.CheckConclusion)
85
+		} else if job.Status == actionsdb.WorkflowJobStatusCancelled {
86
+			params.Conclusion = string(actionsdb.CheckConclusionCancelled)
87
+		}
88
+		params.HasCompletedAt = true
89
+		params.CompletedAt = timeFromPg(job.CompletedAt)
90
+	default:
91
+		return nil
92
+	}
93
+	_, err = checks.Update(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, params)
94
+	return err
95
+}
96
+
97
+func timeFromPg(ts pgtype.Timestamptz) time.Time {
98
+	if !ts.Valid {
99
+		return time.Time{}
100
+	}
101
+	return ts.Time
102
+}
internal/actions/lifecycle/cancel.gomodified
@@ -7,16 +7,13 @@ package lifecycle
77
 import (
88
 	"context"
99
 	"errors"
10
-	"fmt"
1110
 	"strings"
12
-	"time"
1311
 
1412
 	"github.com/jackc/pgx/v5"
15
-	"github.com/jackc/pgx/v5/pgtype"
1613
 
14
+	"github.com/tenseleyFlow/shithub/internal/actions/checksync"
15
+	"github.com/tenseleyFlow/shithub/internal/actions/runstate"
1716
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
18
-	"github.com/tenseleyFlow/shithub/internal/checks"
19
-	checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
2017
 	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
2118
 )
2219
 
@@ -72,7 +69,7 @@ func CancelRun(ctx context.Context, deps Deps, runID int64, reason string) (Canc
7269
 		runConclusion actionsdb.CheckConclusion
7370
 	)
7471
 	if len(changed) > 0 {
75
-		runCompleted, runConclusion, err = rollupRunAfterCancel(ctx, q, tx, runID)
72
+		runCompleted, runConclusion, err = runstate.RollupAfterCancel(ctx, q, tx, runID)
7673
 		if err != nil {
7774
 			return CancelResult{}, err
7875
 		}
@@ -137,7 +134,7 @@ func CancelJob(ctx context.Context, deps Deps, jobID int64, reason string) (Canc
137134
 		runConclusion actionsdb.CheckConclusion
138135
 	)
139136
 	if len(changed) > 0 {
140
-		runCompleted, runConclusion, err = rollupRunAfterCancel(ctx, q, tx, runID)
137
+		runCompleted, runConclusion, err = runstate.RollupAfterCancel(ctx, q, tx, runID)
141138
 		if err != nil {
142139
 			return CancelResult{}, err
143140
 		}
@@ -157,63 +154,6 @@ func CancelJob(ctx context.Context, deps Deps, jobID int64, reason string) (Canc
157154
 	}, nil
158155
 }
159156
 
160
-func rollupRunAfterCancel(
161
-	ctx context.Context,
162
-	q *actionsdb.Queries,
163
-	tx pgx.Tx,
164
-	runID int64,
165
-) (bool, actionsdb.CheckConclusion, error) {
166
-	jobs, err := q.ListJobsForRun(ctx, tx, runID)
167
-	if err != nil {
168
-		return false, "", err
169
-	}
170
-	runConclusion, complete := deriveWorkflowRunConclusion(jobs)
171
-	if complete {
172
-		if _, err := q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{
173
-			ID:         runID,
174
-			Conclusion: runConclusion,
175
-		}); err != nil {
176
-			return false, "", err
177
-		}
178
-		return true, runConclusion, nil
179
-	}
180
-	if err := q.MarkWorkflowRunRunning(ctx, tx, runID); err != nil {
181
-		return false, "", err
182
-	}
183
-	return false, "", nil
184
-}
185
-
186
-func deriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) {
187
-	if len(jobs) == 0 {
188
-		return actionsdb.CheckConclusionFailure, true
189
-	}
190
-	worst := actionsdb.CheckConclusionSuccess
191
-	for _, job := range jobs {
192
-		switch job.Status {
193
-		case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled, actionsdb.WorkflowJobStatusSkipped:
194
-		default:
195
-			return "", false
196
-		}
197
-		if job.Status == actionsdb.WorkflowJobStatusCancelled {
198
-			worst = actionsdb.CheckConclusionCancelled
199
-			continue
200
-		}
201
-		if !job.Conclusion.Valid {
202
-			return actionsdb.CheckConclusionFailure, true
203
-		}
204
-		c := job.Conclusion.CheckConclusion
205
-		if c == actionsdb.CheckConclusionFailure ||
206
-			c == actionsdb.CheckConclusionTimedOut ||
207
-			c == actionsdb.CheckConclusionActionRequired {
208
-			return c, true
209
-		}
210
-		if c == actionsdb.CheckConclusionCancelled {
211
-			worst = actionsdb.CheckConclusionCancelled
212
-		}
213
-	}
214
-	return worst, true
215
-}
216
-
217157
 func recordCancelledJobs(jobs []actionsdb.WorkflowJob, reason string) {
218158
 	if len(jobs) == 0 {
219159
 		return
@@ -235,74 +175,12 @@ func cancelReason(reason string) string {
235175
 }
236176
 
237177
 func syncChangedJobChecks(ctx context.Context, deps Deps, jobs []actionsdb.WorkflowJob) {
238
-	for _, job := range jobs {
239
-		if job.Status != actionsdb.WorkflowJobStatusRunning &&
240
-			job.Status != actionsdb.WorkflowJobStatusCompleted &&
241
-			job.Status != actionsdb.WorkflowJobStatusCancelled {
242
-			continue
243
-		}
244
-		if err := SyncCheckRunForJob(ctx, deps, job); err != nil && deps.Logger != nil {
245
-			deps.Logger.WarnContext(ctx, "actions lifecycle: sync check_run", "job_id", job.ID, "error", err)
246
-		}
247
-	}
178
+	checksync.ChangedJobs(ctx, checksync.Deps{Pool: deps.Pool, Logger: deps.Logger}, jobs)
248179
 }
249180
 
250181
 // SyncCheckRunForJob mirrors an Actions workflow_job row into its check_run
251182
 // row. Missing check rows are non-fatal because check creation is already
252183
 // best-effort in the trigger path and can be reconciled independently.
253184
 func SyncCheckRunForJob(ctx context.Context, deps Deps, job actionsdb.WorkflowJob) error {
254
-	if deps.Pool == nil {
255
-		return errors.New("actions lifecycle: nil Pool")
256
-	}
257
-	run, err := actionsdb.New().GetWorkflowRunByID(ctx, deps.Pool, job.RunID)
258
-	if err != nil {
259
-		return err
260
-	}
261
-	name := strings.TrimSpace(job.JobName)
262
-	if name == "" {
263
-		name = job.JobKey
264
-	}
265
-	checkRun, err := checksdb.New().GetCheckRunByExternalID(ctx, deps.Pool, checksdb.GetCheckRunByExternalIDParams{
266
-		RepoID:     run.RepoID,
267
-		HeadSha:    run.HeadSha,
268
-		Name:       name,
269
-		ExternalID: pgtype.Text{String: fmt.Sprintf("workflow_run:%d:job:%s", job.RunID, job.JobKey), Valid: true},
270
-	})
271
-	if err != nil {
272
-		if errors.Is(err, pgx.ErrNoRows) {
273
-			return nil
274
-		}
275
-		return err
276
-	}
277
-	params := checks.UpdateParams{
278
-		RunID:        checkRun.ID,
279
-		HasStatus:    true,
280
-		HasStartedAt: true,
281
-		StartedAt:    timeFromPg(job.StartedAt),
282
-	}
283
-	switch job.Status {
284
-	case actionsdb.WorkflowJobStatusRunning:
285
-		params.Status = "in_progress"
286
-	case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled:
287
-		params.Status = "completed"
288
-		params.HasConclusion = true
289
-		if job.Conclusion.Valid {
290
-			params.Conclusion = string(job.Conclusion.CheckConclusion)
291
-		} else if job.Status == actionsdb.WorkflowJobStatusCancelled {
292
-			params.Conclusion = string(actionsdb.CheckConclusionCancelled)
293
-		}
294
-		params.HasCompletedAt = true
295
-		params.CompletedAt = timeFromPg(job.CompletedAt)
296
-	default:
297
-		return nil
298
-	}
299
-	_, err = checks.Update(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, params)
300
-	return err
301
-}
302
-
303
-func timeFromPg(ts pgtype.Timestamptz) time.Time {
304
-	if !ts.Valid {
305
-		return time.Time{}
306
-	}
307
-	return ts.Time
185
+	return checksync.Job(ctx, checksync.Deps{Pool: deps.Pool, Logger: deps.Logger}, job)
308186
 }
internal/actions/runstate/cancel.goadded
@@ -0,0 +1,72 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package runstate owns shared workflow-run status derivation helpers.
4
+package runstate
5
+
6
+import (
7
+	"context"
8
+
9
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
10
+)
11
+
12
+// RollupAfterCancel updates the parent workflow_run after one or more jobs in
13
+// the run received a cancellation request. If all jobs are terminal, the run is
14
+// completed with the derived conclusion; otherwise it is marked running.
15
+func RollupAfterCancel(
16
+	ctx context.Context,
17
+	q *actionsdb.Queries,
18
+	db actionsdb.DBTX,
19
+	runID int64,
20
+) (bool, actionsdb.CheckConclusion, error) {
21
+	jobs, err := q.ListJobsForRun(ctx, db, runID)
22
+	if err != nil {
23
+		return false, "", err
24
+	}
25
+	runConclusion, complete := DeriveWorkflowRunConclusion(jobs)
26
+	if complete {
27
+		if _, err := q.CompleteWorkflowRun(ctx, db, actionsdb.CompleteWorkflowRunParams{
28
+			ID:         runID,
29
+			Conclusion: runConclusion,
30
+		}); err != nil {
31
+			return false, "", err
32
+		}
33
+		return true, runConclusion, nil
34
+	}
35
+	if err := q.MarkWorkflowRunRunning(ctx, db, runID); err != nil {
36
+		return false, "", err
37
+	}
38
+	return false, "", nil
39
+}
40
+
41
+// DeriveWorkflowRunConclusion mirrors GitHub's "worst job wins" rollup for
42
+// the conclusion set shithub currently supports.
43
+func DeriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) {
44
+	if len(jobs) == 0 {
45
+		return actionsdb.CheckConclusionFailure, true
46
+	}
47
+	worst := actionsdb.CheckConclusionSuccess
48
+	for _, job := range jobs {
49
+		switch job.Status {
50
+		case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusCancelled, actionsdb.WorkflowJobStatusSkipped:
51
+		default:
52
+			return "", false
53
+		}
54
+		if job.Status == actionsdb.WorkflowJobStatusCancelled {
55
+			worst = actionsdb.CheckConclusionCancelled
56
+			continue
57
+		}
58
+		if !job.Conclusion.Valid {
59
+			return actionsdb.CheckConclusionFailure, true
60
+		}
61
+		c := job.Conclusion.CheckConclusion
62
+		if c == actionsdb.CheckConclusionFailure ||
63
+			c == actionsdb.CheckConclusionTimedOut ||
64
+			c == actionsdb.CheckConclusionActionRequired {
65
+			return c, true
66
+		}
67
+		if c == actionsdb.CheckConclusionCancelled {
68
+			worst = actionsdb.CheckConclusionCancelled
69
+		}
70
+	}
71
+	return worst, true
72
+}