Go · 25660 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package main
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "strconv"
12 "strings"
13 "text/tabwriter"
14 "time"
15
16 "github.com/jackc/pgx/v5"
17 "github.com/jackc/pgx/v5/pgtype"
18 "github.com/jackc/pgx/v5/pgxpool"
19 "github.com/spf13/cobra"
20
21 "github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
22 "github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
23 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
24 "github.com/tenseleyFlow/shithub/internal/infra/config"
25 "github.com/tenseleyFlow/shithub/internal/infra/db"
26 "github.com/tenseleyFlow/shithub/internal/infra/metrics"
27 )
28
29 func newAdminRunnerCmd() *cobra.Command {
30 cmd := &cobra.Command{
31 Use: "runner",
32 Short: "Register and operate Actions runners",
33 }
34 cmd.AddCommand(newAdminRunnerRegisterCmd())
35 cmd.AddCommand(newAdminRunnerListCmd())
36 cmd.AddCommand(newAdminRunnerQueueCmd())
37 cmd.AddCommand(newAdminRunnerDrainCmd())
38 cmd.AddCommand(newAdminRunnerUndrainCmd())
39 cmd.AddCommand(newAdminRunnerRotateTokenCmd())
40 cmd.AddCommand(newAdminRunnerRevokeCmd())
41 cmd.AddCommand(newAdminRunnerCleanupStaleCmd())
42 return cmd
43 }
44
45 func newAdminRunnerRegisterCmd() *cobra.Command {
46 var name string
47 var labelsRaw string
48 var capacity int
49 var output string
50 var expiresIn time.Duration
51 cmd := &cobra.Command{
52 Use: "register --name <name> [--labels self-hosted,linux,ubuntu-latest,x64] [--capacity 1]",
53 Short: "Register an Actions runner and print its token once",
54 RunE: func(cmd *cobra.Command, _ []string) error {
55 name = strings.TrimSpace(name)
56 if name == "" {
57 return errors.New("admin runner register: --name is required")
58 }
59 labels, err := parseRunnerLabels(labelsRaw)
60 if err != nil {
61 return err
62 }
63 if capacity < 1 || capacity > 64 {
64 return errors.New("admin runner register: --capacity must be between 1 and 64")
65 }
66 output = strings.ToLower(strings.TrimSpace(output))
67 switch output {
68 case "", "text":
69 output = "text"
70 case "json":
71 default:
72 return errors.New("admin runner register: --output must be text or json")
73 }
74 if expiresIn < 0 {
75 return errors.New("admin runner register: --expires-in must be non-negative")
76 }
77
78 cfg, err := config.Load(nil)
79 if err != nil {
80 return err
81 }
82 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
83 defer cancel()
84 pool, err := openAdminRunnerPool(ctx, cfg, "register")
85 if err != nil {
86 return err
87 }
88 defer pool.Close()
89
90 token, tokenHash, err := runnertoken.New()
91 if err != nil {
92 return fmt.Errorf("admin runner register: mint token: %w", err)
93 }
94 var expiresAt pgtype.Timestamptz
95 var outputExpiresAt *time.Time
96 if expiresIn > 0 {
97 t := time.Now().UTC().Add(expiresIn)
98 expiresAt = pgtype.Timestamptz{Time: t, Valid: true}
99 outputExpiresAt = &t
100 }
101
102 q := actionsdb.New()
103 tx, err := pool.Begin(ctx)
104 if err != nil {
105 return fmt.Errorf("admin runner register: begin: %w", err)
106 }
107 committed := false
108 defer func() {
109 if !committed {
110 _ = tx.Rollback(ctx)
111 }
112 }()
113
114 runner, err := q.InsertRunner(ctx, tx, actionsdb.InsertRunnerParams{
115 Name: name,
116 Labels: labels,
117 Capacity: int32(capacity),
118 RegisteredByUserID: pgtype.Int8{},
119 })
120 if err != nil {
121 return fmt.Errorf("admin runner register: insert runner: %w", err)
122 }
123 if _, err := q.InsertRunnerToken(ctx, tx, actionsdb.InsertRunnerTokenParams{
124 RunnerID: runner.ID,
125 TokenHash: tokenHash,
126 ExpiresAt: expiresAt,
127 }); err != nil {
128 return fmt.Errorf("admin runner register: insert token: %w", err)
129 }
130 if err := tx.Commit(ctx); err != nil {
131 return fmt.Errorf("admin runner register: commit: %w", err)
132 }
133 committed = true
134 metrics.ActionsRunnerRegistrationsTotal.Inc()
135
136 return writeRunnerRegisterOutput(cmd.OutOrStdout(), output, runnerRegisterOutput{
137 ID: runner.ID,
138 Name: runner.Name,
139 Labels: runner.Labels,
140 Capacity: runner.Capacity,
141 Token: token,
142 TokenExpiresAt: outputExpiresAt,
143 })
144 },
145 }
146 cmd.Flags().StringVar(&name, "name", "", "Runner name (letters, numbers, underscore, dash)")
147 cmd.Flags().StringVar(&labelsRaw, "labels", strings.Join(runnerlabels.DefaultShared(), ","), "Comma-separated runner labels")
148 cmd.Flags().IntVar(&capacity, "capacity", 1, "Maximum concurrent jobs this runner may execute")
149 cmd.Flags().DurationVar(&expiresIn, "expires-in", 0, "Registration token lifetime (0 means no expiration)")
150 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
151 return cmd
152 }
153
// runnerRegisterOutput is the payload printed after a runner is registered or
// its token is rotated. Token carries the plaintext token; TokenExpiresAt is
// nil when the token has no expiry and is then omitted from JSON.
type runnerRegisterOutput struct {
	// Runner identity and configuration.
	ID       int64    `json:"id"`
	Name     string   `json:"name"`
	Labels   []string `json:"labels"`
	Capacity int32    `json:"capacity"`

	// Token material.
	Token          string     `json:"token"`
	TokenExpiresAt *time.Time `json:"token_expires_at,omitempty"`
}
162
163 func writeRunnerRegisterOutput(w io.Writer, format string, out runnerRegisterOutput) error {
164 return writeRunnerTokenOutput(w, format, "runner registered", out)
165 }
166
167 func writeRunnerTokenOutput(w io.Writer, format, heading string, out runnerRegisterOutput) error {
168 if format == "json" {
169 enc := json.NewEncoder(w)
170 enc.SetIndent("", " ")
171 return enc.Encode(out)
172 }
173 expires := "never"
174 if out.TokenExpiresAt != nil {
175 expires = out.TokenExpiresAt.Format(time.RFC3339)
176 }
177 _, err := fmt.Fprintf(w,
178 "%s\nid: %d\nname: %s\nlabels: %s\ncapacity: %d\ntoken_expires_at: %s\ntoken: %s\n\nStore this token now; shithub never shows it again.\n",
179 heading, out.ID, out.Name, strings.Join(out.Labels, ","), out.Capacity, expires, out.Token)
180 return err
181 }
182
183 func newAdminRunnerListCmd() *cobra.Command {
184 var output string
185 cmd := &cobra.Command{
186 Use: "list",
187 Short: "List registered Actions runners",
188 RunE: func(cmd *cobra.Command, _ []string) error {
189 var err error
190 output, err = normalizeRunnerOutput("admin runner list", output)
191 if err != nil {
192 return err
193 }
194 cfg, err := config.Load(nil)
195 if err != nil {
196 return err
197 }
198 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
199 defer cancel()
200 pool, err := openAdminRunnerPool(ctx, cfg, "list")
201 if err != nil {
202 return err
203 }
204 defer pool.Close()
205
206 rows, err := actionsdb.New().ListRunners(ctx, pool)
207 if err != nil {
208 return fmt.Errorf("admin runner list: %w", err)
209 }
210 return writeRunnerListOutput(cmd.OutOrStdout(), output, rows, time.Now().UTC())
211 },
212 }
213 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
214 return cmd
215 }
216
// runnerListOutputRow is one runner as rendered by "runner list". Timestamps
// are pre-formatted strings; an empty string means the value is absent and
// the field is dropped from JSON output.
type runnerListOutputRow struct {
	// Identity, state and load.
	ID             int64    `json:"id"`
	Name           string   `json:"name"`
	Status         string   `json:"status"`
	Capacity       int32    `json:"capacity"`
	ActiveJobCount int32    `json:"active_job_count"`
	Labels         []string `json:"labels"`

	// Values copied from the runner row; omitted when empty.
	HostName string `json:"host_name,omitempty"`
	Version  string `json:"version,omitempty"`

	// Heartbeat: formatted timestamp plus its age in whole seconds. An age of
	// zero is omitted from JSON because of omitempty.
	LastHeartbeatAt         string `json:"last_heartbeat_at,omitempty"`
	LastHeartbeatAgeSeconds int64  `json:"last_heartbeat_age_seconds,omitempty"`

	// Drain and revocation details.
	DrainingAt    string `json:"draining_at,omitempty"`
	DrainReason   string `json:"drain_reason,omitempty"`
	RevokedAt     string `json:"revoked_at,omitempty"`
	RevokedReason string `json:"revoked_reason,omitempty"`

	CreatedAt string `json:"created_at,omitempty"`
}
234
235 func writeRunnerListOutput(w io.Writer, format string, rows []actionsdb.ListRunnersRow, now time.Time) error {
236 out := make([]runnerListOutputRow, 0, len(rows))
237 for _, row := range rows {
238 item := runnerListOutputRow{
239 ID: row.ID,
240 Name: row.Name,
241 Status: string(row.Status),
242 Capacity: row.Capacity,
243 ActiveJobCount: row.ActiveJobCount,
244 Labels: append([]string{}, row.Labels...),
245 HostName: row.HostName,
246 Version: row.Version,
247 }
248 if row.LastHeartbeatAt.Valid {
249 item.LastHeartbeatAt = row.LastHeartbeatAt.Time.UTC().Format(time.RFC3339)
250 if d := now.Sub(row.LastHeartbeatAt.Time); d > 0 {
251 item.LastHeartbeatAgeSeconds = int64(d.Seconds())
252 }
253 }
254 if row.DrainingAt.Valid {
255 item.DrainingAt = row.DrainingAt.Time.UTC().Format(time.RFC3339)
256 item.DrainReason = row.DrainReason
257 }
258 if row.RevokedAt.Valid {
259 item.RevokedAt = row.RevokedAt.Time.UTC().Format(time.RFC3339)
260 item.RevokedReason = row.RevokedReason
261 }
262 if row.CreatedAt.Valid {
263 item.CreatedAt = row.CreatedAt.Time.UTC().Format(time.RFC3339)
264 }
265 out = append(out, item)
266 }
267 if format == "json" {
268 enc := json.NewEncoder(w)
269 enc.SetIndent("", " ")
270 return enc.Encode(out)
271 }
272
273 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
274 _, _ = fmt.Fprintln(tw, "ID\tNAME\tSTATUS\tCAPACITY\tACTIVE\tLABELS\tHOST\tVERSION\tLAST_HEARTBEAT\tDRAINING\tREVOKED")
275 for _, row := range out {
276 last := "never"
277 if row.LastHeartbeatAt != "" {
278 last = row.LastHeartbeatAt
279 }
280 draining := "-"
281 if row.DrainingAt != "" {
282 draining = row.DrainingAt
283 }
284 revoked := "-"
285 if row.RevokedAt != "" {
286 revoked = row.RevokedAt
287 }
288 _, _ = fmt.Fprintf(tw, "%d\t%s\t%s\t%d\t%d\t%s\t%s\t%s\t%s\t%s\t%s\n",
289 row.ID, row.Name, row.Status, row.Capacity, row.ActiveJobCount, strings.Join(row.Labels, ","),
290 emptyDash(row.HostName), emptyDash(row.Version), last, draining, revoked)
291 }
292 return tw.Flush()
293 }
294
295 func newAdminRunnerQueueCmd() *cobra.Command {
296 var output string
297 cmd := &cobra.Command{
298 Use: "queue",
299 Short: "Summarize queued Actions jobs by runs-on label",
300 RunE: func(cmd *cobra.Command, _ []string) error {
301 output = strings.ToLower(strings.TrimSpace(output))
302 switch output {
303 case "", "text":
304 output = "text"
305 case "json":
306 default:
307 return errors.New("admin runner queue: --output must be text or json")
308 }
309 cfg, err := config.Load(nil)
310 if err != nil {
311 return err
312 }
313 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
314 defer cancel()
315 pool, err := openAdminRunnerPool(ctx, cfg, "queue")
316 if err != nil {
317 return err
318 }
319 defer pool.Close()
320
321 rows, err := actionsdb.New().ListQueuedWorkflowJobRunsOn(ctx, pool)
322 if err != nil {
323 return fmt.Errorf("admin runner queue: %w", err)
324 }
325 return writeRunnerQueueOutput(cmd.OutOrStdout(), output, rows, time.Now().UTC())
326 },
327 }
328 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
329 return cmd
330 }
331
// runnerQueueOutputRow is one line of the "runner queue" summary. The
// oldest-queued fields are omitted from JSON when empty or zero.
type runnerQueueOutputRow struct {
	RunsOn              string `json:"runs_on"`
	QueuedJobs          int32  `json:"queued_jobs"`
	MatchingRunnerCount int32  `json:"matching_runner_count"`

	// Formatted timestamp of the oldest queued job and its age in whole seconds.
	OldestQueuedAt      string `json:"oldest_queued_at,omitempty"`
	OldestQueuedSeconds int64  `json:"oldest_queued_seconds,omitempty"`
}
339
340 func writeRunnerQueueOutput(w io.Writer, format string, rows []actionsdb.ListQueuedWorkflowJobRunsOnRow, now time.Time) error {
341 out := make([]runnerQueueOutputRow, 0, len(rows))
342 for _, row := range rows {
343 item := runnerQueueOutputRow{
344 RunsOn: row.RunsOn,
345 QueuedJobs: row.QueuedJobs,
346 MatchingRunnerCount: row.MatchingRunnerCount,
347 }
348 if row.OldestQueuedAt.Valid {
349 item.OldestQueuedAt = row.OldestQueuedAt.Time.UTC().Format(time.RFC3339)
350 if d := now.Sub(row.OldestQueuedAt.Time); d > 0 {
351 item.OldestQueuedSeconds = int64(d.Seconds())
352 }
353 }
354 out = append(out, item)
355 }
356 if format == "json" {
357 enc := json.NewEncoder(w)
358 enc.SetIndent("", " ")
359 return enc.Encode(out)
360 }
361
362 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
363 _, _ = fmt.Fprintln(tw, "RUNS_ON\tQUEUED_JOBS\tMATCHING_RUNNERS\tOLDEST_QUEUED")
364 for _, row := range out {
365 oldest := "-"
366 if row.OldestQueuedAt != "" {
367 oldest = row.OldestQueuedAt
368 }
369 _, _ = fmt.Fprintf(tw, "%s\t%d\t%d\t%s\n", row.RunsOn, row.QueuedJobs, row.MatchingRunnerCount, oldest)
370 }
371 return tw.Flush()
372 }
373
374 func newAdminRunnerDrainCmd() *cobra.Command {
375 var idRaw string
376 var reason string
377 var output string
378 cmd := &cobra.Command{
379 Use: "drain --id <id> [--reason <text>]",
380 Short: "Stop an Actions runner from claiming new jobs",
381 RunE: func(cmd *cobra.Command, _ []string) error {
382 id, err := parseRunnerID("admin runner drain", idRaw)
383 if err != nil {
384 return err
385 }
386 output, err = normalizeRunnerOutput("admin runner drain", output)
387 if err != nil {
388 return err
389 }
390 reason, err = normalizeRunnerReason(reason, "operator requested drain")
391 if err != nil {
392 return err
393 }
394 cfg, err := config.Load(nil)
395 if err != nil {
396 return err
397 }
398 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
399 defer cancel()
400 pool, err := openAdminRunnerPool(ctx, cfg, "drain")
401 if err != nil {
402 return err
403 }
404 defer pool.Close()
405
406 row, err := actionsdb.New().SetRunnerDraining(ctx, pool, actionsdb.SetRunnerDrainingParams{
407 ID: id,
408 DrainReason: reason,
409 })
410 if err != nil {
411 if errors.Is(err, pgx.ErrNoRows) {
412 return fmt.Errorf("admin runner drain: runner %d not found or already revoked", id)
413 }
414 return fmt.Errorf("admin runner drain: %w", err)
415 }
416 return writeRunnerStateOutput(cmd.OutOrStdout(), output, "runner draining", runnerStateOutput{
417 ID: row.ID,
418 Name: row.Name,
419 Status: string(row.Status),
420 DrainingAt: formatOptionalTime(row.DrainingAt),
421 DrainReason: row.DrainReason,
422 RevokedAt: formatOptionalTime(row.RevokedAt),
423 })
424 },
425 }
426 cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
427 cmd.Flags().StringVar(&reason, "reason", "", "Drain reason recorded for operators")
428 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
429 return cmd
430 }
431
432 func newAdminRunnerUndrainCmd() *cobra.Command {
433 var idRaw string
434 var output string
435 cmd := &cobra.Command{
436 Use: "undrain --id <id>",
437 Short: "Allow a drained Actions runner to claim jobs again",
438 RunE: func(cmd *cobra.Command, _ []string) error {
439 id, err := parseRunnerID("admin runner undrain", idRaw)
440 if err != nil {
441 return err
442 }
443 output, err = normalizeRunnerOutput("admin runner undrain", output)
444 if err != nil {
445 return err
446 }
447 cfg, err := config.Load(nil)
448 if err != nil {
449 return err
450 }
451 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
452 defer cancel()
453 pool, err := openAdminRunnerPool(ctx, cfg, "undrain")
454 if err != nil {
455 return err
456 }
457 defer pool.Close()
458
459 row, err := actionsdb.New().ClearRunnerDraining(ctx, pool, id)
460 if err != nil {
461 if errors.Is(err, pgx.ErrNoRows) {
462 return fmt.Errorf("admin runner undrain: runner %d not found or already revoked", id)
463 }
464 return fmt.Errorf("admin runner undrain: %w", err)
465 }
466 return writeRunnerStateOutput(cmd.OutOrStdout(), output, "runner undrained", runnerStateOutput{
467 ID: row.ID,
468 Name: row.Name,
469 Status: string(row.Status),
470 DrainingAt: formatOptionalTime(row.DrainingAt),
471 DrainReason: row.DrainReason,
472 RevokedAt: formatOptionalTime(row.RevokedAt),
473 })
474 },
475 }
476 cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
477 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
478 return cmd
479 }
480
481 func newAdminRunnerRotateTokenCmd() *cobra.Command {
482 var idRaw string
483 var output string
484 var expiresIn time.Duration
485 cmd := &cobra.Command{
486 Use: "rotate-token --id <id>",
487 Short: "Revoke existing registration tokens and print one replacement token",
488 RunE: func(cmd *cobra.Command, _ []string) error {
489 id, err := parseRunnerID("admin runner rotate-token", idRaw)
490 if err != nil {
491 return err
492 }
493 output, err = normalizeRunnerOutput("admin runner rotate-token", output)
494 if err != nil {
495 return err
496 }
497 if expiresIn < 0 {
498 return errors.New("admin runner rotate-token: --expires-in must be non-negative")
499 }
500 cfg, err := config.Load(nil)
501 if err != nil {
502 return err
503 }
504 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
505 defer cancel()
506 pool, err := openAdminRunnerPool(ctx, cfg, "rotate-token")
507 if err != nil {
508 return err
509 }
510 defer pool.Close()
511
512 token, tokenHash, err := runnertoken.New()
513 if err != nil {
514 return fmt.Errorf("admin runner rotate-token: mint token: %w", err)
515 }
516 var expiresAt pgtype.Timestamptz
517 var outputExpiresAt *time.Time
518 if expiresIn > 0 {
519 t := time.Now().UTC().Add(expiresIn)
520 expiresAt = pgtype.Timestamptz{Time: t, Valid: true}
521 outputExpiresAt = &t
522 }
523
524 q := actionsdb.New()
525 tx, err := pool.Begin(ctx)
526 if err != nil {
527 return fmt.Errorf("admin runner rotate-token: begin: %w", err)
528 }
529 committed := false
530 defer func() {
531 if !committed {
532 _ = tx.Rollback(ctx)
533 }
534 }()
535 runner, err := q.LockRunnerByID(ctx, tx, id)
536 if err != nil {
537 if errors.Is(err, pgx.ErrNoRows) {
538 return fmt.Errorf("admin runner rotate-token: runner %d not found", id)
539 }
540 return fmt.Errorf("admin runner rotate-token: lock runner: %w", err)
541 }
542 if runner.RevokedAt.Valid {
543 return fmt.Errorf("admin runner rotate-token: runner %d is revoked", id)
544 }
545 if err := q.RevokeAllTokensForRunner(ctx, tx, id); err != nil {
546 return fmt.Errorf("admin runner rotate-token: revoke old tokens: %w", err)
547 }
548 if _, err := q.InsertRunnerToken(ctx, tx, actionsdb.InsertRunnerTokenParams{
549 RunnerID: runner.ID,
550 TokenHash: tokenHash,
551 ExpiresAt: expiresAt,
552 }); err != nil {
553 return fmt.Errorf("admin runner rotate-token: insert token: %w", err)
554 }
555 if err := tx.Commit(ctx); err != nil {
556 return fmt.Errorf("admin runner rotate-token: commit: %w", err)
557 }
558 committed = true
559
560 return writeRunnerTokenOutput(cmd.OutOrStdout(), output, "runner token rotated", runnerRegisterOutput{
561 ID: runner.ID,
562 Name: runner.Name,
563 Labels: runner.Labels,
564 Capacity: runner.Capacity,
565 Token: token,
566 TokenExpiresAt: outputExpiresAt,
567 })
568 },
569 }
570 cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
571 cmd.Flags().DurationVar(&expiresIn, "expires-in", 0, "Registration token lifetime (0 means no expiration)")
572 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
573 return cmd
574 }
575
576 func newAdminRunnerRevokeCmd() *cobra.Command {
577 var idRaw string
578 var reason string
579 var output string
580 cmd := &cobra.Command{
581 Use: "revoke --id <id>",
582 Short: "Hard-revoke an Actions runner and all registration tokens",
583 RunE: func(cmd *cobra.Command, _ []string) error {
584 id, err := parseRunnerID("admin runner revoke", idRaw)
585 if err != nil {
586 return err
587 }
588 output, err = normalizeRunnerOutput("admin runner revoke", output)
589 if err != nil {
590 return err
591 }
592 reason, err = normalizeRunnerReason(reason, "operator requested revoke")
593 if err != nil {
594 return err
595 }
596 cfg, err := config.Load(nil)
597 if err != nil {
598 return err
599 }
600 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
601 defer cancel()
602 pool, err := openAdminRunnerPool(ctx, cfg, "revoke")
603 if err != nil {
604 return err
605 }
606 defer pool.Close()
607
608 q := actionsdb.New()
609 tx, err := pool.Begin(ctx)
610 if err != nil {
611 return fmt.Errorf("admin runner revoke: begin: %w", err)
612 }
613 committed := false
614 defer func() {
615 if !committed {
616 _ = tx.Rollback(ctx)
617 }
618 }()
619 runner, err := q.RevokeRunner(ctx, tx, actionsdb.RevokeRunnerParams{
620 ID: id,
621 RevokedReason: reason,
622 })
623 if err != nil {
624 if errors.Is(err, pgx.ErrNoRows) {
625 return fmt.Errorf("admin runner revoke: runner %d not found", id)
626 }
627 return fmt.Errorf("admin runner revoke: %w", err)
628 }
629 if err := q.RevokeAllTokensForRunner(ctx, tx, id); err != nil {
630 return fmt.Errorf("admin runner revoke: revoke tokens: %w", err)
631 }
632 if err := tx.Commit(ctx); err != nil {
633 return fmt.Errorf("admin runner revoke: commit: %w", err)
634 }
635 committed = true
636 metrics.ActionsRunnerRevocationsTotal.Inc()
637 return writeRunnerStateOutput(cmd.OutOrStdout(), output, "runner revoked", runnerStateOutput{
638 ID: runner.ID,
639 Name: runner.Name,
640 Status: string(runner.Status),
641 DrainingAt: formatOptionalTime(runner.DrainingAt),
642 DrainReason: runner.DrainReason,
643 RevokedAt: formatOptionalTime(runner.RevokedAt),
644 RevokedReason: runner.RevokedReason,
645 })
646 },
647 }
648 cmd.Flags().StringVar(&idRaw, "id", "", "Runner id")
649 cmd.Flags().StringVar(&reason, "reason", "", "Revocation reason recorded for operators")
650 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
651 return cmd
652 }
653
654 func newAdminRunnerCleanupStaleCmd() *cobra.Command {
655 var olderThan time.Duration
656 var output string
657 cmd := &cobra.Command{
658 Use: "cleanup-stale",
659 Short: "Mark stale non-revoked runners offline",
660 RunE: func(cmd *cobra.Command, _ []string) error {
661 var err error
662 output, err = normalizeRunnerOutput("admin runner cleanup-stale", output)
663 if err != nil {
664 return err
665 }
666 if olderThan <= 0 {
667 return errors.New("admin runner cleanup-stale: --older-than must be positive")
668 }
669 cfg, err := config.Load(nil)
670 if err != nil {
671 return err
672 }
673 ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
674 defer cancel()
675 pool, err := openAdminRunnerPool(ctx, cfg, "cleanup-stale")
676 if err != nil {
677 return err
678 }
679 defer pool.Close()
680
681 cutoff := time.Now().UTC().Add(-olderThan)
682 rows, err := actionsdb.New().MarkStaleRunnersOffline(ctx, pool, pgtype.Timestamptz{Time: cutoff, Valid: true})
683 if err != nil {
684 return fmt.Errorf("admin runner cleanup-stale: %w", err)
685 }
686 return writeRunnerCleanupOutput(cmd.OutOrStdout(), output, rows)
687 },
688 }
689 cmd.Flags().DurationVar(&olderThan, "older-than", 2*time.Minute, "Heartbeat age after which non-revoked runners are marked offline")
690 cmd.Flags().StringVar(&output, "output", "text", "Output format: text or json")
691 return cmd
692 }
693
// runnerStateOutput is the payload printed by the drain, undrain and revoke
// commands. Timestamps are pre-formatted strings; empty drain and revoke
// fields are omitted from JSON.
type runnerStateOutput struct {
	ID     int64  `json:"id"`
	Name   string `json:"name"`
	Status string `json:"status"`

	// Drain details.
	DrainingAt  string `json:"draining_at,omitempty"`
	DrainReason string `json:"drain_reason,omitempty"`

	// Revocation details.
	RevokedAt     string `json:"revoked_at,omitempty"`
	RevokedReason string `json:"revoked_reason,omitempty"`
}
703
704 func writeRunnerStateOutput(w io.Writer, format, heading string, out runnerStateOutput) error {
705 if format == "json" {
706 enc := json.NewEncoder(w)
707 enc.SetIndent("", " ")
708 return enc.Encode(out)
709 }
710 _, err := fmt.Fprintf(w, "%s\nid: %d\nname: %s\nstatus: %s\ndraining_at: %s\ndrain_reason: %s\nrevoked_at: %s\nrevoked_reason: %s\n",
711 heading, out.ID, out.Name, out.Status, emptyDash(out.DrainingAt), emptyDash(out.DrainReason),
712 emptyDash(out.RevokedAt), emptyDash(out.RevokedReason))
713 return err
714 }
715
// runnerCleanupOutputRow is one runner reported by "runner cleanup-stale".
// LastHeartbeatAt is a pre-formatted timestamp and is omitted from JSON when
// empty.
type runnerCleanupOutputRow struct {
	ID     int64  `json:"id"`
	Name   string `json:"name"`
	Status string `json:"status"`

	LastHeartbeatAt string `json:"last_heartbeat_at,omitempty"`
}
722
723 func writeRunnerCleanupOutput(w io.Writer, format string, rows []actionsdb.MarkStaleRunnersOfflineRow) error {
724 out := make([]runnerCleanupOutputRow, 0, len(rows))
725 for _, row := range rows {
726 out = append(out, runnerCleanupOutputRow{
727 ID: row.ID,
728 Name: row.Name,
729 Status: string(row.Status),
730 LastHeartbeatAt: formatOptionalTime(row.LastHeartbeatAt),
731 })
732 }
733 if format == "json" {
734 enc := json.NewEncoder(w)
735 enc.SetIndent("", " ")
736 return enc.Encode(out)
737 }
738 tw := tabwriter.NewWriter(w, 0, 0, 2, ' ', 0)
739 _, _ = fmt.Fprintln(tw, "ID\tNAME\tSTATUS\tLAST_HEARTBEAT")
740 for _, row := range out {
741 _, _ = fmt.Fprintf(tw, "%d\t%s\t%s\t%s\n", row.ID, row.Name, row.Status, emptyDash(row.LastHeartbeatAt))
742 }
743 return tw.Flush()
744 }
745
746 func openAdminRunnerPool(ctx context.Context, cfg config.Config, op string) (*pgxpool.Pool, error) {
747 if cfg.DB.URL == "" {
748 return nil, fmt.Errorf("admin runner %s: DB not configured (set SHITHUB_DATABASE_URL)", op)
749 }
750 pool, err := db.Open(ctx, db.Config{
751 URL: cfg.DB.URL, MaxConns: 2, MinConns: 0,
752 ConnectTimeout: cfg.DB.ConnectTimeout,
753 })
754 if err != nil {
755 return nil, fmt.Errorf("admin runner %s: db open: %w", op, err)
756 }
757 return pool, nil
758 }
759
760 func parseRunnerLabels(raw string) ([]string, error) {
761 return runnerlabels.ParseCSV(raw)
762 }
763
// parseRunnerID converts the raw --id flag value into a runner id.
//
// Surrounding whitespace is ignored. The value must parse as a base-10 int64
// and be greater than zero; otherwise the returned id is 0 and the error
// message is prefixed with op.
func parseRunnerID(op, raw string) (int64, error) {
	if id, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64); err == nil && id > 0 {
		return id, nil
	}
	return 0, fmt.Errorf("%s: --id must be a positive integer", op)
}
771
// normalizeRunnerOutput canonicalizes the --output flag value.
//
// The value is trimmed and lower-cased. An empty value or "text" yields
// "text", and "json" yields "json". Anything else yields an empty string and
// an error whose message is prefixed with op.
func normalizeRunnerOutput(op, output string) (string, error) {
	normalized := strings.ToLower(strings.TrimSpace(output))
	if normalized == "" || normalized == "text" {
		return "text", nil
	}
	if normalized == "json" {
		return "json", nil
	}
	return "", fmt.Errorf("%s: --output must be text or json", op)
}
783
// normalizeRunnerReason prepares an operator-supplied reason for storage.
//
// The reason is trimmed; if nothing remains, fallback is used as given. The
// resulting text may be at most 1000 bytes long (bytes, not characters);
// longer text yields an empty string and an error.
func normalizeRunnerReason(reason, fallback string) (string, error) {
	text := strings.TrimSpace(reason)
	if text == "" {
		text = fallback
	}
	if len(text) <= 1000 {
		return text, nil
	}
	return "", errors.New("runner reason must be 1000 bytes or fewer")
}
794
795 func formatOptionalTime(t pgtype.Timestamptz) string {
796 if !t.Valid {
797 return ""
798 }
799 return t.Time.UTC().Format(time.RFC3339)
800 }
801
// emptyDash returns "-" for an empty or whitespace-only value so text output
// never shows a blank cell. Any other value is returned unchanged, including
// its surrounding whitespace.
func emptyDash(value string) string {
	if len(strings.TrimSpace(value)) > 0 {
		return value
	}
	return "-"
}
808
809 func init() {
810 adminCmd.AddCommand(newAdminRunnerCmd())
811 adminActionsCmd.AddCommand(newAdminRunnerCmd())
812 }
813