tenseleyflow/shithub / 91f228d

Browse files

admin/runner: add registration json and queue diagnostics (S41j)

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
91f228db5ad2a7004dd9adcc6187a523f0ad4cda
Parents
c214372
Tree
2870d85

6 changed files

StatusFile+-
M cmd/shithubd/admin_runner.go 140 7
M cmd/shithubd/admin_runner_test.go 100 0
M internal/actions/queries/workflow_jobs.sql 16 0
M internal/actions/sqlc/querier.go 1 0
M internal/actions/sqlc/workflow_jobs.sql.go 49 0
M internal/actions/trigger/enqueue_test.go 63 0
cmd/shithubd/admin_runner.gomodified
@@ -4,8 +4,10 @@ package main
44
 
55
 import (
66
 	"context"
7
+	"encoding/json"
78
 	"errors"
89
 	"fmt"
10
+	"io"
911
 	"strconv"
1012
 	"strings"
1113
 	"text/tabwriter"
@@ -30,6 +32,7 @@ func newAdminRunnerCmd() *cobra.Command {
3032
 	}
3133
 	cmd.AddCommand(newAdminRunnerRegisterCmd())
3234
 	cmd.AddCommand(newAdminRunnerListCmd())
35
+	cmd.AddCommand(newAdminRunnerQueueCmd())
3336
 	cmd.AddCommand(newAdminRunnerRevokeCmd())
3437
 	return cmd
3538
 }
@@ -38,8 +41,10 @@ func newAdminRunnerRegisterCmd() *cobra.Command {
3841
 	var name string
3942
 	var labelsRaw string
4043
 	var capacity int
44
+	var output string
45
+	var expiresIn time.Duration
4146
 	cmd := &cobra.Command{
42
-		Use:   "register --name <name> [--labels self-hosted,linux] [--capacity 1]",
47
+		Use:   "register --name <name> [--labels self-hosted,linux,ubuntu-latest,x64] [--capacity 1]",
4348
 		Short: "Register an Actions runner and print its token once",
4449
 		RunE: func(cmd *cobra.Command, _ []string) error {
4550
 			name = strings.TrimSpace(name)
@@ -53,6 +58,17 @@ func newAdminRunnerRegisterCmd() *cobra.Command {
5358
 			if capacity < 1 || capacity > 64 {
5459
 				return errors.New("admin runner register: --capacity must be between 1 and 64")
5560
 			}
61
+			output = strings.ToLower(strings.TrimSpace(output))
62
+			switch output {
63
+			case "", "text":
64
+				output = "text"
65
+			case "json":
66
+			default:
67
+				return errors.New("admin runner register: --output must be text or json")
68
+			}
69
+			if expiresIn < 0 {
70
+				return errors.New("admin runner register: --expires-in must be non-negative")
71
+			}
5672
 
5773
 			cfg, err := config.Load(nil)
5874
 			if err != nil {
@@ -70,6 +86,13 @@ func newAdminRunnerRegisterCmd() *cobra.Command {
7086
 			if err != nil {
7187
 				return fmt.Errorf("admin runner register: mint token: %w", err)
7288
 			}
89
+			var expiresAt pgtype.Timestamptz
90
+			var outputExpiresAt *time.Time
91
+			if expiresIn > 0 {
92
+				t := time.Now().UTC().Add(expiresIn)
93
+				expiresAt = pgtype.Timestamptz{Time: t, Valid: true}
94
+				outputExpiresAt = &t
95
+			}
7396
 
7497
 			q := actionsdb.New()
7598
 			tx, err := pool.Begin(ctx)
@@ -95,7 +118,7 @@ func newAdminRunnerRegisterCmd() *cobra.Command {
95118
 			if _, err := q.InsertRunnerToken(ctx, tx, actionsdb.InsertRunnerTokenParams{
96119
 				RunnerID:  runner.ID,
97120
 				TokenHash: tokenHash,
98
-				ExpiresAt: pgtype.Timestamptz{},
121
+				ExpiresAt: expiresAt,
99122
 			}); err != nil {
100123
 				return fmt.Errorf("admin runner register: insert token: %w", err)
101124
 			}
@@ -105,18 +128,49 @@ func newAdminRunnerRegisterCmd() *cobra.Command {
105128
 			committed = true
106129
 			metrics.ActionsRunnerRegistrationsTotal.Inc()
107130
 
108
-			_, _ = fmt.Fprintf(cmd.OutOrStdout(),
109
-				"runner registered\nid: %d\nname: %s\nlabels: %s\ncapacity: %d\ntoken: %s\n\nStore this token now; shithub never shows it again.\n",
110
-				runner.ID, runner.Name, strings.Join(runner.Labels, ","), runner.Capacity, token)
111
-			return nil
131
+			return writeRunnerRegisterOutput(cmd.OutOrStdout(), output, runnerRegisterOutput{
132
+				ID:             runner.ID,
133
+				Name:           runner.Name,
134
+				Labels:         runner.Labels,
135
+				Capacity:       runner.Capacity,
136
+				Token:          token,
137
+				TokenExpiresAt: outputExpiresAt,
138
+			})
112139
 		},
113140
 	}
114141
 	cmd.Flags().StringVar(&name, "name", "", "Runner name (letters, numbers, underscore, dash)")
115
-	cmd.Flags().StringVar(&labelsRaw, "labels", "", "Comma-separated runner labels")
142
+	cmd.Flags().StringVar(&labelsRaw, "labels", strings.Join(runnerlabels.DefaultShared(), ","), "Comma-separated runner labels")
116143
 	cmd.Flags().IntVar(&capacity, "capacity", 1, "Maximum concurrent jobs this runner may execute")
144
+	cmd.Flags().DurationVar(&expiresIn, "expires-in", 0, "Registration token lifetime (0 means no expiration)")
145
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
117146
 	return cmd
118147
 }
119148
 
149
+type runnerRegisterOutput struct {
150
+	ID             int64      `json:"id"`
151
+	Name           string     `json:"name"`
152
+	Labels         []string   `json:"labels"`
153
+	Capacity       int32      `json:"capacity"`
154
+	Token          string     `json:"token"`
155
+	TokenExpiresAt *time.Time `json:"token_expires_at,omitempty"`
156
+}
157
+
158
+func writeRunnerRegisterOutput(w io.Writer, format string, out runnerRegisterOutput) error {
159
+	if format == "json" {
160
+		enc := json.NewEncoder(w)
161
+		enc.SetIndent("", "  ")
162
+		return enc.Encode(out)
163
+	}
164
+	expires := "never"
165
+	if out.TokenExpiresAt != nil {
166
+		expires = out.TokenExpiresAt.Format(time.RFC3339)
167
+	}
168
+	_, err := fmt.Fprintf(w,
169
+		"runner registered\nid: %d\nname: %s\nlabels: %s\ncapacity: %d\ntoken_expires_at: %s\ntoken: %s\n\nStore this token now; shithub never shows it again.\n",
170
+		out.ID, out.Name, strings.Join(out.Labels, ","), out.Capacity, expires, out.Token)
171
+	return err
172
+}
173
+
120174
 func newAdminRunnerListCmd() *cobra.Command {
121175
 	return &cobra.Command{
122176
 		Use:   "list",
@@ -153,6 +207,85 @@ func newAdminRunnerListCmd() *cobra.Command {
153207
 	}
154208
 }
155209
 
210
+func newAdminRunnerQueueCmd() *cobra.Command {
211
+	var output string
212
+	cmd := &cobra.Command{
213
+		Use:   "queue",
214
+		Short: "Summarize queued Actions jobs by runs-on label",
215
+		RunE: func(cmd *cobra.Command, _ []string) error {
216
+			output = strings.ToLower(strings.TrimSpace(output))
217
+			switch output {
218
+			case "", "text":
219
+				output = "text"
220
+			case "json":
221
+			default:
222
+				return errors.New("admin runner queue: --output must be text or json")
223
+			}
224
+			cfg, err := config.Load(nil)
225
+			if err != nil {
226
+				return err
227
+			}
228
+			ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
229
+			defer cancel()
230
+			pool, err := openAdminRunnerPool(ctx, cfg, "queue")
231
+			if err != nil {
232
+				return err
233
+			}
234
+			defer pool.Close()
235
+
236
+			rows, err := actionsdb.New().ListQueuedWorkflowJobRunsOn(ctx, pool)
237
+			if err != nil {
238
+				return fmt.Errorf("admin runner queue: %w", err)
239
+			}
240
+			return writeRunnerQueueOutput(cmd.OutOrStdout(), output, rows, time.Now().UTC())
241
+		},
242
+	}
243
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
244
+	return cmd
245
+}
246
+
247
+type runnerQueueOutputRow struct {
248
+	RunsOn              string `json:"runs_on"`
249
+	QueuedJobs          int32  `json:"queued_jobs"`
250
+	MatchingRunnerCount int32  `json:"matching_runner_count"`
251
+	OldestQueuedAt      string `json:"oldest_queued_at,omitempty"`
252
+	OldestQueuedSeconds int64  `json:"oldest_queued_seconds,omitempty"`
253
+}
254
+
255
+func writeRunnerQueueOutput(w io.Writer, format string, rows []actionsdb.ListQueuedWorkflowJobRunsOnRow, now time.Time) error {
256
+	out := make([]runnerQueueOutputRow, 0, len(rows))
257
+	for _, row := range rows {
258
+		item := runnerQueueOutputRow{
259
+			RunsOn:              row.RunsOn,
260
+			QueuedJobs:          row.QueuedJobs,
261
+			MatchingRunnerCount: row.MatchingRunnerCount,
262
+		}
263
+		if row.OldestQueuedAt.Valid {
264
+			item.OldestQueuedAt = row.OldestQueuedAt.Time.UTC().Format(time.RFC3339)
265
+			if d := now.Sub(row.OldestQueuedAt.Time); d > 0 {
266
+				item.OldestQueuedSeconds = int64(d.Seconds())
267
+			}
268
+		}
269
+		out = append(out, item)
270
+	}
271
+	if format == "json" {
272
+		enc := json.NewEncoder(w)
273
+		enc.SetIndent("", "  ")
274
+		return enc.Encode(out)
275
+	}
276
+
277
+	tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
278
+	_, _ = fmt.Fprintln(tw, "RUNS_ON\tQUEUED_JOBS\tMATCHING_RUNNERS\tOLDEST_QUEUED")
279
+	for _, row := range out {
280
+		oldest := "-"
281
+		if row.OldestQueuedAt != "" {
282
+			oldest = row.OldestQueuedAt
283
+		}
284
+		_, _ = fmt.Fprintf(tw, "%s\t%d\t%d\t%s\n", row.RunsOn, row.QueuedJobs, row.MatchingRunnerCount, oldest)
285
+	}
286
+	return tw.Flush()
287
+}
288
+
156289
 func newAdminRunnerRevokeCmd() *cobra.Command {
157290
 	var idRaw string
158291
 	cmd := &cobra.Command{
cmd/shithubd/admin_runner_test.gomodified
@@ -3,8 +3,16 @@
33
 package main
44
 
55
 import (
6
+	"bytes"
7
+	"encoding/json"
68
 	"reflect"
9
+	"strings"
710
 	"testing"
11
+	"time"
12
+
13
+	"github.com/jackc/pgx/v5/pgtype"
14
+
15
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
816
 )
917
 
1018
 func TestParseRunnerLabels(t *testing.T) {
@@ -45,3 +53,95 @@ func TestParseRunnerLabelsRejectsInvalid(t *testing.T) {
4553
 		})
4654
 	}
4755
 }
56
+
57
+func TestWriteRunnerRegisterOutputJSON(t *testing.T) {
58
+	expiresAt := time.Date(2026, 5, 12, 16, 30, 0, 0, time.UTC)
59
+	var buf bytes.Buffer
60
+	err := writeRunnerRegisterOutput(&buf, "json", runnerRegisterOutput{
61
+		ID:             7,
62
+		Name:           "runner-7",
63
+		Labels:         []string{"self-hosted", "linux", "ubuntu-latest", "x64"},
64
+		Capacity:       2,
65
+		Token:          "shithub_runner_token",
66
+		TokenExpiresAt: &expiresAt,
67
+	})
68
+	if err != nil {
69
+		t.Fatalf("writeRunnerRegisterOutput: %v", err)
70
+	}
71
+	if strings.Contains(buf.String(), "Store this token") {
72
+		t.Fatalf("json output included human prose: %s", buf.String())
73
+	}
74
+	var got struct {
75
+		ID             int64     `json:"id"`
76
+		Name           string    `json:"name"`
77
+		Labels         []string  `json:"labels"`
78
+		Capacity       int32     `json:"capacity"`
79
+		Token          string    `json:"token"`
80
+		TokenExpiresAt time.Time `json:"token_expires_at"`
81
+	}
82
+	if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
83
+		t.Fatalf("json.Unmarshal: %v", err)
84
+	}
85
+	if got.ID != 7 || got.Name != "runner-7" || got.Capacity != 2 || got.Token != "shithub_runner_token" {
86
+		t.Fatalf("unexpected output: %+v", got)
87
+	}
88
+	if !reflect.DeepEqual(got.Labels, []string{"self-hosted", "linux", "ubuntu-latest", "x64"}) {
89
+		t.Fatalf("labels: got %#v", got.Labels)
90
+	}
91
+	if !got.TokenExpiresAt.Equal(expiresAt) {
92
+		t.Fatalf("expires_at: got %s want %s", got.TokenExpiresAt, expiresAt)
93
+	}
94
+}
95
+
96
+func TestWriteRunnerRegisterOutputText(t *testing.T) {
97
+	var buf bytes.Buffer
98
+	err := writeRunnerRegisterOutput(&buf, "text", runnerRegisterOutput{
99
+		ID:       8,
100
+		Name:     "runner-8",
101
+		Labels:   []string{"self-hosted", "linux", "ubuntu-latest", "x64"},
102
+		Capacity: 1,
103
+		Token:    "token-once",
104
+	})
105
+	if err != nil {
106
+		t.Fatalf("writeRunnerRegisterOutput: %v", err)
107
+	}
108
+	body := buf.String()
109
+	for _, want := range []string{
110
+		"runner registered",
111
+		"labels: self-hosted,linux,ubuntu-latest,x64",
112
+		"token_expires_at: never",
113
+		"token: token-once",
114
+		"Store this token now",
115
+	} {
116
+		if !strings.Contains(body, want) {
117
+			t.Fatalf("text output missing %q in %s", want, body)
118
+		}
119
+	}
120
+}
121
+
122
+func TestWriteRunnerQueueOutputJSON(t *testing.T) {
123
+	now := time.Date(2026, 5, 12, 16, 30, 0, 0, time.UTC)
124
+	rows := []actionsdb.ListQueuedWorkflowJobRunsOnRow{{
125
+		RunsOn:              "windows-latest",
126
+		QueuedJobs:          3,
127
+		MatchingRunnerCount: 0,
128
+		OldestQueuedAt:      pgtype.Timestamptz{Time: now.Add(-90 * time.Second), Valid: true},
129
+	}}
130
+	var buf bytes.Buffer
131
+	if err := writeRunnerQueueOutput(&buf, "json", rows, now); err != nil {
132
+		t.Fatalf("writeRunnerQueueOutput: %v", err)
133
+	}
134
+	var got []runnerQueueOutputRow
135
+	if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
136
+		t.Fatalf("json.Unmarshal: %v", err)
137
+	}
138
+	if len(got) != 1 {
139
+		t.Fatalf("rows=%d body=%s", len(got), buf.String())
140
+	}
141
+	if got[0].RunsOn != "windows-latest" || got[0].QueuedJobs != 3 || got[0].MatchingRunnerCount != 0 {
142
+		t.Fatalf("unexpected row: %+v", got[0])
143
+	}
144
+	if got[0].OldestQueuedSeconds != 90 {
145
+		t.Fatalf("oldest seconds=%d", got[0].OldestQueuedSeconds)
146
+	}
147
+}
internal/actions/queries/workflow_jobs.sqlmodified
@@ -172,3 +172,19 @@ SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
172172
 FROM workflow_jobs
173173
 WHERE run_id = $1
174174
 ORDER BY job_index ASC;
175
+
176
+-- name: ListQueuedWorkflowJobRunsOn :many
177
+SELECT
178
+    COALESCE(NULLIF(j.runs_on, ''), '(none)')::text AS runs_on,
179
+    COUNT(*)::integer AS queued_jobs,
180
+    COUNT(DISTINCT wr.id)::integer AS matching_runner_count,
181
+    MIN(j.created_at)::timestamptz AS oldest_queued_at
182
+FROM workflow_jobs j
183
+LEFT JOIN workflow_runners wr
184
+  ON (j.runs_on = '' OR j.runs_on = ANY(wr.labels))
185
+ AND wr.status IN ('idle', 'busy')
186
+WHERE j.status = 'queued'
187
+  AND j.cancel_requested = false
188
+  AND j.runner_id IS NULL
189
+GROUP BY COALESCE(NULLIF(j.runs_on, ''), '(none)')
190
+ORDER BY queued_jobs DESC, runs_on ASC;
internal/actions/sqlc/querier.gomodified
@@ -77,6 +77,7 @@ type Querier interface {
7777
 	ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error)
7878
 	ListOrgSecrets(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgSecretsRow, error)
7979
 	ListOrgVariables(ctx context.Context, db DBTX, orgID pgtype.Int8) ([]ListOrgVariablesRow, error)
80
+	ListQueuedWorkflowJobRunsOn(ctx context.Context, db DBTX) ([]ListQueuedWorkflowJobRunsOnRow, error)
8081
 	ListRepoSecrets(ctx context.Context, db DBTX, repoID pgtype.Int8) ([]ListRepoSecretsRow, error)
8182
 	ListRepoVariables(ctx context.Context, db DBTX, repoID pgtype.Int8) ([]ListRepoVariablesRow, error)
8283
 	ListRunnerStepsForJob(ctx context.Context, db DBTX, jobID int64) ([]ListRunnerStepsForJobRow, error)
internal/actions/sqlc/workflow_jobs.sql.gomodified
@@ -334,6 +334,55 @@ func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]L
334334
 	return items, nil
335335
 }
336336
 
337
+const listQueuedWorkflowJobRunsOn = `-- name: ListQueuedWorkflowJobRunsOn :many
338
+SELECT
339
+    COALESCE(NULLIF(j.runs_on, ''), '(none)')::text AS runs_on,
340
+    COUNT(*)::integer AS queued_jobs,
341
+    COUNT(DISTINCT wr.id)::integer AS matching_runner_count,
342
+    MIN(j.created_at)::timestamptz AS oldest_queued_at
343
+FROM workflow_jobs j
344
+LEFT JOIN workflow_runners wr
345
+  ON (j.runs_on = '' OR j.runs_on = ANY(wr.labels))
346
+ AND wr.status IN ('idle', 'busy')
347
+WHERE j.status = 'queued'
348
+  AND j.cancel_requested = false
349
+  AND j.runner_id IS NULL
350
+GROUP BY COALESCE(NULLIF(j.runs_on, ''), '(none)')
351
+ORDER BY queued_jobs DESC, runs_on ASC
352
+`
353
+
354
+type ListQueuedWorkflowJobRunsOnRow struct {
355
+	RunsOn              string
356
+	QueuedJobs          int32
357
+	MatchingRunnerCount int32
358
+	OldestQueuedAt      pgtype.Timestamptz
359
+}
360
+
361
+func (q *Queries) ListQueuedWorkflowJobRunsOn(ctx context.Context, db DBTX) ([]ListQueuedWorkflowJobRunsOnRow, error) {
362
+	rows, err := db.Query(ctx, listQueuedWorkflowJobRunsOn)
363
+	if err != nil {
364
+		return nil, err
365
+	}
366
+	defer rows.Close()
367
+	items := []ListQueuedWorkflowJobRunsOnRow{}
368
+	for rows.Next() {
369
+		var i ListQueuedWorkflowJobRunsOnRow
370
+		if err := rows.Scan(
371
+			&i.RunsOn,
372
+			&i.QueuedJobs,
373
+			&i.MatchingRunnerCount,
374
+			&i.OldestQueuedAt,
375
+		); err != nil {
376
+			return nil, err
377
+		}
378
+		items = append(items, i)
379
+	}
380
+	if err := rows.Err(); err != nil {
381
+		return nil, err
382
+	}
383
+	return items, nil
384
+}
385
+
337386
 const requestWorkflowJobCancel = `-- name: RequestWorkflowJobCancel :one
338387
 UPDATE workflow_jobs
339388
 SET cancel_requested = true,
internal/actions/trigger/enqueue_test.gomodified
@@ -149,6 +149,69 @@ func TestEnqueue_HappyPath(t *testing.T) {
149149
 	})
150150
 }
151151
 
152
+func TestListQueuedWorkflowJobRunsOnGroupsByRequestedLabel(t *testing.T) {
153
+	f := setupEnq(t)
154
+	ctx := context.Background()
155
+	q := actionsdb.New()
156
+	runner, err := q.InsertRunner(ctx, f.pool, actionsdb.InsertRunnerParams{
157
+		Name:     "runner-linux",
158
+		Labels:   []string{"self-hosted", "linux", "ubuntu-latest", "x64"},
159
+		Capacity: 1,
160
+	})
161
+	if err != nil {
162
+		t.Fatalf("InsertRunner: %v", err)
163
+	}
164
+	if _, err := q.HeartbeatRunner(ctx, f.pool, actionsdb.HeartbeatRunnerParams{
165
+		ID:       runner.ID,
166
+		Labels:   runner.Labels,
167
+		Capacity: runner.Capacity,
168
+		Status:   actionsdb.WorkflowRunnerStatusIdle,
169
+	}); err != nil {
170
+		t.Fatalf("HeartbeatRunner: %v", err)
171
+	}
172
+
173
+	for name, runsOn := range map[string]string{
174
+		"linux":   "ubuntu-latest",
175
+		"windows": "windows-latest",
176
+	} {
177
+		if _, err := trigger.Enqueue(ctx, f.deps, trigger.EnqueueParams{
178
+			RepoID:         f.repoID,
179
+			WorkflowFile:   ".shithub/workflows/" + name + ".yml",
180
+			HeadSHA:        strings.Repeat(name[:1], 40),
181
+			HeadRef:        "refs/heads/trunk",
182
+			EventKind:      trigger.EventPush,
183
+			EventPayload:   map[string]any{"ref": "refs/heads/trunk"},
184
+			ActorUserID:    f.userID,
185
+			TriggerEventID: "push:queue-label-" + name,
186
+			Workflow: workflowFromYAML(t, fmt.Sprintf(`name: %s
187
+on: push
188
+jobs:
189
+  build:
190
+    runs-on: %s
191
+    steps:
192
+      - run: echo hello
193
+`, name, runsOn)),
194
+		}); err != nil {
195
+			t.Fatalf("Enqueue %s: %v", name, err)
196
+		}
197
+	}
198
+
199
+	rows, err := q.ListQueuedWorkflowJobRunsOn(ctx, f.pool)
200
+	if err != nil {
201
+		t.Fatalf("ListQueuedWorkflowJobRunsOn: %v", err)
202
+	}
203
+	got := map[string]actionsdb.ListQueuedWorkflowJobRunsOnRow{}
204
+	for _, row := range rows {
205
+		got[row.RunsOn] = row
206
+	}
207
+	if got["ubuntu-latest"].QueuedJobs != 1 || got["ubuntu-latest"].MatchingRunnerCount != 1 {
208
+		t.Fatalf("ubuntu-latest row: %+v", got["ubuntu-latest"])
209
+	}
210
+	if got["windows-latest"].QueuedJobs != 1 || got["windows-latest"].MatchingRunnerCount != 0 {
211
+		t.Fatalf("windows-latest row: %+v", got["windows-latest"])
212
+	}
213
+}
214
+
152215
 func TestEnqueue_ResolvesConcurrencyGroupExpression(t *testing.T) {
153216
 	f := setupEnq(t)
154217
 	ctx := context.Background()