tenseleyflow/shithub / 3b6ce7e

Browse files

admin/actions: add runner pool ops controls (S41j-4)

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
3b6ce7eba2f1dc4ddb6c6bf44b884a52f3c19c2d
Parents
6931e9a
Tree
d8dc198

5 changed files

Status File + -
M cmd/shithubd/admin_runner.go 488 24
M cmd/shithubd/admin_runner_test.go 67 0
M internal/infra/metrics/actionsobserver.go 53 5
M internal/infra/metrics/actionsobserver_test.go 24 1
M internal/infra/metrics/metrics.go 47 1
cmd/shithubd/admin_runner.go modified
@@ -13,6 +13,7 @@ import (
1313
 	"text/tabwriter"
1414
 	"time"
1515
 
16
+	"github.com/jackc/pgx/v5"
1617
 	"github.com/jackc/pgx/v5/pgtype"
1718
 	"github.com/jackc/pgx/v5/pgxpool"
1819
 	"github.com/spf13/cobra"
@@ -28,12 +29,16 @@ import (
2829
 func newAdminRunnerCmd() *cobra.Command {
2930
 	cmd := &cobra.Command{
3031
 		Use:   "runner",
31
-		Short: "Register, list, and revoke Actions runners",
32
+		Short: "Register and operate Actions runners",
3233
 	}
3334
 	cmd.AddCommand(newAdminRunnerRegisterCmd())
3435
 	cmd.AddCommand(newAdminRunnerListCmd())
3536
 	cmd.AddCommand(newAdminRunnerQueueCmd())
37
+	cmd.AddCommand(newAdminRunnerDrainCmd())
38
+	cmd.AddCommand(newAdminRunnerUndrainCmd())
39
+	cmd.AddCommand(newAdminRunnerRotateTokenCmd())
3640
 	cmd.AddCommand(newAdminRunnerRevokeCmd())
41
+	cmd.AddCommand(newAdminRunnerCleanupStaleCmd())
3742
 	return cmd
3843
 }
3944
 
@@ -156,6 +161,10 @@ type runnerRegisterOutput struct {
156161
 }
157162
 
158163
 func writeRunnerRegisterOutput(w io.Writer, format string, out runnerRegisterOutput) error {
164
+	return writeRunnerTokenOutput(w, format, "runner registered", out)
165
+}
166
+
167
+func writeRunnerTokenOutput(w io.Writer, format, heading string, out runnerRegisterOutput) error {
159168
 	if format == "json" {
160169
 		enc := json.NewEncoder(w)
161170
 		enc.SetIndent("", "  ")
@@ -166,16 +175,22 @@ func writeRunnerRegisterOutput(w io.Writer, format string, out runnerRegisterOut
166175
 		expires = out.TokenExpiresAt.Format(time.RFC3339)
167176
 	}
168177
 	_, err := fmt.Fprintf(w,
169
-		"runner registered\nid: %d\nname: %s\nlabels: %s\ncapacity: %d\ntoken_expires_at: %s\ntoken: %s\n\nStore this token now; shithub never shows it again.\n",
170
-		out.ID, out.Name, strings.Join(out.Labels, ","), out.Capacity, expires, out.Token)
178
+		"%s\nid: %d\nname: %s\nlabels: %s\ncapacity: %d\ntoken_expires_at: %s\ntoken: %s\n\nStore this token now; shithub never shows it again.\n",
179
+		heading, out.ID, out.Name, strings.Join(out.Labels, ","), out.Capacity, expires, out.Token)
171180
 	return err
172181
 }
173182
 
174183
 func newAdminRunnerListCmd() *cobra.Command {
175
-	return &cobra.Command{
184
+	var output string
185
+	cmd := &cobra.Command{
176186
 		Use:   "list",
177187
 		Short: "List registered Actions runners",
178188
 		RunE: func(cmd *cobra.Command, _ []string) error {
189
+			var err error
190
+			output, err = normalizeRunnerOutput("admin runner list", output)
191
+			if err != nil {
192
+				return err
193
+			}
179194
 			cfg, err := config.Load(nil)
180195
 			if err != nil {
181196
 				return err
@@ -192,19 +207,89 @@ func newAdminRunnerListCmd() *cobra.Command {
192207
 			if err != nil {
193208
 				return fmt.Errorf("admin runner list: %w", err)
194209
 			}
195
-			tw := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0)
196
-			_, _ = fmt.Fprintln(tw, "ID\tNAME\tSTATUS\tCAPACITY\tLABELS\tLAST_HEARTBEAT")
197
-			for _, r := range rows {
198
-				last := "never"
199
-				if r.LastHeartbeatAt.Valid {
200
-					last = r.LastHeartbeatAt.Time.Format(time.RFC3339)
201
-				}
202
-				_, _ = fmt.Fprintf(tw, "%d\t%s\t%s\t%d\t%s\t%s\n",
203
-					r.ID, r.Name, r.Status, r.Capacity, strings.Join(r.Labels, ","), last)
204
-			}
205
-			return tw.Flush()
210
+			return writeRunnerListOutput(cmd.OutOrStdout(), output, rows, time.Now().UTC())
206211
 		},
207212
 	}
213
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
214
+	return cmd
215
+}
216
+
217
+type runnerListOutputRow struct {
218
+	ID                      int64    `json:"id"`
219
+	Name                    string   `json:"name"`
220
+	Status                  string   `json:"status"`
221
+	Capacity                int32    `json:"capacity"`
222
+	ActiveJobCount          int32    `json:"active_job_count"`
223
+	Labels                  []string `json:"labels"`
224
+	HostName                string   `json:"host_name,omitempty"`
225
+	Version                 string   `json:"version,omitempty"`
226
+	LastHeartbeatAt         string   `json:"last_heartbeat_at,omitempty"`
227
+	LastHeartbeatAgeSeconds int64    `json:"last_heartbeat_age_seconds,omitempty"`
228
+	DrainingAt              string   `json:"draining_at,omitempty"`
229
+	DrainReason             string   `json:"drain_reason,omitempty"`
230
+	RevokedAt               string   `json:"revoked_at,omitempty"`
231
+	RevokedReason           string   `json:"revoked_reason,omitempty"`
232
+	CreatedAt               string   `json:"created_at,omitempty"`
233
+}
234
+
235
+func writeRunnerListOutput(w io.Writer, format string, rows []actionsdb.ListRunnersRow, now time.Time) error {
236
+	out := make([]runnerListOutputRow, 0, len(rows))
237
+	for _, row := range rows {
238
+		item := runnerListOutputRow{
239
+			ID:             row.ID,
240
+			Name:           row.Name,
241
+			Status:         string(row.Status),
242
+			Capacity:       row.Capacity,
243
+			ActiveJobCount: row.ActiveJobCount,
244
+			Labels:         append([]string{}, row.Labels...),
245
+			HostName:       row.HostName,
246
+			Version:        row.Version,
247
+		}
248
+		if row.LastHeartbeatAt.Valid {
249
+			item.LastHeartbeatAt = row.LastHeartbeatAt.Time.UTC().Format(time.RFC3339)
250
+			if d := now.Sub(row.LastHeartbeatAt.Time); d > 0 {
251
+				item.LastHeartbeatAgeSeconds = int64(d.Seconds())
252
+			}
253
+		}
254
+		if row.DrainingAt.Valid {
255
+			item.DrainingAt = row.DrainingAt.Time.UTC().Format(time.RFC3339)
256
+			item.DrainReason = row.DrainReason
257
+		}
258
+		if row.RevokedAt.Valid {
259
+			item.RevokedAt = row.RevokedAt.Time.UTC().Format(time.RFC3339)
260
+			item.RevokedReason = row.RevokedReason
261
+		}
262
+		if row.CreatedAt.Valid {
263
+			item.CreatedAt = row.CreatedAt.Time.UTC().Format(time.RFC3339)
264
+		}
265
+		out = append(out, item)
266
+	}
267
+	if format == "json" {
268
+		enc := json.NewEncoder(w)
269
+		enc.SetIndent("", "  ")
270
+		return enc.Encode(out)
271
+	}
272
+
273
+	tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
274
+	_, _ = fmt.Fprintln(tw, "ID\tNAME\tSTATUS\tCAPACITY\tACTIVE\tLABELS\tHOST\tVERSION\tLAST_HEARTBEAT\tDRAINING\tREVOKED")
275
+	for _, row := range out {
276
+		last := "never"
277
+		if row.LastHeartbeatAt != "" {
278
+			last = row.LastHeartbeatAt
279
+		}
280
+		draining := "-"
281
+		if row.DrainingAt != "" {
282
+			draining = row.DrainingAt
283
+		}
284
+		revoked := "-"
285
+		if row.RevokedAt != "" {
286
+			revoked = row.RevokedAt
287
+		}
288
+		_, _ = fmt.Fprintf(tw, "%d\t%s\t%s\t%d\t%d\t%s\t%s\t%s\t%s\t%s\t%s\n",
289
+			row.ID, row.Name, row.Status, row.Capacity, row.ActiveJobCount, strings.Join(row.Labels, ","),
290
+			emptyDash(row.HostName), emptyDash(row.Version), last, draining, revoked)
291
+	}
292
+	return tw.Flush()
208293
 }
209294
 
210295
 func newAdminRunnerQueueCmd() *cobra.Command {
@@ -286,15 +371,227 @@ func writeRunnerQueueOutput(w io.Writer, format string, rows []actionsdb.ListQue
286371
 	return tw.Flush()
287372
 }
288373
 
374
+func newAdminRunnerDrainCmd() *cobra.Command {
375
+	var idRaw string
376
+	var reason string
377
+	var output string
378
+	cmd := &cobra.Command{
379
+		Use:   "drain --id <id> [--reason <text>]",
380
+		Short: "Stop an Actions runner from claiming new jobs",
381
+		RunE: func(cmd *cobra.Command, _ []string) error {
382
+			id, err := parseRunnerID("admin runner drain", idRaw)
383
+			if err != nil {
384
+				return err
385
+			}
386
+			output, err = normalizeRunnerOutput("admin runner drain", output)
387
+			if err != nil {
388
+				return err
389
+			}
390
+			reason, err = normalizeRunnerReason(reason, "operator requested drain")
391
+			if err != nil {
392
+				return err
393
+			}
394
+			cfg, err := config.Load(nil)
395
+			if err != nil {
396
+				return err
397
+			}
398
+			ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
399
+			defer cancel()
400
+			pool, err := openAdminRunnerPool(ctx, cfg, "drain")
401
+			if err != nil {
402
+				return err
403
+			}
404
+			defer pool.Close()
405
+
406
+			row, err := actionsdb.New().SetRunnerDraining(ctx, pool, actionsdb.SetRunnerDrainingParams{
407
+				ID:          id,
408
+				DrainReason: reason,
409
+			})
410
+			if err != nil {
411
+				if errors.Is(err, pgx.ErrNoRows) {
412
+					return fmt.Errorf("admin runner drain: runner %d not found or already revoked", id)
413
+				}
414
+				return fmt.Errorf("admin runner drain: %w", err)
415
+			}
416
+			return writeRunnerStateOutput(cmd.OutOrStdout(), output, "runner draining", runnerStateOutput{
417
+				ID:          row.ID,
418
+				Name:        row.Name,
419
+				Status:      string(row.Status),
420
+				DrainingAt:  formatOptionalTime(row.DrainingAt),
421
+				DrainReason: row.DrainReason,
422
+				RevokedAt:   formatOptionalTime(row.RevokedAt),
423
+			})
424
+		},
425
+	}
426
+	cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
427
+	cmd.Flags().StringVar(&reason, "reason", "", "Drain reason recorded for operators")
428
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
429
+	return cmd
430
+}
431
+
432
+func newAdminRunnerUndrainCmd() *cobra.Command {
433
+	var idRaw string
434
+	var output string
435
+	cmd := &cobra.Command{
436
+		Use:   "undrain --id <id>",
437
+		Short: "Allow a drained Actions runner to claim jobs again",
438
+		RunE: func(cmd *cobra.Command, _ []string) error {
439
+			id, err := parseRunnerID("admin runner undrain", idRaw)
440
+			if err != nil {
441
+				return err
442
+			}
443
+			output, err = normalizeRunnerOutput("admin runner undrain", output)
444
+			if err != nil {
445
+				return err
446
+			}
447
+			cfg, err := config.Load(nil)
448
+			if err != nil {
449
+				return err
450
+			}
451
+			ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
452
+			defer cancel()
453
+			pool, err := openAdminRunnerPool(ctx, cfg, "undrain")
454
+			if err != nil {
455
+				return err
456
+			}
457
+			defer pool.Close()
458
+
459
+			row, err := actionsdb.New().ClearRunnerDraining(ctx, pool, id)
460
+			if err != nil {
461
+				if errors.Is(err, pgx.ErrNoRows) {
462
+					return fmt.Errorf("admin runner undrain: runner %d not found or already revoked", id)
463
+				}
464
+				return fmt.Errorf("admin runner undrain: %w", err)
465
+			}
466
+			return writeRunnerStateOutput(cmd.OutOrStdout(), output, "runner undrained", runnerStateOutput{
467
+				ID:          row.ID,
468
+				Name:        row.Name,
469
+				Status:      string(row.Status),
470
+				DrainingAt:  formatOptionalTime(row.DrainingAt),
471
+				DrainReason: row.DrainReason,
472
+				RevokedAt:   formatOptionalTime(row.RevokedAt),
473
+			})
474
+		},
475
+	}
476
+	cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
477
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
478
+	return cmd
479
+}
480
+
481
+func newAdminRunnerRotateTokenCmd() *cobra.Command {
482
+	var idRaw string
483
+	var output string
484
+	var expiresIn time.Duration
485
+	cmd := &cobra.Command{
486
+		Use:   "rotate-token --id <id>",
487
+		Short: "Revoke existing registration tokens and print one replacement token",
488
+		RunE: func(cmd *cobra.Command, _ []string) error {
489
+			id, err := parseRunnerID("admin runner rotate-token", idRaw)
490
+			if err != nil {
491
+				return err
492
+			}
493
+			output, err = normalizeRunnerOutput("admin runner rotate-token", output)
494
+			if err != nil {
495
+				return err
496
+			}
497
+			if expiresIn < 0 {
498
+				return errors.New("admin runner rotate-token: --expires-in must be non-negative")
499
+			}
500
+			cfg, err := config.Load(nil)
501
+			if err != nil {
502
+				return err
503
+			}
504
+			ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
505
+			defer cancel()
506
+			pool, err := openAdminRunnerPool(ctx, cfg, "rotate-token")
507
+			if err != nil {
508
+				return err
509
+			}
510
+			defer pool.Close()
511
+
512
+			token, tokenHash, err := runnertoken.New()
513
+			if err != nil {
514
+				return fmt.Errorf("admin runner rotate-token: mint token: %w", err)
515
+			}
516
+			var expiresAt pgtype.Timestamptz
517
+			var outputExpiresAt *time.Time
518
+			if expiresIn > 0 {
519
+				t := time.Now().UTC().Add(expiresIn)
520
+				expiresAt = pgtype.Timestamptz{Time: t, Valid: true}
521
+				outputExpiresAt = &t
522
+			}
523
+
524
+			q := actionsdb.New()
525
+			tx, err := pool.Begin(ctx)
526
+			if err != nil {
527
+				return fmt.Errorf("admin runner rotate-token: begin: %w", err)
528
+			}
529
+			committed := false
530
+			defer func() {
531
+				if !committed {
532
+					_ = tx.Rollback(ctx)
533
+				}
534
+			}()
535
+			runner, err := q.LockRunnerByID(ctx, tx, id)
536
+			if err != nil {
537
+				if errors.Is(err, pgx.ErrNoRows) {
538
+					return fmt.Errorf("admin runner rotate-token: runner %d not found", id)
539
+				}
540
+				return fmt.Errorf("admin runner rotate-token: lock runner: %w", err)
541
+			}
542
+			if runner.RevokedAt.Valid {
543
+				return fmt.Errorf("admin runner rotate-token: runner %d is revoked", id)
544
+			}
545
+			if err := q.RevokeAllTokensForRunner(ctx, tx, id); err != nil {
546
+				return fmt.Errorf("admin runner rotate-token: revoke old tokens: %w", err)
547
+			}
548
+			if _, err := q.InsertRunnerToken(ctx, tx, actionsdb.InsertRunnerTokenParams{
549
+				RunnerID:  runner.ID,
550
+				TokenHash: tokenHash,
551
+				ExpiresAt: expiresAt,
552
+			}); err != nil {
553
+				return fmt.Errorf("admin runner rotate-token: insert token: %w", err)
554
+			}
555
+			if err := tx.Commit(ctx); err != nil {
556
+				return fmt.Errorf("admin runner rotate-token: commit: %w", err)
557
+			}
558
+			committed = true
559
+
560
+			return writeRunnerTokenOutput(cmd.OutOrStdout(), output, "runner token rotated", runnerRegisterOutput{
561
+				ID:             runner.ID,
562
+				Name:           runner.Name,
563
+				Labels:         runner.Labels,
564
+				Capacity:       runner.Capacity,
565
+				Token:          token,
566
+				TokenExpiresAt: outputExpiresAt,
567
+			})
568
+		},
569
+	}
570
+	cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
571
+	cmd.Flags().DurationVar(&expiresIn, "expires-in", 0, "Registration token lifetime (0 means no expiration)")
572
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
573
+	return cmd
574
+}
575
+
289576
 func newAdminRunnerRevokeCmd() *cobra.Command {
290577
 	var idRaw string
578
+	var reason string
579
+	var output string
291580
 	cmd := &cobra.Command{
292581
 		Use:   "revoke --id <id>",
293
-		Short: "Revoke all registration tokens for an Actions runner",
582
+		Short: "Hard-revoke an Actions runner and all registration tokens",
294583
 		RunE: func(cmd *cobra.Command, _ []string) error {
295
-			id, err := strconv.ParseInt(strings.TrimSpace(idRaw), 10, 64)
296
-			if err != nil || id <= 0 {
297
-				return errors.New("admin runner revoke: --id must be a positive integer")
584
+			id, err := parseRunnerID("admin runner revoke", idRaw)
585
+			if err != nil {
586
+				return err
587
+			}
588
+			output, err = normalizeRunnerOutput("admin runner revoke", output)
589
+			if err != nil {
590
+				return err
591
+			}
592
+			reason, err = normalizeRunnerReason(reason, "operator requested revoke")
593
+			if err != nil {
594
+				return err
298595
 			}
299596
 			cfg, err := config.Load(nil)
300597
 			if err != nil {
@@ -309,21 +606,143 @@ func newAdminRunnerRevokeCmd() *cobra.Command {
309606
 			defer pool.Close()
310607
 
311608
 			q := actionsdb.New()
312
-			runner, err := q.GetRunnerByID(ctx, pool, id)
609
+			tx, err := pool.Begin(ctx)
313610
 			if err != nil {
314
-				return fmt.Errorf("admin runner revoke: runner %d not found", id)
611
+				return fmt.Errorf("admin runner revoke: begin: %w", err)
315612
 			}
316
-			if err := q.RevokeAllTokensForRunner(ctx, pool, id); err != nil {
613
+			committed := false
614
+			defer func() {
615
+				if !committed {
616
+					_ = tx.Rollback(ctx)
617
+				}
618
+			}()
619
+			runner, err := q.RevokeRunner(ctx, tx, actionsdb.RevokeRunnerParams{
620
+				ID:            id,
621
+				RevokedReason: reason,
622
+			})
623
+			if err != nil {
624
+				if errors.Is(err, pgx.ErrNoRows) {
625
+					return fmt.Errorf("admin runner revoke: runner %d not found", id)
626
+				}
317627
 				return fmt.Errorf("admin runner revoke: %w", err)
318628
 			}
319
-			_, _ = fmt.Fprintf(cmd.OutOrStdout(), "runner revoked\nid: %d\nname: %s\n", runner.ID, runner.Name)
320
-			return nil
629
+			if err := q.RevokeAllTokensForRunner(ctx, tx, id); err != nil {
630
+				return fmt.Errorf("admin runner revoke: revoke tokens: %w", err)
631
+			}
632
+			if err := tx.Commit(ctx); err != nil {
633
+				return fmt.Errorf("admin runner revoke: commit: %w", err)
634
+			}
635
+			committed = true
636
+			metrics.ActionsRunnerRevocationsTotal.Inc()
637
+			return writeRunnerStateOutput(cmd.OutOrStdout(), output, "runner revoked", runnerStateOutput{
638
+				ID:            runner.ID,
639
+				Name:          runner.Name,
640
+				Status:        string(runner.Status),
641
+				DrainingAt:    formatOptionalTime(runner.DrainingAt),
642
+				DrainReason:   runner.DrainReason,
643
+				RevokedAt:     formatOptionalTime(runner.RevokedAt),
644
+				RevokedReason: runner.RevokedReason,
645
+			})
321646
 		},
322647
 	}
323648
 	cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
649
+	cmd.Flags().StringVar(&reason, "reason", "", "Revocation reason recorded for operators")
650
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
651
+	return cmd
652
+}
653
+
654
+func newAdminRunnerCleanupStaleCmd() *cobra.Command {
655
+	var olderThan time.Duration
656
+	var output string
657
+	cmd := &cobra.Command{
658
+		Use:   "cleanup-stale",
659
+		Short: "Mark stale non-revoked runners offline",
660
+		RunE: func(cmd *cobra.Command, _ []string) error {
661
+			var err error
662
+			output, err = normalizeRunnerOutput("admin runner cleanup-stale", output)
663
+			if err != nil {
664
+				return err
665
+			}
666
+			if olderThan <= 0 {
667
+				return errors.New("admin runner cleanup-stale: --older-than must be positive")
668
+			}
669
+			cfg, err := config.Load(nil)
670
+			if err != nil {
671
+				return err
672
+			}
673
+			ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
674
+			defer cancel()
675
+			pool, err := openAdminRunnerPool(ctx, cfg, "cleanup-stale")
676
+			if err != nil {
677
+				return err
678
+			}
679
+			defer pool.Close()
680
+
681
+			cutoff := time.Now().UTC().Add(-olderThan)
682
+			rows, err := actionsdb.New().MarkStaleRunnersOffline(ctx, pool, pgtype.Timestamptz{Time: cutoff, Valid: true})
683
+			if err != nil {
684
+				return fmt.Errorf("admin runner cleanup-stale: %w", err)
685
+			}
686
+			return writeRunnerCleanupOutput(cmd.OutOrStdout(), output, rows)
687
+		},
688
+	}
689
+	cmd.Flags().DurationVar(&olderThan, "older-than", 2*time.Minute, "Heartbeat age after which non-revoked runners are marked offline")
690
+	cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
324691
 	return cmd
325692
 }
326693
 
694
+type runnerStateOutput struct {
695
+	ID            int64  `json:"id"`
696
+	Name          string `json:"name"`
697
+	Status        string `json:"status"`
698
+	DrainingAt    string `json:"draining_at,omitempty"`
699
+	DrainReason   string `json:"drain_reason,omitempty"`
700
+	RevokedAt     string `json:"revoked_at,omitempty"`
701
+	RevokedReason string `json:"revoked_reason,omitempty"`
702
+}
703
+
704
+func writeRunnerStateOutput(w io.Writer, format, heading string, out runnerStateOutput) error {
705
+	if format == "json" {
706
+		enc := json.NewEncoder(w)
707
+		enc.SetIndent("", "  ")
708
+		return enc.Encode(out)
709
+	}
710
+	_, err := fmt.Fprintf(w, "%s\nid: %d\nname: %s\nstatus: %s\ndraining_at: %s\ndrain_reason: %s\nrevoked_at: %s\nrevoked_reason: %s\n",
711
+		heading, out.ID, out.Name, out.Status, emptyDash(out.DrainingAt), emptyDash(out.DrainReason),
712
+		emptyDash(out.RevokedAt), emptyDash(out.RevokedReason))
713
+	return err
714
+}
715
+
716
+type runnerCleanupOutputRow struct {
717
+	ID              int64  `json:"id"`
718
+	Name            string `json:"name"`
719
+	Status          string `json:"status"`
720
+	LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
721
+}
722
+
723
+func writeRunnerCleanupOutput(w io.Writer, format string, rows []actionsdb.MarkStaleRunnersOfflineRow) error {
724
+	out := make([]runnerCleanupOutputRow, 0, len(rows))
725
+	for _, row := range rows {
726
+		out = append(out, runnerCleanupOutputRow{
727
+			ID:              row.ID,
728
+			Name:            row.Name,
729
+			Status:          string(row.Status),
730
+			LastHeartbeatAt: formatOptionalTime(row.LastHeartbeatAt),
731
+		})
732
+	}
733
+	if format == "json" {
734
+		enc := json.NewEncoder(w)
735
+		enc.SetIndent("", "  ")
736
+		return enc.Encode(out)
737
+	}
738
+	tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
739
+	_, _ = fmt.Fprintln(tw, "ID\tNAME\tSTATUS\tLAST_HEARTBEAT")
740
+	for _, row := range out {
741
+		_, _ = fmt.Fprintf(tw, "%d\t%s\t%s\t%s\n", row.ID, row.Name, row.Status, emptyDash(row.LastHeartbeatAt))
742
+	}
743
+	return tw.Flush()
744
+}
745
+
327746
 func openAdminRunnerPool(ctx context.Context, cfg config.Config, op string) (*pgxpool.Pool, error) {
328747
 	if cfg.DB.URL == "" {
329748
 		return nil, fmt.Errorf("admin runner %s: DB not configured (set SHITHUB_DATABASE_URL)", op)
@@ -342,6 +761,51 @@ func parseRunnerLabels(raw string) ([]string, error) {
342761
 	return runnerlabels.ParseCSV(raw)
343762
 }
344763
 
764
+func parseRunnerID(op, raw string) (int64, error) {
765
+	id, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64)
766
+	if err != nil || id <= 0 {
767
+		return 0, fmt.Errorf("%s: --id must be a positive integer", op)
768
+	}
769
+	return id, nil
770
+}
771
+
772
+func normalizeRunnerOutput(op, output string) (string, error) {
773
+	output = strings.ToLower(strings.TrimSpace(output))
774
+	switch output {
775
+	case "", "text":
776
+		return "text", nil
777
+	case "json":
778
+		return "json", nil
779
+	default:
780
+		return "", fmt.Errorf("%s: --output must be text or json", op)
781
+	}
782
+}
783
+
784
+func normalizeRunnerReason(reason, fallback string) (string, error) {
785
+	reason = strings.TrimSpace(reason)
786
+	if reason == "" {
787
+		reason = fallback
788
+	}
789
+	if len(reason) > 1000 {
790
+		return "", errors.New("runner reason must be 1000 bytes or fewer")
791
+	}
792
+	return reason, nil
793
+}
794
+
795
+func formatOptionalTime(t pgtype.Timestamptz) string {
796
+	if !t.Valid {
797
+		return ""
798
+	}
799
+	return t.Time.UTC().Format(time.RFC3339)
800
+}
801
+
802
+func emptyDash(value string) string {
803
+	if strings.TrimSpace(value) == "" {
804
+		return "-"
805
+	}
806
+	return value
807
+}
808
+
345809
 func init() {
346810
 	adminCmd.AddCommand(newAdminRunnerCmd())
347811
 	adminActionsCmd.AddCommand(newAdminRunnerCmd())
cmd/shithubd/admin_runner_test.go modified
@@ -145,3 +145,70 @@ func TestWriteRunnerQueueOutputJSON(t *testing.T) {
145145
 		t.Fatalf("oldest seconds=%d", got[0].OldestQueuedSeconds)
146146
 	}
147147
 }
148
+
149
+func TestWriteRunnerListOutputJSONIncludesOpsFields(t *testing.T) {
150
+	now := time.Date(2026, 5, 12, 16, 30, 0, 0, time.UTC)
151
+	rows := []actionsdb.ListRunnersRow{{
152
+		ID:              7,
153
+		Name:            "runner-a",
154
+		Labels:          []string{"self-hosted", "linux"},
155
+		Capacity:        2,
156
+		Status:          actionsdb.WorkflowRunnerStatusBusy,
157
+		LastHeartbeatAt: pgtype.Timestamptz{Time: now.Add(-75 * time.Second), Valid: true},
158
+		HostName:        "host-a",
159
+		Version:         "dev-test",
160
+		DrainingAt:      pgtype.Timestamptz{Time: now.Add(-30 * time.Second), Valid: true},
161
+		DrainReason:     "maintenance",
162
+		ActiveJobCount:  1,
163
+	}}
164
+	var buf bytes.Buffer
165
+	if err := writeRunnerListOutput(&buf, "json", rows, now); err != nil {
166
+		t.Fatalf("writeRunnerListOutput: %v", err)
167
+	}
168
+	var got []runnerListOutputRow
169
+	if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
170
+		t.Fatalf("json.Unmarshal: %v", err)
171
+	}
172
+	if len(got) != 1 {
173
+		t.Fatalf("rows=%d body=%s", len(got), buf.String())
174
+	}
175
+	if got[0].HostName != "host-a" || got[0].Version != "dev-test" ||
176
+		got[0].ActiveJobCount != 1 || got[0].LastHeartbeatAgeSeconds != 75 ||
177
+		got[0].DrainingAt == "" || got[0].DrainReason != "maintenance" {
178
+		t.Fatalf("unexpected row: %+v", got[0])
179
+	}
180
+}
181
+
182
+func TestWriteRunnerStateOutputText(t *testing.T) {
183
+	var buf bytes.Buffer
184
+	if err := writeRunnerStateOutput(&buf, "text", "runner draining", runnerStateOutput{
185
+		ID:          7,
186
+		Name:        "runner-a",
187
+		Status:      "busy",
188
+		DrainingAt:  "2026-05-12T16:30:00Z",
189
+		DrainReason: "maintenance",
190
+	}); err != nil {
191
+		t.Fatalf("writeRunnerStateOutput: %v", err)
192
+	}
193
+	body := buf.String()
194
+	for _, want := range []string{
195
+		"runner draining",
196
+		"id: 7",
197
+		"name: runner-a",
198
+		"drain_reason: maintenance",
199
+	} {
200
+		if !strings.Contains(body, want) {
201
+			t.Fatalf("text output missing %q in %s", want, body)
202
+		}
203
+	}
204
+}
205
+
206
+func TestParseRunnerIDRejectsInvalid(t *testing.T) {
207
+	for _, raw := range []string{"", "0", "-1", "abc"} {
208
+		t.Run(raw, func(t *testing.T) {
209
+			if _, err := parseRunnerID("admin runner test", raw); err == nil {
210
+				t.Fatal("parseRunnerID returned nil error")
211
+			}
212
+		})
213
+	}
214
+}
internal/infra/metrics/actionsobserver.go modified
@@ -9,6 +9,8 @@ import (
99
 	"github.com/jackc/pgx/v5/pgxpool"
1010
 )
1111
 
12
+const actionsRunnerStaleAfter = 60 * time.Second
13
+
1214
 // ObserveActions starts a goroutine that periodically refreshes DB-backed
1315
 // Actions gauges. The goroutine exits when ctx is canceled.
1416
 func ObserveActions(ctx context.Context, pool *pgxpool.Pool, interval time.Duration) {
@@ -45,6 +47,7 @@ func refreshActions(ctx context.Context, pool *pgxpool.Pool) {
4547
 func refreshActionQueueGauges(ctx context.Context, pool *pgxpool.Pool) {
4648
 	ActionsQueueDepth.WithLabelValues("runs").Set(0)
4749
 	ActionsQueueDepth.WithLabelValues("jobs").Set(0)
50
+	ActionsQueueDepthByLabels.Reset()
4851
 	ActionsActive.WithLabelValues("runs").Set(0)
4952
 	ActionsActive.WithLabelValues("jobs").Set(0)
5053
 
@@ -75,31 +78,76 @@ GROUP BY status`)
7578
 			ActionsActive.WithLabelValues(resource).Set(count)
7679
 		}
7780
 	}
81
+	rows.Close()
82
+
83
+	labelRows, err := pool.Query(ctx, `
84
+SELECT COALESCE(NULLIF(runs_on, ''), '(none)')::text AS labels,
85
+       count(*)::double precision
86
+FROM workflow_jobs
87
+WHERE status = 'queued'
88
+  AND cancel_requested = false
89
+  AND runner_id IS NULL
90
+GROUP BY COALESCE(NULLIF(runs_on, ''), '(none)')`)
91
+	if err != nil {
92
+		return
93
+	}
94
+	defer labelRows.Close()
95
+	for labelRows.Next() {
96
+		var labels string
97
+		var count float64
98
+		if err := labelRows.Scan(&labels, &count); err != nil {
99
+			return
100
+		}
101
+		ActionsQueueDepthByLabels.WithLabelValues(labels).Set(count)
102
+	}
78103
 }
79104
 
80105
 func refreshActionRunnerGauges(ctx context.Context, pool *pgxpool.Pool) {
81106
 	ActionsRunnerHeartbeatAgeSeconds.Reset()
107
+	ActionsRunnerOnline.Reset()
108
+	ActionsRunnerDraining.Reset()
82109
 	ActionsRunnerCapacity.Reset()
110
+	ActionsRunnerStaleTotal.Set(0)
83111
 	rows, err := pool.Query(ctx, `
84112
 SELECT name::text,
85113
        status::text,
86114
        capacity::double precision,
87
-       EXTRACT(EPOCH FROM (now() - last_heartbeat_at))::double precision AS heartbeat_age_seconds
88
-FROM workflow_runners
89
-WHERE last_heartbeat_at IS NOT NULL`)
115
+       COALESCE(EXTRACT(EPOCH FROM (now() - last_heartbeat_at))::double precision, -1) AS heartbeat_age_seconds,
116
+       (draining_at IS NOT NULL)::boolean AS draining,
117
+       (revoked_at IS NOT NULL)::boolean AS revoked
118
+FROM workflow_runners`)
90119
 	if err != nil {
91120
 		return
92121
 	}
93122
 	defer rows.Close()
123
+	var stale float64
94124
 	for rows.Next() {
95125
 		var name, status string
96126
 		var capacity, age float64
97
-		if err := rows.Scan(&name, &status, &capacity, &age); err != nil {
127
+		var draining, revoked bool
128
+		if err := rows.Scan(&name, &status, &capacity, &age, &draining, &revoked); err != nil {
98129
 			return
99130
 		}
100131
 		ActionsRunnerCapacity.WithLabelValues(name, status).Set(capacity)
101
-		ActionsRunnerHeartbeatAgeSeconds.WithLabelValues(name, status).Set(age)
132
+		if age >= 0 {
133
+			ActionsRunnerHeartbeatAgeSeconds.WithLabelValues(name, status).Set(age)
134
+		}
135
+		online := !revoked && status != "offline" && age >= 0 && age <= actionsRunnerStaleAfter.Seconds()
136
+		if online {
137
+			ActionsRunnerOnline.WithLabelValues(name).Set(1)
138
+		} else {
139
+			ActionsRunnerOnline.WithLabelValues(name).Set(0)
140
+		}
141
+		if draining {
142
+			ActionsRunnerDraining.WithLabelValues(name).Set(1)
143
+		} else {
144
+			ActionsRunnerDraining.WithLabelValues(name).Set(0)
145
+		}
146
+		if !revoked && status != "offline" && age > actionsRunnerStaleAfter.Seconds() {
147
+			stale++
148
+		}
102149
 	}
150
+	ActionsRunnerStaleTotal.Set(stale)
103151
 }
104152
 
105153
 func refreshActionStorageGauges(ctx context.Context, pool *pgxpool.Pool) {
internal/infra/metrics/actionsobserver_test.go modified
@@ -60,6 +60,7 @@ func TestRefreshActionsPublishesQueueRunnerAndStorageGauges(t *testing.T) {
6060
 		JobKey:         "build",
6161
 		JobName:        "Build",
6262
 		RunsOn:         `["ubuntu-latest"]`,
63
+		NeedsJobs:      []string{},
6364
 		TimeoutMinutes: 30,
6465
 		Permissions:    []byte(`{}`),
6566
 		JobEnv:         []byte(`{}`),
@@ -111,7 +112,7 @@ func TestRefreshActionsPublishesQueueRunnerAndStorageGauges(t *testing.T) {
111112
 	if err != nil {
112113
 		t.Fatalf("InsertRunner: %v", err)
113114
 	}
114
-	if _, err := pool.Exec(ctx, `UPDATE workflow_runners SET status = 'busy', last_heartbeat_at = now() - interval '75 seconds' WHERE id = $1`, runner.ID); err != nil {
115
+	if _, err := pool.Exec(ctx, `UPDATE workflow_runners SET status = 'busy', last_heartbeat_at = now() - interval '75 seconds', draining_at = now(), drain_reason = 'maintenance' WHERE id = $1`, runner.ID); err != nil {
115116
 		t.Fatalf("touch runner heartbeat: %v", err)
116117
 	}
117118
 
@@ -120,9 +121,13 @@ func TestRefreshActionsPublishesQueueRunnerAndStorageGauges(t *testing.T) {
120121
 
121122
 	assertGauge(t, ActionsQueueDepth, []string{"runs"}, 1)
122123
 	assertGauge(t, ActionsQueueDepth, []string{"jobs"}, 1)
124
+	assertGauge(t, ActionsQueueDepthByLabels, []string{`["ubuntu-latest"]`}, 1)
123125
 	assertGauge(t, ActionsActive, []string{"runs"}, 0)
124126
 	assertGauge(t, ActionsActive, []string{"jobs"}, 0)
125127
 	assertGauge(t, ActionsRunnerCapacity, []string{"runner-a", "busy"}, 3)
128
+	assertGauge(t, ActionsRunnerOnline, []string{"runner-a"}, 0)
129
+	assertGauge(t, ActionsRunnerDraining, []string{"runner-a"}, 1)
130
+	assertPlainGauge(t, ActionsRunnerStaleTotal, 1)
126131
 	if got := gaugeValue(t, ActionsRunnerHeartbeatAgeSeconds, []string{"runner-a", "busy"}); got < 60 {
127132
 		t.Fatalf("runner heartbeat age = %v, want >= 60", got)
128133
 	}
@@ -140,8 +145,12 @@ type labeledGauge interface {
140145
 
141146
 func resetActionsObserverGauges() {
142147
 	ActionsQueueDepth.Reset()
148
+	ActionsQueueDepthByLabels.Reset()
143149
 	ActionsActive.Reset()
144150
 	ActionsRunnerHeartbeatAgeSeconds.Reset()
151
+	ActionsRunnerOnline.Reset()
152
+	ActionsRunnerDraining.Reset()
153
+	ActionsRunnerStaleTotal.Set(0)
145154
 	ActionsRunnerCapacity.Reset()
146155
 	ActionsStorageObjects.Reset()
147156
 	ActionsStorageBytes.Reset()
@@ -165,3 +174,17 @@ func gaugeValue(t *testing.T, vec labeledGauge, labels []string) float64 {
165174
 	}
166175
 	return metric.Gauge.GetValue()
167176
 }
177
+
178
+func assertPlainGauge(t *testing.T, gauge prometheus.Gauge, want float64) {
179
+	t.Helper()
180
+	var metric dto.Metric
181
+	if err := gauge.Write(&metric); err != nil {
182
+		t.Fatalf("read gauge: %v", err)
183
+	}
184
+	if metric.Gauge == nil {
185
+		t.Fatal("gauge missing")
186
+	}
187
+	if got := metric.Gauge.GetValue(); got != want {
188
+		t.Fatalf("gauge = %v, want %v", got, want)
189
+	}
190
+}
internal/infra/metrics/metrics.gomodified
@@ -138,7 +138,7 @@ var (
138138
 	ActionsRunnerHeartbeatsTotal = prometheus.NewCounterVec(
139139
 		prometheus.CounterOpts{
140140
 			Name: "shithub_actions_runner_heartbeats_total",
141
-			Help: "Total runner heartbeats by result (claimed, no_job).",
141
+			Help: "Total runner heartbeats by result (claimed, no_job, rejected).",
142142
 		},
143143
 		[]string{"result"},
144144
 	)
@@ -225,6 +225,13 @@ var (
225225
 		},
226226
 		[]string{"resource"},
227227
 	)
228
+	ActionsQueueDepthByLabels = prometheus.NewGaugeVec(
229
+		prometheus.GaugeOpts{
230
+			Name: "shithub_actions_queue_depth_by_labels",
231
+			Help: "Current queued Actions jobs by exact runs-on label expression.",
232
+		},
233
+		[]string{"labels"},
234
+	)
228235
 	ActionsActive = prometheus.NewGaugeVec(
229236
 		prometheus.GaugeOpts{
230237
 			Name: "shithub_actions_active",
@@ -232,6 +239,13 @@ var (
232239
 		},
233240
 		[]string{"resource"},
234241
 	)
242
+	ActionsJobClaimLatencySeconds = prometheus.NewHistogram(
243
+		prometheus.HistogramOpts{
244
+			Name:    "shithub_actions_job_claim_latency_seconds",
245
+			Help:    "Seconds between job enqueue and runner claim.",
246
+			Buckets: prometheus.ExponentialBuckets(0.1, 2.5, 12),
247
+		},
248
+	)
235249
 	ActionsRunnerHeartbeatAgeSeconds = prometheus.NewGaugeVec(
236250
 		prometheus.GaugeOpts{
237251
 			Name: "shithub_actions_runner_heartbeat_age_seconds",
@@ -239,6 +253,26 @@ var (
239253
 		},
240254
 		[]string{"runner", "status"},
241255
 	)
256
+	ActionsRunnerOnline = prometheus.NewGaugeVec(
257
+		prometheus.GaugeOpts{
258
+			Name: "shithub_actions_runner_online",
259
+			Help: "Current runner online state by runner (1 online, 0 unavailable).",
260
+		},
261
+		[]string{"runner"},
262
+	)
263
+	ActionsRunnerStaleTotal = prometheus.NewGauge(
264
+		prometheus.GaugeOpts{
265
+			Name: "shithub_actions_runner_stale_total",
266
+			Help: "Current count of non-revoked runners whose heartbeat is past the stale threshold.",
267
+		},
268
+	)
269
+	ActionsRunnerDraining = prometheus.NewGaugeVec(
270
+		prometheus.GaugeOpts{
271
+			Name: "shithub_actions_runner_draining",
272
+			Help: "Current runner drain state by runner (1 draining, 0 not draining).",
273
+		},
274
+		[]string{"runner"},
275
+	)
242276
 	ActionsRunnerCapacity = prometheus.NewGaugeVec(
243277
 		prometheus.GaugeOpts{
244278
 			Name: "shithub_actions_runner_capacity",
@@ -246,6 +280,12 @@ var (
246280
 		},
247281
 		[]string{"runner", "status"},
248282
 	)
283
+	ActionsRunnerRevocationsTotal = prometheus.NewCounter(
284
+		prometheus.CounterOpts{
285
+			Name: "shithub_actions_runner_revocations_total",
286
+			Help: "Total Actions runner hard revocations performed by operator tooling.",
287
+		},
288
+	)
249289
 	ActionsStorageObjects = prometheus.NewGaugeVec(
250290
 		prometheus.GaugeOpts{
251291
 			Name: "shithub_actions_storage_objects",
@@ -291,9 +331,15 @@ func init() {
291331
 		ActionsRunsPrunedTotal,
292332
 		ActionsStepTimeoutsTotal,
293333
 		ActionsQueueDepth,
334
+		ActionsQueueDepthByLabels,
294335
 		ActionsActive,
336
+		ActionsJobClaimLatencySeconds,
295337
 		ActionsRunnerHeartbeatAgeSeconds,
338
+		ActionsRunnerOnline,
339
+		ActionsRunnerStaleTotal,
340
+		ActionsRunnerDraining,
296341
 		ActionsRunnerCapacity,
342
+		ActionsRunnerRevocationsTotal,
297343
 		ActionsStorageObjects,
298344
 		ActionsStorageBytes,
299345
 	)