tenseleyflow/shithub / 2f8b0e9

Browse files

actions/finalize: compact step logs to object storage

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
2f8b0e937cacbaf5c6353ed1253419389b97f9dd
Parents
70a6fc5
Tree
7c19093

6 changed files

Status | File | Additions | Deletions
M cmd/shithubd/worker.go 30 0
A internal/actions/finalize/handler.go 176 0
A internal/actions/finalize/handler_test.go 174 0
M internal/actions/queries/workflow_steps.sql 28 0
M internal/actions/sqlc/querier.go 2 0
M internal/actions/sqlc/workflow_steps.sql.go 108 0
cmd/shithubd/worker.go (modified)
@@ -17,6 +17,7 @@ import (
1717
 
1818
 	"github.com/spf13/cobra"
1919
 
20
+	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
2021
 	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
2122
 	"github.com/tenseleyFlow/shithub/internal/auth/audit"
2223
 	"github.com/tenseleyFlow/shithub/internal/auth/email"
@@ -79,6 +80,10 @@ var workerCmd = &cobra.Command{
7980
 		defer pool.Close()
8081
 
8182
 		logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
83
+		objectStore, err := buildWorkerObjectStore(cfg.Storage.S3, logger)
84
+		if err != nil {
85
+			return fmt.Errorf("object storage: %w", err)
86
+		}
8287
 
8388
 		p := worker.NewPool(pool, worker.PoolConfig{
8489
 			Workers:    count,
@@ -157,6 +162,13 @@ var workerCmd = &cobra.Command{
157162
 		p.Register(trigger.KindWorkflowTrigger, trigger.Handler(trigger.JobDeps{
158163
 			Deps: trigger.Deps{Pool: pool, Logger: logger}, RepoFS: rfs,
159164
 		}))
165
+		if objectStore != nil {
166
+			p.Register(finalize.KindWorkflowFinalizeStep, finalize.Handler(finalize.Deps{
167
+				Pool: pool, ObjectStore: objectStore, Logger: logger,
168
+			}))
169
+		} else {
170
+			logger.Info("actions: object storage not configured; workflow step log finalization disabled")
171
+		}
160172
 
161173
 		return p.Run(ctx)
162174
 	},
@@ -167,6 +179,24 @@ func init() {
167179
 	rootCmd.AddCommand(workerCmd)
168180
 }
169181
 
182
+func buildWorkerObjectStore(s config.S3StorageConfig, logger *slog.Logger) (storage.ObjectStore, error) {
183
+	if s.Bucket == "" {
184
+		return nil, nil
185
+	}
186
+	if logger != nil {
187
+		logger.Info("storage: configuring object store for worker", "bucket", s.Bucket)
188
+	}
189
+	return storage.NewS3Store(storage.S3Config{
190
+		Endpoint:        s.Endpoint,
191
+		Region:          s.Region,
192
+		AccessKeyID:     s.AccessKeyID,
193
+		SecretAccessKey: s.SecretAccessKey,
194
+		Bucket:          s.Bucket,
195
+		UseSSL:          s.UseSSL,
196
+		ForcePathStyle:  s.ForcePathStyle,
197
+	})
198
+}
199
+
170200
 // pickNotifEmailSender mirrors pickAdminEmailSender / pickEmailSender
171201
 // in the web binary. Kept local to the worker so failure to construct
172202
 // the sender doesn't kill the process — fan-out without email is a
internal/actions/finalize/handler.go (added)
@@ -0,0 +1,176 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package finalize owns server-side Actions finalization work that should not
4
+// run on the hot runner API path.
5
+package finalize
6
+
7
+import (
8
+	"bytes"
9
+	"context"
10
+	"encoding/json"
11
+	"errors"
12
+	"fmt"
13
+	"log/slog"
14
+
15
+	"github.com/jackc/pgx/v5"
16
+	"github.com/jackc/pgx/v5/pgtype"
17
+	"github.com/jackc/pgx/v5/pgxpool"
18
+
19
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
20
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
21
+	"github.com/tenseleyFlow/shithub/internal/worker"
22
+)
23
+
24
// KindWorkflowFinalizeStep names the worker job that compacts per-step log
// chunks into object storage and prunes the SQL chunks.
const KindWorkflowFinalizeStep worker.Kind = "workflow:finalize_step"

const (
	// defaultMaxLogBytes (100 MiB) caps a finalized log when Deps.MaxLogBytes
	// is unset; a larger log poisons the job instead of being uploaded.
	defaultMaxLogBytes = 100 * 1024 * 1024
	// chunkPageSize is how many chunk rows are fetched per page while
	// concatenating a step's log.
	chunkPageSize = 1000
	// logContentType is the Content-Type recorded on uploaded log objects.
	logContentType = "text/plain; charset=utf-8"
)

// Payload is the workflow:finalize_step worker payload.
type Payload struct {
	// StepID identifies the workflow step whose log chunks are compacted.
	StepID int64 `json:"step_id"`
}

// Deps are the runtime dependencies for Handler.
type Deps struct {
	Pool        *pgxpool.Pool       // required: database pool
	ObjectStore storage.ObjectStore // required: destination for compacted logs
	Logger      *slog.Logger        // optional: nil disables success logging
	MaxLogBytes int64               // optional cap: <= 0 selects defaultMaxLogBytes
}
46
+
47
+// Handler returns the worker handler for workflow:finalize_step.
48
+func Handler(deps Deps) worker.Handler {
49
+	maxLogBytes := deps.MaxLogBytes
50
+	if maxLogBytes <= 0 {
51
+		maxLogBytes = defaultMaxLogBytes
52
+	}
53
+	return func(ctx context.Context, raw json.RawMessage) error {
54
+		var p Payload
55
+		if err := json.Unmarshal(raw, &p); err != nil {
56
+			return worker.PoisonError(fmt.Errorf("finalize step: bad payload: %w", err))
57
+		}
58
+		if p.StepID <= 0 {
59
+			return worker.PoisonError(errors.New("finalize step: missing step_id"))
60
+		}
61
+		if deps.Pool == nil {
62
+			return worker.PoisonError(errors.New("finalize step: database pool is not configured"))
63
+		}
64
+		if deps.ObjectStore == nil {
65
+			return worker.PoisonError(errors.New("finalize step: object storage is not configured"))
66
+		}
67
+		if err := finalizeStep(ctx, deps, p.StepID, maxLogBytes); err != nil {
68
+			return err
69
+		}
70
+		return nil
71
+	}
72
+}
73
+
74
// finalizeStep compacts one step's SQL log chunks into a single object in
// object storage and records the result on the step row.
//
// Flow: load the step and its parent job (both poisoned — never retried — when
// the row is missing), concatenate the chunks, upload the blob under the
// deterministic StepLogObjectKey, then in a single transaction persist the
// object key plus byte count and delete the chunks.
func finalizeStep(ctx context.Context, deps Deps, stepID, maxLogBytes int64) error {
	q := actionsdb.New()
	step, err := q.GetWorkflowStepByID(ctx, deps.Pool, stepID)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return worker.PoisonError(fmt.Errorf("finalize step: step %d not found", stepID))
		}
		return fmt.Errorf("finalize step: load step: %w", err)
	}
	// The parent job supplies the run_id/job_id used in the object key.
	job, err := q.GetWorkflowJobByID(ctx, deps.Pool, step.JobID)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return worker.PoisonError(fmt.Errorf("finalize step: job %d not found", step.JobID))
		}
		return fmt.Errorf("finalize step: load job: %w", err)
	}

	// NOTE(review): chunks are read outside the transaction that later deletes
	// them — this assumes no chunks are appended once finalization has been
	// enqueued; confirm against the runner ingest path.
	log, err := collectChunks(ctx, q, deps.Pool, stepID, maxLogBytes)
	if err != nil {
		return err
	}
	if len(log) == 0 && step.LogObjectKey.Valid {
		// Already finalized and no residual chunks: idempotent no-op on retry.
		return nil
	}

	objectKey := pgtype.Text{}
	if len(log) > 0 {
		// Upload before the DB transaction. The key is deterministic, so a
		// retry after a failed commit overwrites the same object harmlessly.
		key := StepLogObjectKey(job.RunID, job.ID, stepID)
		if _, err := deps.ObjectStore.Put(ctx, key, bytes.NewReader(log), storage.PutOpts{
			ContentType:   logContentType,
			ContentLength: int64(len(log)),
		}); err != nil {
			return fmt.Errorf("finalize step: upload log object: %w", err)
		}
		objectKey = pgtype.Text{String: key, Valid: true}
	}

	// Record the object pointer and prune the chunks atomically so readers
	// never observe both (or neither) representation of the log.
	tx, err := deps.Pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("finalize step: begin tx: %w", err)
	}
	committed := false
	defer func() {
		// Roll back on any early return before Commit succeeds.
		if !committed {
			_ = tx.Rollback(ctx)
		}
	}()
	if _, err := q.UpdateWorkflowStepLogObject(ctx, tx, actionsdb.UpdateWorkflowStepLogObjectParams{
		ID:           stepID,
		LogObjectKey: objectKey,
		LogByteCount: int64(len(log)),
	}); err != nil {
		return fmt.Errorf("finalize step: update step log object: %w", err)
	}
	if err := q.DeleteStepLogChunks(ctx, tx, stepID); err != nil {
		return fmt.Errorf("finalize step: delete chunks: %w", err)
	}
	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("finalize step: commit: %w", err)
	}
	committed = true
	if deps.Logger != nil {
		deps.Logger.InfoContext(ctx, "finalized workflow step log",
			"step_id", stepID, "job_id", job.ID, "run_id", job.RunID, "bytes", len(log))
	}
	return nil
}
141
+
142
// collectChunks returns the concatenation of a step's log chunks in seq order.
//
// It keyset-paginates on seq, chunkPageSize rows at a time, so an arbitrarily
// long log never requires one giant result set. lastSeq starts at -1, which
// assumes seq values are non-negative and that ListStepLogChunks returns rows
// with seq strictly greater than the supplied Seq — TODO confirm in the query.
// When the accumulated size would exceed maxLogBytes the error is poisoned so
// an oversized log is not retried forever.
func collectChunks(ctx context.Context, q *actionsdb.Queries, db actionsdb.DBTX, stepID, maxLogBytes int64) ([]byte, error) {
	var (
		out     bytes.Buffer
		lastSeq int32 = -1
	)
	for {
		chunks, err := q.ListStepLogChunks(ctx, db, actionsdb.ListStepLogChunksParams{
			StepID: stepID,
			Seq:    lastSeq,
			Limit:  chunkPageSize,
		})
		if err != nil {
			return nil, fmt.Errorf("finalize step: list chunks: %w", err)
		}
		if len(chunks) == 0 {
			// No rows past lastSeq: pagination is complete.
			return out.Bytes(), nil
		}
		for _, chunk := range chunks {
			// Enforce the size cap before writing so the limit is exact.
			if int64(out.Len()+len(chunk.Chunk)) > maxLogBytes {
				return nil, worker.PoisonError(fmt.Errorf("finalize step: log exceeds %d bytes", maxLogBytes))
			}
			// bytes.Buffer.Write never returns a non-nil error; the check is
			// defensive in case the sink type ever changes.
			if _, err := out.Write(chunk.Chunk); err != nil {
				return nil, fmt.Errorf("finalize step: append chunk: %w", err)
			}
			lastSeq = chunk.Seq
		}
	}
}
170
+
171
// StepLogObjectKey returns the deterministic object key for a finalized step
// log. It is intentionally independent of step names so renames do not move
// already-uploaded logs.
func StepLogObjectKey(runID, jobID, stepID int64) string {
	const pattern = "actions/runs/%d/jobs/%d/steps/%d.log"
	return fmt.Sprintf(pattern, runID, jobID, stepID)
}
internal/actions/finalize/handler_test.go (added)
@@ -0,0 +1,174 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package finalize
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"errors"
9
+	"io"
10
+	"strings"
11
+	"testing"
12
+
13
+	"github.com/jackc/pgx/v5/pgtype"
14
+	"github.com/jackc/pgx/v5/pgxpool"
15
+
16
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
17
+	"github.com/tenseleyFlow/shithub/internal/infra/storage"
18
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
19
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
20
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
21
+	"github.com/tenseleyFlow/shithub/internal/worker"
22
+)
23
+
24
// finalizeFixtureHash is a syntactically valid but meaningless argon2id hash
// used only to satisfy the users table's password-hash requirement in tests.
const finalizeFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
	"AAAAAAAAAAAAAAAA$" +
	"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
27
+
28
// TestHandlerUploadsConcatenatedLogAndDeletesChunks covers the happy path:
// two chunks appended in seq order are concatenated, uploaded under the
// deterministic object key with the plain-text content type, the step row is
// stamped with the key and byte count, and the SQL chunks are pruned.
func TestHandlerUploadsConcatenatedLogAndDeletesChunks(t *testing.T) {
	ctx := context.Background()
	pool := dbtest.NewTestDB(t)
	store := storage.NewMemoryStore()
	_, job, step := insertFinalizeFixture(t, pool)

	q := actionsdb.New()
	if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
		StepID: step.ID,
		Seq:    0,
		Chunk:  []byte("hello "),
	}); err != nil {
		t.Fatalf("AppendStepLogChunk 0: %v", err)
	}
	if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
		StepID: step.ID,
		Seq:    1,
		Chunk:  []byte("world\n"),
	}); err != nil {
		t.Fatalf("AppendStepLogChunk 1: %v", err)
	}

	payload, err := json.Marshal(Payload{StepID: step.ID})
	if err != nil {
		t.Fatalf("marshal payload: %v", err)
	}
	if err := Handler(Deps{Pool: pool, ObjectStore: store})(ctx, payload); err != nil {
		t.Fatalf("Handler: %v", err)
	}

	// The step row must now point at the uploaded object.
	updated, err := q.GetWorkflowStepByID(ctx, pool, step.ID)
	if err != nil {
		t.Fatalf("GetWorkflowStepByID: %v", err)
	}
	wantKey := StepLogObjectKey(job.RunID, job.ID, step.ID)
	if !updated.LogObjectKey.Valid || updated.LogObjectKey.String != wantKey || updated.LogByteCount != int64(len("hello world\n")) {
		t.Fatalf("updated step log metadata: %+v", updated)
	}
	// The stored object must be the chunks concatenated in seq order.
	rc, meta, err := store.Get(ctx, wantKey)
	if err != nil {
		t.Fatalf("store.Get: %v", err)
	}
	defer rc.Close()
	body, err := io.ReadAll(rc)
	if err != nil {
		t.Fatalf("read object: %v", err)
	}
	if string(body) != "hello world\n" || meta.ContentType != logContentType {
		t.Fatalf("object: body=%q meta=%+v", body, meta)
	}
	// The SQL chunks must be gone after a successful finalize.
	chunks, err := q.ListStepLogChunks(ctx, pool, actionsdb.ListStepLogChunksParams{
		StepID: step.ID,
		Seq:    -1,
		Limit:  10,
	})
	if err != nil {
		t.Fatalf("ListStepLogChunks: %v", err)
	}
	if len(chunks) != 0 {
		t.Fatalf("chunks were not deleted: %+v", chunks)
	}
}
90
+
91
// TestHandlerPoisonsOversizedLogs pins that a log larger than Deps.MaxLogBytes
// fails with a poison (non-retryable) error that names the configured limit.
func TestHandlerPoisonsOversizedLogs(t *testing.T) {
	ctx := context.Background()
	pool := dbtest.NewTestDB(t)
	_, _, step := insertFinalizeFixture(t, pool)
	q := actionsdb.New()
	if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
		StepID: step.ID,
		Seq:    0,
		Chunk:  []byte("too large"),
	}); err != nil {
		t.Fatalf("AppendStepLogChunk: %v", err)
	}
	payload, err := json.Marshal(Payload{StepID: step.ID})
	if err != nil {
		t.Fatalf("marshal payload: %v", err)
	}
	// MaxLogBytes of 4 is smaller than the 9-byte chunk appended above.
	err = Handler(Deps{Pool: pool, ObjectStore: storage.NewMemoryStore(), MaxLogBytes: 4})(ctx, payload)
	if !errors.Is(err, worker.ErrPoison) || !strings.Contains(err.Error(), "exceeds 4 bytes") {
		t.Fatalf("error: got %v, want poison oversized error", err)
	}
}
112
+
113
// insertFinalizeFixture inserts the minimal user → repo → workflow run → job →
// step chain the finalize handler needs and returns the created rows.
func insertFinalizeFixture(t *testing.T, pool *pgxpool.Pool) (actionsdb.WorkflowRun, actionsdb.WorkflowJob, actionsdb.WorkflowStep) {
	t.Helper()
	ctx := context.Background()
	user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
		Username:     "alice",
		DisplayName:  "Alice",
		PasswordHash: finalizeFixtureHash,
	})
	if err != nil {
		t.Fatalf("CreateUser: %v", err)
	}
	repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
		Name:          "demo",
		DefaultBranch: "trunk",
		Visibility:    reposdb.RepoVisibilityPublic,
	})
	if err != nil {
		t.Fatalf("CreateRepo: %v", err)
	}
	q := actionsdb.New()
	run, err := q.InsertWorkflowRun(ctx, pool, actionsdb.InsertWorkflowRunParams{
		RepoID:       repo.ID,
		RunIndex:     1,
		WorkflowFile: ".shithub/workflows/ci.yml",
		WorkflowName: "ci",
		HeadSha:      strings.Repeat("a", 40),
		HeadRef:      "refs/heads/trunk",
		Event:        actionsdb.WorkflowRunEventPush,
		EventPayload: []byte(`{}`),
		ActorUserID:  pgtype.Int8{Int64: user.ID, Valid: true},
	})
	if err != nil {
		t.Fatalf("InsertWorkflowRun: %v", err)
	}
	job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
		RunID:          run.ID,
		JobIndex:       0,
		JobKey:         "build",
		JobName:        "build",
		RunsOn:         "ubuntu-latest",
		TimeoutMinutes: 360,
		Permissions:    []byte(`{}`),
		JobEnv:         []byte(`{}`),
	})
	if err != nil {
		t.Fatalf("InsertWorkflowJob: %v", err)
	}
	step, err := q.InsertWorkflowStep(ctx, pool, actionsdb.InsertWorkflowStepParams{
		JobID:            job.ID,
		StepIndex:        0,
		StepName:         "test",
		RunCommand:       "go test ./...",
		StepEnv:          []byte(`{}`),
		WorkingDirectory: "",
		StepWith:         []byte(`{}`),
	})
	if err != nil {
		t.Fatalf("InsertWorkflowStep: %v", err)
	}
	return run, job, step
}
internal/actions/queries/workflow_steps.sql (modified)
@@ -23,6 +23,34 @@ SELECT id, job_id, step_index, step_id, step_name, if_expr,
2323
 FROM workflow_steps
2424
 WHERE id = $1;
2525
 
26
-- name: UpdateWorkflowStepStatus :one
-- Transitions a step's status/conclusion and its start/completion timestamps.
-- Every write bumps the optimistic-concurrency version and updated_at.
UPDATE workflow_steps
SET status = $2,
    conclusion = sqlc.narg(conclusion)::check_conclusion,
    started_at = sqlc.narg(started_at)::timestamptz,
    completed_at = sqlc.narg(completed_at)::timestamptz,
    version = version + 1,
    updated_at = now()
WHERE id = $1
RETURNING id, job_id, step_index, step_id, step_name, if_expr,
          run_command, uses_alias, working_directory, step_env,
          continue_on_error, status, conclusion, log_object_key,
          log_byte_count, started_at, completed_at, version,
          created_at, updated_at, step_with;

-- name: UpdateWorkflowStepLogObject :one
-- Records the compacted log's object-storage key (nullable) and byte count
-- after finalization; version/updated_at bump mirrors the other step updates.
UPDATE workflow_steps
SET log_object_key = sqlc.narg(log_object_key)::text,
    log_byte_count = sqlc.arg(log_byte_count)::bigint,
    version = version + 1,
    updated_at = now()
WHERE id = sqlc.arg(id)::bigint
RETURNING id, job_id, step_index, step_id, step_name, if_expr,
          run_command, uses_alias, working_directory, step_env,
          continue_on_error, status, conclusion, log_object_key,
          log_byte_count, started_at, completed_at, version,
          created_at, updated_at, step_with;
53
+
2654
 -- name: ListRunnerStepsForJob :many
2755
 SELECT id, job_id, step_index, step_id, step_name, if_expr,
2856
        run_command, uses_alias, working_directory, step_env,
internal/actions/sqlc/querier.go (modified)
@@ -86,6 +86,8 @@ type Querier interface {
8686
 	RevokeAllTokensForRunner(ctx context.Context, db DBTX, runnerID int64) error
8787
 	TouchRunnerHeartbeat(ctx context.Context, db DBTX, arg TouchRunnerHeartbeatParams) error
8888
 	UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error)
89
+	UpdateWorkflowStepLogObject(ctx context.Context, db DBTX, arg UpdateWorkflowStepLogObjectParams) (WorkflowStep, error)
90
+	UpdateWorkflowStepStatus(ctx context.Context, db DBTX, arg UpdateWorkflowStepStatusParams) (WorkflowStep, error)
8991
 	UpsertOrgSecret(ctx context.Context, db DBTX, arg UpsertOrgSecretParams) (WorkflowSecret, error)
9092
 	UpsertOrgVariable(ctx context.Context, db DBTX, arg UpsertOrgVariableParams) (ActionsVariable, error)
9193
 	// SPDX-License-Identifier: AGPL-3.0-or-later
internal/actions/sqlc/workflow_steps.sql.go (modified)
@@ -291,3 +291,111 @@ func (q *Queries) ListStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]
291291
 	}
292292
 	return items, nil
293293
 }
294
+
295
// updateWorkflowStepLogObject is sqlc-generated from
// internal/actions/queries/workflow_steps.sql. NOTE(review): regenerate with
// sqlc rather than hand-editing; comments here will be lost on regeneration.
const updateWorkflowStepLogObject = `-- name: UpdateWorkflowStepLogObject :one
UPDATE workflow_steps
SET log_object_key = $1::text,
    log_byte_count = $2::bigint,
    version = version + 1,
    updated_at = now()
WHERE id = $3::bigint
RETURNING id, job_id, step_index, step_id, step_name, if_expr,
          run_command, uses_alias, working_directory, step_env,
          continue_on_error, status, conclusion, log_object_key,
          log_byte_count, started_at, completed_at, version,
          created_at, updated_at, step_with
`

// UpdateWorkflowStepLogObjectParams carries the bind values for
// UpdateWorkflowStepLogObject: $1 = LogObjectKey (nullable text),
// $2 = LogByteCount, $3 = ID.
type UpdateWorkflowStepLogObjectParams struct {
	LogObjectKey pgtype.Text
	LogByteCount int64
	ID           int64
}

// UpdateWorkflowStepLogObject records a step's compacted-log object key and
// byte count and returns the updated row.
func (q *Queries) UpdateWorkflowStepLogObject(ctx context.Context, db DBTX, arg UpdateWorkflowStepLogObjectParams) (WorkflowStep, error) {
	row := db.QueryRow(ctx, updateWorkflowStepLogObject, arg.LogObjectKey, arg.LogByteCount, arg.ID)
	var i WorkflowStep
	err := row.Scan(
		&i.ID,
		&i.JobID,
		&i.StepIndex,
		&i.StepID,
		&i.StepName,
		&i.IfExpr,
		&i.RunCommand,
		&i.UsesAlias,
		&i.WorkingDirectory,
		&i.StepEnv,
		&i.ContinueOnError,
		&i.Status,
		&i.Conclusion,
		&i.LogObjectKey,
		&i.LogByteCount,
		&i.StartedAt,
		&i.CompletedAt,
		&i.Version,
		&i.CreatedAt,
		&i.UpdatedAt,
		&i.StepWith,
	)
	return i, err
}
343
+
344
// updateWorkflowStepStatus is sqlc-generated from the query file.
// Placeholders: $1 = id, $2 = status, $3 = conclusion, $4 = started_at,
// $5 = completed_at. NOTE(review): regenerate rather than hand-editing.
const updateWorkflowStepStatus = `-- name: UpdateWorkflowStepStatus :one
UPDATE workflow_steps
SET status = $2,
    conclusion = $3::check_conclusion,
    started_at = $4::timestamptz,
    completed_at = $5::timestamptz,
    version = version + 1,
    updated_at = now()
WHERE id = $1
RETURNING id, job_id, step_index, step_id, step_name, if_expr,
          run_command, uses_alias, working_directory, step_env,
          continue_on_error, status, conclusion, log_object_key,
          log_byte_count, started_at, completed_at, version,
          created_at, updated_at, step_with
`

// UpdateWorkflowStepStatusParams carries the bind values for
// UpdateWorkflowStepStatus, in the same order they are passed to QueryRow.
type UpdateWorkflowStepStatusParams struct {
	ID          int64
	Status      WorkflowStepStatus
	Conclusion  NullCheckConclusion
	StartedAt   pgtype.Timestamptz
	CompletedAt pgtype.Timestamptz
}

// UpdateWorkflowStepStatus transitions a step's status/conclusion and
// timestamps and returns the updated row.
func (q *Queries) UpdateWorkflowStepStatus(ctx context.Context, db DBTX, arg UpdateWorkflowStepStatusParams) (WorkflowStep, error) {
	row := db.QueryRow(ctx, updateWorkflowStepStatus,
		arg.ID,
		arg.Status,
		arg.Conclusion,
		arg.StartedAt,
		arg.CompletedAt,
	)
	var i WorkflowStep
	err := row.Scan(
		&i.ID,
		&i.JobID,
		&i.StepIndex,
		&i.StepID,
		&i.StepName,
		&i.IfExpr,
		&i.RunCommand,
		&i.UsesAlias,
		&i.WorkingDirectory,
		&i.StepEnv,
		&i.ContinueOnError,
		&i.Status,
		&i.Conclusion,
		&i.LogObjectKey,
		&i.LogByteCount,
		&i.StartedAt,
		&i.CompletedAt,
		&i.Version,
		&i.CreatedAt,
		&i.UpdatedAt,
		&i.StepWith,
	)
	return i, err
}