tenseleyflow/shithub / 732831b

Browse files

runner: add claim loop and binary

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
732831b01bfa53a951e9ba0fb47e948ce35ec1a4
Parents
7acb840
Tree
f1d77a1

6 changed files

StatusFile+-
A cmd/shithubd-runner/main.go 8 0
A cmd/shithubd-runner/root.go 29 0
A cmd/shithubd-runner/run.go 121 0
A cmd/shithubd-runner/version.go 24 0
A internal/runner/runner.go 226 0
A internal/runner/runner_test.go 167 0
cmd/shithubd-runner/main.goadded
@@ -0,0 +1,8 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Command shithubd-runner executes shithub Actions workflow jobs.
4
+package main
5
+
6
+func main() {
7
+	Execute()
8
+}
cmd/shithubd-runner/root.goadded
@@ -0,0 +1,29 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package main
4
+
5
+import (
6
+	"fmt"
7
+	"os"
8
+
9
+	"github.com/spf13/cobra"
10
+)
11
+
12
+var rootCmd = &cobra.Command{
13
+	Use:           "shithubd-runner",
14
+	Short:         "Run shithub Actions jobs on a Docker or Podman host",
15
+	SilenceUsage:  true,
16
+	SilenceErrors: true,
17
+}
18
+
19
+func Execute() {
20
+	if err := rootCmd.Execute(); err != nil {
21
+		fmt.Fprintln(os.Stderr, "shithubd-runner:", err)
22
+		os.Exit(1)
23
+	}
24
+}
25
+
26
+func init() {
27
+	rootCmd.AddCommand(runCmd)
28
+	rootCmd.AddCommand(versionCmd)
29
+}
cmd/shithubd-runner/run.goadded
@@ -0,0 +1,121 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package main
4
+
5
import (
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/spf13/cobra"
	"github.com/spf13/pflag"

	infralog "github.com/tenseleyFlow/shithub/internal/infra/log"
	runnerpkg "github.com/tenseleyFlow/shithub/internal/runner"
	runnerapi "github.com/tenseleyFlow/shithub/internal/runner/api"
	runnerconfig "github.com/tenseleyFlow/shithub/internal/runner/config"
	"github.com/tenseleyFlow/shithub/internal/runner/engine"
	"github.com/tenseleyFlow/shithub/internal/runner/workspace"
)
19
+
20
// runConfigPath receives the value of the run subcommand's --config flag.
var runConfigPath string
21
+
22
+var runCmd = &cobra.Command{
23
+	Use:   "run",
24
+	Short: "Claim and execute shithub Actions jobs",
25
+	RunE: func(cmd *cobra.Command, _ []string) error {
26
+		cfg, err := runnerconfig.Load(runnerconfig.LoadOptions{
27
+			ConfigPath: runConfigPath,
28
+			Overrides:  flagOverrides(cmd),
29
+		})
30
+		if err != nil {
31
+			return err
32
+		}
33
+		logger := infralog.New(infralog.Options{
34
+			Level:  cfg.Log.Level,
35
+			Format: cfg.Log.Format,
36
+			Writer: os.Stderr,
37
+		}).With("component", "runner")
38
+
39
+		workspaces := workspace.New(cfg.Runner.WorkspaceRoot)
40
+		removed, err := workspaces.Sweep(cfg.Runner.WorkspaceTTL, time.Now().UTC())
41
+		if err != nil {
42
+			return err
43
+		}
44
+		if removed > 0 {
45
+			logger.InfoContext(cmd.Context(), "swept stale workspaces", "count", removed)
46
+		}
47
+
48
+		client, err := runnerapi.New(runnerapi.Config{
49
+			BaseURL:     cfg.Server.BaseURL,
50
+			RunnerToken: cfg.Runner.Token,
51
+		})
52
+		if err != nil {
53
+			return err
54
+		}
55
+		execEngine := engine.NewDocker(engine.DockerConfig{
56
+			Binary:       cfg.Engine.Kind,
57
+			DefaultImage: cfg.Engine.DefaultImage,
58
+			Network:      cfg.Engine.Network,
59
+			Memory:       cfg.Engine.Memory,
60
+			CPUs:         cfg.Engine.CPUs,
61
+			Stdout:       os.Stdout,
62
+			Stderr:       os.Stderr,
63
+		})
64
+		r := runnerpkg.New(runnerpkg.Options{
65
+			API:          client,
66
+			Engine:       execEngine,
67
+			Workspaces:   workspaces,
68
+			Logger:       logger,
69
+			Labels:       cfg.Runner.Labels,
70
+			Capacity:     cfg.Runner.Capacity,
71
+			PollInterval: cfg.Runner.PollInterval,
72
+			DefaultImage: cfg.Engine.DefaultImage,
73
+			Clock:        func() time.Time { return time.Now().UTC() },
74
+		})
75
+		return r.Run(cmd.Context())
76
+	},
77
+}
78
+
79
+func init() {
80
+	runCmd.Flags().StringVar(&runConfigPath, "config", "", "Path to runner config file")
81
+	runCmd.Flags().String("server-url", "", "shithub base URL")
82
+	runCmd.Flags().String("token", "", "Runner registration token")
83
+	runCmd.Flags().String("labels", "", "Comma-separated runner labels")
84
+	runCmd.Flags().Int("capacity", 0, "Maximum concurrent jobs this runner advertises")
85
+	runCmd.Flags().Duration("poll-interval", 0, "Idle heartbeat interval")
86
+	runCmd.Flags().String("workspace-root", "", "Workspace root directory")
87
+	runCmd.Flags().Duration("workspace-ttl", 0, "Startup sweep TTL for stale workspaces")
88
+	runCmd.Flags().String("engine", "", "Execution engine: docker or podman")
89
+	runCmd.Flags().String("image", "", "Default container image")
90
+	runCmd.Flags().String("network", "", "Container network")
91
+	runCmd.Flags().String("memory", "", "Container memory limit")
92
+	runCmd.Flags().String("cpus", "", "Container CPU limit")
93
+	runCmd.Flags().String("log-level", "", "Log level: debug, info, warn, error")
94
+	runCmd.Flags().String("log-format", "", "Log format: text or json")
95
+}
96
+
97
+func flagOverrides(cmd *cobra.Command) map[string]string {
98
+	keys := map[string]string{
99
+		"server-url":     "server.base_url",
100
+		"token":          "runner.token",
101
+		"labels":         "runner.labels",
102
+		"capacity":       "runner.capacity",
103
+		"poll-interval":  "runner.poll_interval",
104
+		"workspace-root": "runner.workspace_root",
105
+		"workspace-ttl":  "runner.workspace_ttl",
106
+		"engine":         "engine.kind",
107
+		"image":          "engine.default_image",
108
+		"network":        "engine.network",
109
+		"memory":         "engine.memory",
110
+		"cpus":           "engine.cpus",
111
+		"log-level":      "log.level",
112
+		"log-format":     "log.format",
113
+	}
114
+	out := make(map[string]string)
115
+	cmd.Flags().Visit(func(f *pflag.Flag) {
116
+		if key, ok := keys[f.Name]; ok {
117
+			out[key] = f.Value.String()
118
+		}
119
+	})
120
+	return out
121
+}
cmd/shithubd-runner/version.goadded
@@ -0,0 +1,24 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package main
4
+
5
+import (
6
+	"fmt"
7
+	"runtime"
8
+
9
+	"github.com/spf13/cobra"
10
+
11
+	"github.com/tenseleyFlow/shithub/internal/version"
12
+)
13
+
14
+var versionCmd = &cobra.Command{
15
+	Use:   "version",
16
+	Short: "Print version, commit, build time, and Go runtime",
17
+	Run: func(_ *cobra.Command, _ []string) {
18
+		fmt.Printf("shithubd-runner %s\n", version.Version)
19
+		fmt.Printf("  commit: %s\n", version.Commit)
20
+		fmt.Printf("  built:  %s\n", version.BuiltAt)
21
+		fmt.Printf("  go:     %s\n", runtime.Version())
22
+		fmt.Printf("  os:     %s/%s\n", runtime.GOOS, runtime.GOARCH)
23
+	},
24
+}
internal/runner/runner.goadded
@@ -0,0 +1,226 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package runner orchestrates the shithubd-runner claim/execute/status loop.
4
+package runner
5
+
6
+import (
7
+	"context"
8
+	"errors"
9
+	"fmt"
10
+	"io"
11
+	"log/slog"
12
+	"time"
13
+
14
+	"github.com/tenseleyFlow/shithub/internal/runner/api"
15
+	"github.com/tenseleyFlow/shithub/internal/runner/engine"
16
+)
17
+
18
+type API interface {
19
+	Heartbeat(ctx context.Context, req api.HeartbeatRequest) (*api.Claim, error)
20
+	UpdateStatus(ctx context.Context, jobID int64, token string, req api.StatusRequest) (api.StatusResponse, error)
21
+}
22
+
23
// Workspaces manages per-job working directories, addressed by run ID and
// job ID. Prepare returns the directory path that is handed to the engine as
// the job's workspace; Remove is called once the runner is done with the job.
type Workspaces interface {
	Prepare(runID, jobID int64) (string, error)
	Remove(runID, jobID int64) error
}
27
+
28
// SleepFunc waits for d between idle polls. A non-nil error ends Runner.Run,
// which returns that error unchanged.
type SleepFunc func(ctx context.Context, d time.Duration) error
29
+
30
+type Options struct {
31
+	API          API
32
+	Engine       engine.Engine
33
+	Workspaces   Workspaces
34
+	Logger       *slog.Logger
35
+	Labels       []string
36
+	Capacity     int
37
+	PollInterval time.Duration
38
+	DefaultImage string
39
+	Clock        func() time.Time
40
+	Sleep        SleepFunc
41
+}
42
+
43
+type Runner struct {
44
+	api          API
45
+	engine       engine.Engine
46
+	workspaces   Workspaces
47
+	logger       *slog.Logger
48
+	labels       []string
49
+	capacity     int
50
+	pollInterval time.Duration
51
+	defaultImage string
52
+	clock        func() time.Time
53
+	sleep        SleepFunc
54
+}
55
+
56
+func New(opts Options) *Runner {
57
+	logger := opts.Logger
58
+	if logger == nil {
59
+		logger = slog.New(slog.NewTextHandler(io.Discard, nil))
60
+	}
61
+	clock := opts.Clock
62
+	if clock == nil {
63
+		clock = func() time.Time { return time.Now().UTC() }
64
+	}
65
+	sleep := opts.Sleep
66
+	if sleep == nil {
67
+		sleep = defaultSleep
68
+	}
69
+	poll := opts.PollInterval
70
+	if poll <= 0 {
71
+		poll = 5 * time.Second
72
+	}
73
+	capacity := opts.Capacity
74
+	if capacity <= 0 {
75
+		capacity = 1
76
+	}
77
+	return &Runner{
78
+		api:          opts.API,
79
+		engine:       opts.Engine,
80
+		workspaces:   opts.Workspaces,
81
+		logger:       logger,
82
+		labels:       append([]string{}, opts.Labels...),
83
+		capacity:     capacity,
84
+		pollInterval: poll,
85
+		defaultImage: opts.DefaultImage,
86
+		clock:        clock,
87
+		sleep:        sleep,
88
+	}
89
+}
90
+
91
+func (r *Runner) Run(ctx context.Context) error {
92
+	for {
93
+		claimed, err := r.RunOnce(ctx)
94
+		if err != nil {
95
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
96
+				return err
97
+			}
98
+			r.logger.ErrorContext(ctx, "runner loop iteration failed", "error", err)
99
+		}
100
+		if claimed {
101
+			continue
102
+		}
103
+		if err := r.sleep(ctx, r.pollInterval); err != nil {
104
+			return err
105
+		}
106
+	}
107
+}
108
+
109
+func (r *Runner) RunOnce(ctx context.Context) (bool, error) {
110
+	claim, err := r.api.Heartbeat(ctx, api.HeartbeatRequest{Labels: r.labels, Capacity: r.capacity})
111
+	if err != nil {
112
+		return false, err
113
+	}
114
+	if claim == nil {
115
+		return false, nil
116
+	}
117
+	token := claim.Token
118
+	started := r.clock()
119
+	workspaceDir, err := r.workspaces.Prepare(claim.Job.RunID, claim.Job.ID)
120
+	if err != nil {
121
+		statusErr := r.complete(ctx, claim.Job.ID, token, engine.ConclusionFailure, started, r.clock())
122
+		return true, errors.Join(fmt.Errorf("prepare workspace: %w", err), statusErr)
123
+	}
124
+	defer func() {
125
+		if err := r.workspaces.Remove(claim.Job.RunID, claim.Job.ID); err != nil {
126
+			r.logger.WarnContext(ctx, "workspace cleanup failed", "run_id", claim.Job.RunID, "job_id", claim.Job.ID, "error", err)
127
+		}
128
+	}()
129
+
130
+	running, err := r.api.UpdateStatus(ctx, claim.Job.ID, token, api.StatusRequest{
131
+		Status:    "running",
132
+		StartedAt: started,
133
+	})
134
+	if err != nil {
135
+		return true, fmt.Errorf("mark job running: %w", err)
136
+	}
137
+	if running.NextToken == "" {
138
+		return true, errors.New("mark job running: server did not return next_token")
139
+	}
140
+	token = running.NextToken
141
+
142
+	outcome, execErr := r.engine.Execute(ctx, toEngineJob(claim.Job, workspaceDir, r.defaultImage))
143
+	conclusion := outcome.Conclusion
144
+	if conclusion == "" {
145
+		conclusion = engine.ConclusionFailure
146
+	}
147
+	completed := outcome.CompletedAt
148
+	if completed.IsZero() {
149
+		completed = r.clock()
150
+	}
151
+	if outcome.StartedAt.IsZero() {
152
+		outcome.StartedAt = started
153
+	}
154
+	if err := r.complete(ctx, claim.Job.ID, token, conclusion, outcome.StartedAt, completed); err != nil {
155
+		return true, err
156
+	}
157
+	if execErr != nil {
158
+		r.logger.WarnContext(ctx, "job completed with failing engine outcome", "job_id", claim.Job.ID, "conclusion", conclusion, "error", execErr)
159
+	}
160
+	return true, nil
161
+}
162
+
163
+func (r *Runner) complete(ctx context.Context, jobID int64, token, conclusion string, started, completed time.Time) error {
164
+	_, err := r.api.UpdateStatus(ctx, jobID, token, api.StatusRequest{
165
+		Status:      "completed",
166
+		Conclusion:  conclusion,
167
+		StartedAt:   started,
168
+		CompletedAt: completed,
169
+	})
170
+	if err != nil {
171
+		return fmt.Errorf("mark job completed: %w", err)
172
+	}
173
+	return nil
174
+}
175
+
176
+func toEngineJob(job api.Job, workspaceDir, defaultImage string) engine.Job {
177
+	steps := make([]engine.Step, 0, len(job.Steps))
178
+	for _, step := range job.Steps {
179
+		steps = append(steps, engine.Step{
180
+			ID:               step.ID,
181
+			Index:            step.Index,
182
+			StepID:           step.StepID,
183
+			Name:             step.Name,
184
+			If:               step.If,
185
+			Run:              step.Run,
186
+			Uses:             step.Uses,
187
+			WorkingDirectory: step.WorkingDirectory,
188
+			Env:              step.Env,
189
+			With:             step.With,
190
+			ContinueOnError:  step.ContinueOnError,
191
+		})
192
+	}
193
+	return engine.Job{
194
+		ID:             job.ID,
195
+		RunID:          job.RunID,
196
+		RepoID:         job.RepoID,
197
+		RunIndex:       job.RunIndex,
198
+		WorkflowFile:   job.WorkflowFile,
199
+		WorkflowName:   job.WorkflowName,
200
+		HeadSHA:        job.HeadSHA,
201
+		HeadRef:        job.HeadRef,
202
+		Event:          job.Event,
203
+		JobKey:         job.JobKey,
204
+		JobName:        job.JobName,
205
+		RunsOn:         job.RunsOn,
206
+		Needs:          append([]string{}, job.Needs...),
207
+		If:             job.If,
208
+		TimeoutMinutes: job.TimeoutMinutes,
209
+		Permissions:    job.Permissions,
210
+		Env:            job.Env,
211
+		Steps:          steps,
212
+		WorkspaceDir:   workspaceDir,
213
+		Image:          defaultImage,
214
+	}
215
+}
216
+
217
// defaultSleep blocks for d or until ctx is done, whichever comes first. It
// returns nil when the full duration elapsed and ctx.Err() when the context
// ended the wait.
//
// A context that is already done always wins, even for a zero or negative d.
// Without the up-front check, select would choose at random between the done
// channel and an already-expired timer.
func defaultSleep(ctx context.Context, d time.Duration) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if d <= 0 {
		// Nothing to wait for; skip the timer entirely.
		return nil
	}

	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}
internal/runner/runner_test.goadded
@@ -0,0 +1,167 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package runner
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"testing"
9
+	"time"
10
+
11
+	"github.com/tenseleyFlow/shithub/internal/runner/api"
12
+	"github.com/tenseleyFlow/shithub/internal/runner/engine"
13
+)
14
+
15
+type fakeAPI struct {
16
+	claim    *api.Claim
17
+	statuses []api.StatusRequest
18
+	tokens   []string
19
+}
20
+
21
+func (f *fakeAPI) Heartbeat(_ context.Context, _ api.HeartbeatRequest) (*api.Claim, error) {
22
+	return f.claim, nil
23
+}
24
+
25
+func (f *fakeAPI) UpdateStatus(_ context.Context, _ int64, token string, req api.StatusRequest) (api.StatusResponse, error) {
26
+	f.tokens = append(f.tokens, token)
27
+	f.statuses = append(f.statuses, req)
28
+	if req.Status == "running" {
29
+		return api.StatusResponse{NextToken: "next-token"}, nil
30
+	}
31
+	return api.StatusResponse{}, nil
32
+}
33
+
34
+type fakeEngine struct {
35
+	job engine.Job
36
+	out engine.Outcome
37
+	err error
38
+}
39
+
40
+func (f *fakeEngine) Execute(_ context.Context, job engine.Job) (engine.Outcome, error) {
41
+	f.job = job
42
+	return f.out, f.err
43
+}
44
+
45
+func (f *fakeEngine) StreamLogs(_ context.Context, _ int64) (<-chan engine.LogChunk, error) {
46
+	ch := make(chan engine.LogChunk)
47
+	close(ch)
48
+	return ch, nil
49
+}
50
+
51
+func (f *fakeEngine) Cancel(_ context.Context, _ int64) error { return nil }
52
+
53
// fakeWorkspaces is a workspace stub. Prepare returns the configured
// directory and error, and Remove only records that it was called.
type fakeWorkspaces struct {
	dir     string
	removed bool
	err     error
}

// Prepare returns the canned directory and error, ignoring the IDs.
func (f *fakeWorkspaces) Prepare(int64, int64) (string, error) {
	return f.dir, f.err
}

// Remove marks the workspace as removed and always succeeds.
func (f *fakeWorkspaces) Remove(int64, int64) error {
	f.removed = true
	return nil
}
67
+
68
+func TestRunOnce_NoClaim(t *testing.T) {
69
+	t.Parallel()
70
+	r := New(Options{API: &fakeAPI{}, Engine: &fakeEngine{}, Workspaces: &fakeWorkspaces{}})
71
+	claimed, err := r.RunOnce(t.Context())
72
+	if err != nil {
73
+		t.Fatalf("RunOnce: %v", err)
74
+	}
75
+	if claimed {
76
+		t.Fatal("claimed = true")
77
+	}
78
+}
79
+
80
+func TestRunOnce_ExecutesAndCompletesSuccess(t *testing.T) {
81
+	t.Parallel()
82
+	now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
83
+	claim := &api.Claim{
84
+		Token: "job-token",
85
+		Job: api.Job{
86
+			ID:    10,
87
+			RunID: 20,
88
+			Env:   map[string]string{"A": "B"},
89
+			Steps: []api.Step{{ID: 30, Run: "echo hi"}},
90
+		},
91
+	}
92
+	fapi := &fakeAPI{claim: claim}
93
+	fengine := &fakeEngine{out: engine.Outcome{Conclusion: engine.ConclusionSuccess, StartedAt: now, CompletedAt: now.Add(time.Second)}}
94
+	workspaces := &fakeWorkspaces{dir: "/tmp/workspace"}
95
+	r := New(Options{
96
+		API:          fapi,
97
+		Engine:       fengine,
98
+		Workspaces:   workspaces,
99
+		DefaultImage: "runner-image",
100
+		Clock:        func() time.Time { return now },
101
+	})
102
+	claimed, err := r.RunOnce(t.Context())
103
+	if err != nil {
104
+		t.Fatalf("RunOnce: %v", err)
105
+	}
106
+	if !claimed {
107
+		t.Fatal("claimed = false")
108
+	}
109
+	if fengine.job.WorkspaceDir != "/tmp/workspace" || fengine.job.Image != "runner-image" {
110
+		t.Fatalf("engine job: %#v", fengine.job)
111
+	}
112
+	if len(fapi.statuses) != 2 {
113
+		t.Fatalf("statuses: %#v", fapi.statuses)
114
+	}
115
+	if fapi.statuses[0].Status != "running" || fapi.statuses[1].Conclusion != engine.ConclusionSuccess {
116
+		t.Fatalf("statuses: %#v", fapi.statuses)
117
+	}
118
+	if fapi.tokens[0] != "job-token" || fapi.tokens[1] != "next-token" {
119
+		t.Fatalf("tokens: %#v", fapi.tokens)
120
+	}
121
+	if !workspaces.removed {
122
+		t.Fatal("workspace was not removed")
123
+	}
124
+}
125
+
126
+func TestRunOnce_EngineFailureStillCompletesJob(t *testing.T) {
127
+	t.Parallel()
128
+	now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
129
+	fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}
130
+	fengine := &fakeEngine{
131
+		out: engine.Outcome{Conclusion: engine.ConclusionFailure, StartedAt: now, CompletedAt: now.Add(time.Second)},
132
+		err: errors.New("exit 1"),
133
+	}
134
+	r := New(Options{
135
+		API:        fapi,
136
+		Engine:     fengine,
137
+		Workspaces: &fakeWorkspaces{dir: "/tmp/workspace"},
138
+		Clock:      func() time.Time { return now },
139
+	})
140
+	if _, err := r.RunOnce(t.Context()); err != nil {
141
+		t.Fatalf("RunOnce: %v", err)
142
+	}
143
+	if fapi.statuses[1].Conclusion != engine.ConclusionFailure {
144
+		t.Fatalf("completion: %#v", fapi.statuses[1])
145
+	}
146
+}
147
+
148
+func TestRunOnce_PrepareFailureMarksJobFailed(t *testing.T) {
149
+	t.Parallel()
150
+	fapi := &fakeAPI{claim: &api.Claim{Token: "job-token", Job: api.Job{ID: 10, RunID: 20}}}
151
+	r := New(Options{
152
+		API:        fapi,
153
+		Engine:     &fakeEngine{},
154
+		Workspaces: &fakeWorkspaces{err: errors.New("disk full")},
155
+		Clock:      func() time.Time { return time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC) },
156
+	})
157
+	claimed, err := r.RunOnce(t.Context())
158
+	if err == nil {
159
+		t.Fatal("RunOnce returned nil error")
160
+	}
161
+	if !claimed {
162
+		t.Fatal("claimed = false")
163
+	}
164
+	if len(fapi.statuses) != 1 || fapi.statuses[0].Conclusion != engine.ConclusionFailure {
165
+		t.Fatalf("statuses: %#v", fapi.statuses)
166
+	}
167
+}