tenseleyflow/shithub / 85747bd

Browse files

runner: add API client and workspace manager

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
85747bd28e38e13a141cf1a9879bc2f95180e65c
Parents
13db5fe
Tree
1e5c5d3

4 changed files

Status File + -
A internal/runner/api/client.go 231 0
A internal/runner/api/client_test.go 126 0
A internal/runner/workspace/workspace.go 82 0
A internal/runner/workspace/workspace_test.go 64 0
internal/runner/api/client.go added
@@ -0,0 +1,231 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package api is the shithubd-runner client for the S41c runner HTTP API.
4
+package api
5
+
6
+import (
7
+	"bytes"
8
+	"context"
9
+	"encoding/base64"
10
+	"encoding/json"
11
+	"fmt"
12
+	"io"
13
+	"net/http"
14
+	"net/url"
15
+	"strconv"
16
+	"strings"
17
+	"time"
18
+)
19
+
20
// defaultHTTPTimeout caps the total duration of a request made through the
// fallback HTTP client that New builds when Config.HTTPClient is nil.
const defaultHTTPTimeout = 30 * time.Second

// Config carries the settings New needs to build a Client.
type Config struct {
	// BaseURL is the absolute server URL; a scheme and host are required.
	// New trims surrounding whitespace and trailing slashes.
	BaseURL string
	// RunnerToken is the runner's bearer token. It must not be blank.
	RunnerToken string
	// HTTPClient is optional. When nil, New uses a private client whose
	// Timeout is defaultHTTPTimeout.
	HTTPClient *http.Client
}

// Client is a handle on the runner HTTP API. Construct it with New.
type Client struct {
	base        *url.URL
	runnerToken string
	http        *http.Client
}

// New validates cfg and returns a ready-to-use Client.
//
// It returns an error when BaseURL cannot be parsed (the parse error is
// wrapped), when BaseURL lacks a scheme or host, or when RunnerToken is blank.
func New(cfg Config) (*Client, error) {
	base, err := url.Parse(strings.TrimRight(strings.TrimSpace(cfg.BaseURL), "/"))
	if err != nil {
		return nil, fmt.Errorf("runner api: invalid base URL %q: %w", cfg.BaseURL, err)
	}
	if base.Scheme == "" || base.Host == "" {
		return nil, fmt.Errorf("runner api: invalid base URL %q", cfg.BaseURL)
	}
	token := strings.TrimSpace(cfg.RunnerToken)
	if token == "" {
		return nil, fmt.Errorf("runner api: runner token is required")
	}
	hc := cfg.HTTPClient
	if hc == nil {
		// Avoid http.DefaultClient: it is shared process-wide state and has
		// no timeout, so a stalled server could block a caller whose context
		// carries no deadline.
		hc = &http.Client{Timeout: defaultHTTPTimeout}
	}
	return &Client{base: base, runnerToken: token, http: hc}, nil
}
46
+
47
// HeartbeatRequest is the JSON body sent by Client.Heartbeat.
type HeartbeatRequest struct {
	// Labels is encoded under "labels"; a nil slice encodes as JSON null.
	Labels []string `json:"labels"`
	// Capacity is encoded under "capacity".
	Capacity int `json:"capacity"`
}
51
+
52
// Claim is the JSON document Client.Heartbeat decodes when the server hands
// the runner a job: a token, its expiry, and the job description.
type Claim struct {
	Token     string    `json:"token"`
	ExpiresAt time.Time `json:"expires_at"`
	Job       Job       `json:"job"`
}

// Job is the "job" object of a Claim, decoded field-for-field from JSON.
type Job struct {
	ID             int64    `json:"id"`
	RunID          int64    `json:"run_id"`
	RepoID         int64    `json:"repo_id"`
	RunIndex       int64    `json:"run_index"`
	WorkflowFile   string   `json:"workflow_file"`
	WorkflowName   string   `json:"workflow_name"`
	HeadSHA        string   `json:"head_sha"`
	HeadRef        string   `json:"head_ref"`
	Event          string   `json:"event"`
	JobKey         string   `json:"job_key"`
	JobName        string   `json:"job_name"`
	RunsOn         string   `json:"runs_on"`
	Needs          []string `json:"needs"`
	If             string   `json:"if"`
	TimeoutMinutes int32    `json:"timeout_minutes"`
	// Permissions is kept as raw JSON; this type does not decode it further.
	Permissions json.RawMessage   `json:"permissions"`
	Env         map[string]string `json:"env"`
	Steps       []Step            `json:"steps"`
}

// Step is one element of Job.Steps, decoded field-for-field from JSON.
type Step struct {
	ID               int64             `json:"id"`
	Index            int32             `json:"index"`
	StepID           string            `json:"step_id"`
	Name             string            `json:"name"`
	If               string            `json:"if"`
	Run              string            `json:"run"`
	Uses             string            `json:"uses"`
	WorkingDirectory string            `json:"working_directory"`
	Env              map[string]string `json:"env"`
	With             map[string]string `json:"with"`
	ContinueOnError  bool              `json:"continue_on_error"`
}
92
+
93
// StatusRequest is the body sent by Client.UpdateStatus. The two timestamps
// are excluded from default encoding and written by MarshalJSON instead.
type StatusRequest struct {
	Status      string    `json:"status"`
	Conclusion  string    `json:"conclusion,omitempty"`
	StartedAt   time.Time `json:"-"`
	CompletedAt time.Time `json:"-"`
}

// MarshalJSON encodes r with "started_at" and "completed_at" rendered as
// RFC 3339 (nanosecond) strings in UTC. A zero timestamp is omitted, as is an
// empty Conclusion.
func (r StatusRequest) MarshalJSON() ([]byte, error) {
	// stamp yields "" for the zero time so that omitempty drops the key.
	stamp := func(ts time.Time) string {
		if ts.IsZero() {
			return ""
		}
		return ts.UTC().Format(time.RFC3339Nano)
	}
	return json.Marshal(struct {
		Status      string `json:"status"`
		Conclusion  string `json:"conclusion,omitempty"`
		StartedAt   string `json:"started_at,omitempty"`
		CompletedAt string `json:"completed_at,omitempty"`
	}{
		Status:      r.Status,
		Conclusion:  r.Conclusion,
		StartedAt:   stamp(r.StartedAt),
		CompletedAt: stamp(r.CompletedAt),
	})
}
116
+
117
// StatusResponse is the JSON document Client.UpdateStatus decodes.
type StatusResponse struct {
	Status string `json:"status"`
	// Conclusion is a pointer so that a JSON null decodes to nil.
	Conclusion    *string `json:"conclusion"`
	RunStatus     string  `json:"run_status,omitempty"`
	RunConclusion string  `json:"run_conclusion,omitempty"`
	NextToken     string  `json:"next_token,omitempty"`
	// NextTokenExpiresAt uses omitzero rather than omitempty: omitempty never
	// omits a struct value such as time.Time, so the zero time would still be
	// written when this type is encoded.
	NextTokenExpiresAt time.Time `json:"next_token_expires_at,omitzero"`
}
125
+
126
// LogRequest is the body sent by Client.AppendLog. Chunk is excluded from
// default encoding and written by MarshalJSON instead.
type LogRequest struct {
	Seq    int32  `json:"seq"`
	Chunk  []byte `json:"-"`
	StepID int64  `json:"step_id,omitempty"`
}

// MarshalJSON encodes r with Chunk as a standard (padded) base64 string under
// "chunk". "step_id" is omitted when StepID is zero.
func (r LogRequest) MarshalJSON() ([]byte, error) {
	encoded := base64.StdEncoding.EncodeToString(r.Chunk)
	payload := struct {
		Seq    int32  `json:"seq"`
		Chunk  string `json:"chunk"`
		StepID int64  `json:"step_id,omitempty"`
	}{r.Seq, encoded, r.StepID}
	return json.Marshal(payload)
}
144
+
145
// LogResponse is the JSON document Client.AppendLog decodes.
type LogResponse struct {
	// Accepted is read from "accepted".
	Accepted bool `json:"accepted"`
	// NextToken is read from "next_token".
	NextToken string `json:"next_token"`
	// NextTokenExpiresAt is read from "next_token_expires_at".
	NextTokenExpiresAt time.Time `json:"next_token_expires_at"`
}
150
+
151
// CancelCheckResponse is the JSON document Client.CancelCheck decodes.
type CancelCheckResponse struct {
	// Cancelled is read from "cancelled".
	Cancelled bool `json:"cancelled"`
	// NextToken is read from "next_token".
	NextToken string `json:"next_token"`
	// NextTokenExpiresAt is read from "next_token_expires_at".
	NextTokenExpiresAt time.Time `json:"next_token_expires_at"`
}
156
+
157
+func (c *Client) Heartbeat(ctx context.Context, req HeartbeatRequest) (*Claim, error) {
158
+	var claim Claim
159
+	status, err := c.do(ctx, http.MethodPost, "/api/v1/runners/heartbeat", c.runnerToken, req, &claim)
160
+	if err != nil {
161
+		return nil, err
162
+	}
163
+	if status == http.StatusNoContent {
164
+		return nil, nil
165
+	}
166
+	return &claim, nil
167
+}
168
+
169
+func (c *Client) UpdateStatus(ctx context.Context, jobID int64, token string, req StatusRequest) (StatusResponse, error) {
170
+	var out StatusResponse
171
+	_, err := c.do(ctx, http.MethodPost, jobPath(jobID, "status"), token, req, &out)
172
+	return out, err
173
+}
174
+
175
+func (c *Client) AppendLog(ctx context.Context, jobID int64, token string, req LogRequest) (LogResponse, error) {
176
+	var out LogResponse
177
+	_, err := c.do(ctx, http.MethodPost, jobPath(jobID, "logs"), token, req, &out)
178
+	return out, err
179
+}
180
+
181
+func (c *Client) CancelCheck(ctx context.Context, jobID int64, token string) (CancelCheckResponse, error) {
182
+	var out CancelCheckResponse
183
+	_, err := c.do(ctx, http.MethodPost, jobPath(jobID, "cancel-check"), token, map[string]string{}, &out)
184
+	return out, err
185
+}
186
+
187
// jobPath returns the API path "/api/v1/jobs/<jobID>/<suffix>", with jobID
// rendered in base 10.
func jobPath(jobID int64, suffix string) string {
	return fmt.Sprintf("/api/v1/jobs/%d/%s", jobID, suffix)
}
190
+
191
+func (c *Client) do(ctx context.Context, method, path, bearer string, body, out any) (int, error) {
192
+	var r io.Reader
193
+	if body != nil {
194
+		var buf bytes.Buffer
195
+		if err := json.NewEncoder(&buf).Encode(body); err != nil {
196
+			return 0, fmt.Errorf("runner api: encode %s %s: %w", method, path, err)
197
+		}
198
+		r = &buf
199
+	}
200
+	u := c.base.ResolveReference(&url.URL{Path: path})
201
+	req, err := http.NewRequestWithContext(ctx, method, u.String(), r)
202
+	if err != nil {
203
+		return 0, err
204
+	}
205
+	req.Header.Set("Accept", "application/json")
206
+	if body != nil {
207
+		req.Header.Set("Content-Type", "application/json")
208
+	}
209
+	if strings.TrimSpace(bearer) != "" {
210
+		req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(bearer))
211
+	}
212
+	resp, err := c.http.Do(req)
213
+	if err != nil {
214
+		return 0, fmt.Errorf("runner api: %s %s: %w", method, path, err)
215
+	}
216
+	defer resp.Body.Close()
217
+	if resp.StatusCode == http.StatusNoContent {
218
+		return resp.StatusCode, nil
219
+	}
220
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
221
+		msg, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
222
+		return resp.StatusCode, fmt.Errorf("runner api: %s %s returned %d: %s", method, path, resp.StatusCode, strings.TrimSpace(string(msg)))
223
+	}
224
+	if out == nil {
225
+		return resp.StatusCode, nil
226
+	}
227
+	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
228
+		return resp.StatusCode, fmt.Errorf("runner api: decode %s %s: %w", method, path, err)
229
+	}
230
+	return resp.StatusCode, nil
231
+}
internal/runner/api/client_test.go added
@@ -0,0 +1,126 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package api
4
+
5
+import (
6
+	"encoding/json"
7
+	"net/http"
8
+	"net/http/httptest"
9
+	"strings"
10
+	"testing"
11
+	"time"
12
+)
13
+
14
+func TestHeartbeat_ClaimsJob(t *testing.T) {
15
+	t.Parallel()
16
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
17
+		if r.URL.Path != "/api/v1/runners/heartbeat" {
18
+			t.Fatalf("path: %s", r.URL.Path)
19
+		}
20
+		if got := r.Header.Get("Authorization"); got != "Bearer runner-token" {
21
+			t.Fatalf("Authorization: %q", got)
22
+		}
23
+		var req HeartbeatRequest
24
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
25
+			t.Fatalf("Decode: %v", err)
26
+		}
27
+		if req.Capacity != 2 || strings.Join(req.Labels, ",") != "self-hosted,linux" {
28
+			t.Fatalf("request: %#v", req)
29
+		}
30
+		w.Header().Set("Content-Type", "application/json")
31
+		_, _ = w.Write([]byte(`{
32
+			"token":"job-token",
33
+			"expires_at":"2026-05-10T21:00:00Z",
34
+			"job":{"id":10,"run_id":20,"repo_id":30,"run_index":1,"workflow_file":"ci.yml","workflow_name":"CI","head_sha":"abc","head_ref":"trunk","event":"push","job_key":"test","job_name":"test","runs_on":"ubuntu-latest","needs":[],"if":"","timeout_minutes":30,"permissions":{},"env":{"A":"B"},"steps":[{"id":40,"index":0,"step_id":"s1","name":"Run","if":"","run":"echo hi","uses":"","working_directory":"","env":{},"with":{},"continue_on_error":false}]}
35
+		}`))
36
+	}))
37
+	defer srv.Close()
38
+
39
+	client, err := New(Config{BaseURL: srv.URL, RunnerToken: "runner-token", HTTPClient: srv.Client()})
40
+	if err != nil {
41
+		t.Fatalf("New: %v", err)
42
+	}
43
+	claim, err := client.Heartbeat(t.Context(), HeartbeatRequest{Labels: []string{"self-hosted", "linux"}, Capacity: 2})
44
+	if err != nil {
45
+		t.Fatalf("Heartbeat: %v", err)
46
+	}
47
+	if claim.Token != "job-token" || claim.Job.ID != 10 || claim.Job.Steps[0].Run != "echo hi" {
48
+		t.Fatalf("claim: %#v", claim)
49
+	}
50
+}
51
+
52
+func TestHeartbeat_NoJob(t *testing.T) {
53
+	t.Parallel()
54
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
55
+		w.WriteHeader(http.StatusNoContent)
56
+	}))
57
+	defer srv.Close()
58
+	client, err := New(Config{BaseURL: srv.URL, RunnerToken: "runner-token", HTTPClient: srv.Client()})
59
+	if err != nil {
60
+		t.Fatalf("New: %v", err)
61
+	}
62
+	claim, err := client.Heartbeat(t.Context(), HeartbeatRequest{Capacity: 1})
63
+	if err != nil {
64
+		t.Fatalf("Heartbeat: %v", err)
65
+	}
66
+	if claim != nil {
67
+		t.Fatalf("claim: %#v", claim)
68
+	}
69
+}
70
+
71
+func TestUpdateStatus_UsesJobTokenAndParsesNextToken(t *testing.T) {
72
+	t.Parallel()
73
+	started := time.Date(2026, 5, 10, 21, 0, 0, 123, time.UTC)
74
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
75
+		if r.URL.Path != "/api/v1/jobs/10/status" {
76
+			t.Fatalf("path: %s", r.URL.Path)
77
+		}
78
+		if got := r.Header.Get("Authorization"); got != "Bearer job-token" {
79
+			t.Fatalf("Authorization: %q", got)
80
+		}
81
+		var body map[string]string
82
+		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
83
+			t.Fatalf("Decode: %v", err)
84
+		}
85
+		if body["status"] != "running" || !strings.HasPrefix(body["started_at"], "2026-05-10T21:00:00.") {
86
+			t.Fatalf("body: %#v", body)
87
+		}
88
+		w.Header().Set("Content-Type", "application/json")
89
+		_, _ = w.Write([]byte(`{"status":"running","conclusion":null,"next_token":"next","next_token_expires_at":"2026-05-10T21:15:00Z"}`))
90
+	}))
91
+	defer srv.Close()
92
+	client, err := New(Config{BaseURL: srv.URL, RunnerToken: "runner-token", HTTPClient: srv.Client()})
93
+	if err != nil {
94
+		t.Fatalf("New: %v", err)
95
+	}
96
+	resp, err := client.UpdateStatus(t.Context(), 10, "job-token", StatusRequest{Status: "running", StartedAt: started})
97
+	if err != nil {
98
+		t.Fatalf("UpdateStatus: %v", err)
99
+	}
100
+	if resp.NextToken != "next" {
101
+		t.Fatalf("NextToken: %q", resp.NextToken)
102
+	}
103
+}
104
+
105
+func TestAppendLog_Base64EncodesChunk(t *testing.T) {
106
+	t.Parallel()
107
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
108
+		var body map[string]any
109
+		if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
110
+			t.Fatalf("Decode: %v", err)
111
+		}
112
+		if body["chunk"] != "aGkK" || body["seq"].(float64) != 7 {
113
+			t.Fatalf("body: %#v", body)
114
+		}
115
+		w.Header().Set("Content-Type", "application/json")
116
+		_, _ = w.Write([]byte(`{"accepted":true,"next_token":"next","next_token_expires_at":"2026-05-10T21:15:00Z"}`))
117
+	}))
118
+	defer srv.Close()
119
+	client, err := New(Config{BaseURL: srv.URL, RunnerToken: "runner-token", HTTPClient: srv.Client()})
120
+	if err != nil {
121
+		t.Fatalf("New: %v", err)
122
+	}
123
+	if _, err := client.AppendLog(t.Context(), 10, "job-token", LogRequest{Seq: 7, Chunk: []byte("hi\n")}); err != nil {
124
+		t.Fatalf("AppendLog: %v", err)
125
+	}
126
+}
internal/runner/workspace/workspace.go added
@@ -0,0 +1,82 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package workspace owns shithubd-runner's on-disk job workspace layout.
4
+package workspace
5
+
6
+import (
7
+	"fmt"
8
+	"os"
9
+	"path/filepath"
10
+	"strconv"
11
+	"time"
12
+)
13
+
14
// Manager creates, removes and sweeps per-job workspace directories laid out
// as <Root>/<runID>/<jobID>.
type Manager struct {
	// Root is the directory under which all run directories live.
	Root string
}

// New returns a Manager rooted at root. The root is not validated here;
// Prepare, Remove and Sweep reject an empty root.
func New(root string) Manager {
	return Manager{Root: root}
}

// checkRoot rejects an empty Root, which would otherwise make workspace paths
// relative to the process's current directory.
func (m Manager) checkRoot() error {
	if m.Root == "" {
		return fmt.Errorf("runner workspace: root is not configured")
	}
	return nil
}

// Prepare creates the workspace directory for the given run and job (mode
// 0700, parents included) and returns its path.
func (m Manager) Prepare(runID, jobID int64) (string, error) {
	if err := m.checkRoot(); err != nil {
		return "", err
	}
	dir := JobDir(m.Root, runID, jobID)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return "", fmt.Errorf("runner workspace: prepare %s: %w", dir, err)
	}
	return dir, nil
}

// Remove deletes the workspace directory for the given run and job together
// with its contents. A workspace that does not exist is not an error.
func (m Manager) Remove(runID, jobID int64) error {
	if err := m.checkRoot(); err != nil {
		return err
	}
	dir := JobDir(m.Root, runID, jobID)
	if err := os.RemoveAll(dir); err != nil {
		return fmt.Errorf("runner workspace: remove %s: %w", dir, err)
	}
	// Best effort: os.Remove only succeeds on an empty run directory, so a
	// failure here normally means other job workspaces still exist.
	_ = os.Remove(filepath.Dir(dir))
	return nil
}

// Sweep removes every job workspace under Root whose directory modification
// time is not after now-ttl, then removes run directories left empty. It
// creates Root if missing and returns the number of job workspaces removed.
//
// Directories that vanish while the sweep is running (for example through a
// concurrent Remove) are skipped. Any other failure aborts the sweep and is
// returned together with the count removed so far.
//
// NOTE(review): only the job directory's own mtime is consulted. On common
// filesystems that changes only when its direct entries change, so a job
// writing solely into nested subdirectories may look stale; confirm that
// callers never sweep with a ttl shorter than the longest job.
func (m Manager) Sweep(ttl time.Duration, now time.Time) (int, error) {
	if err := m.checkRoot(); err != nil {
		return 0, err
	}
	if err := os.MkdirAll(m.Root, 0o700); err != nil {
		return 0, fmt.Errorf("runner workspace: create root: %w", err)
	}
	runDirs, err := os.ReadDir(m.Root)
	if err != nil {
		return 0, fmt.Errorf("runner workspace: read root: %w", err)
	}
	cutoff := now.Add(-ttl)
	removed := 0
	for _, runDir := range runDirs {
		if !runDir.IsDir() {
			continue
		}
		runPath := filepath.Join(m.Root, runDir.Name())
		n, err := sweepRun(runPath, cutoff)
		removed += n
		if err != nil {
			return removed, err
		}
		// Best effort: succeeds only when the run directory is now empty.
		_ = os.Remove(runPath)
	}
	return removed, nil
}

// sweepRun removes the job directories directly under runPath whose
// modification time is not after cutoff and returns how many were removed.
func sweepRun(runPath string, cutoff time.Time) (int, error) {
	jobDirs, err := os.ReadDir(runPath)
	if err != nil {
		if os.IsNotExist(err) {
			// The run directory was removed after the root was listed.
			return 0, nil
		}
		return 0, fmt.Errorf("runner workspace: read %s: %w", runPath, err)
	}
	removed := 0
	for _, jobDir := range jobDirs {
		if !jobDir.IsDir() {
			continue
		}
		jobPath := filepath.Join(runPath, jobDir.Name())
		info, err := jobDir.Info()
		if err != nil {
			if os.IsNotExist(err) {
				// The job directory was removed after the listing.
				continue
			}
			return removed, fmt.Errorf("runner workspace: stat %s: %w", jobPath, err)
		}
		if info.ModTime().After(cutoff) {
			continue
		}
		if err := os.RemoveAll(jobPath); err != nil {
			return removed, fmt.Errorf("runner workspace: sweep %s: %w", jobPath, err)
		}
		removed++
	}
	return removed, nil
}

// JobDir returns <root>/<runID>/<jobID> with both IDs rendered in base 10.
// It does not touch the filesystem.
func JobDir(root string, runID, jobID int64) string {
	return filepath.Join(root, strconv.FormatInt(runID, 10), strconv.FormatInt(jobID, 10))
}
internal/runner/workspace/workspace_test.go added
@@ -0,0 +1,64 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package workspace
4
+
5
+import (
6
+	"os"
7
+	"path/filepath"
8
+	"testing"
9
+	"time"
10
+)
11
+
12
+func TestPrepareAndRemove(t *testing.T) {
13
+	t.Parallel()
14
+	m := New(t.TempDir())
15
+	dir, err := m.Prepare(10, 20)
16
+	if err != nil {
17
+		t.Fatalf("Prepare: %v", err)
18
+	}
19
+	if dir != filepath.Join(m.Root, "10", "20") {
20
+		t.Fatalf("dir: %s", dir)
21
+	}
22
+	if _, err := os.Stat(dir); err != nil {
23
+		t.Fatalf("Stat: %v", err)
24
+	}
25
+	if err := m.Remove(10, 20); err != nil {
26
+		t.Fatalf("Remove: %v", err)
27
+	}
28
+	if _, err := os.Stat(dir); !os.IsNotExist(err) {
29
+		t.Fatalf("Stat after remove: %v", err)
30
+	}
31
+}
32
+
33
+func TestSweepRemovesExpiredJobWorkspaces(t *testing.T) {
34
+	t.Parallel()
35
+	now := time.Date(2026, 5, 10, 21, 0, 0, 0, time.UTC)
36
+	m := New(t.TempDir())
37
+	stale, err := m.Prepare(1, 1)
38
+	if err != nil {
39
+		t.Fatalf("Prepare stale: %v", err)
40
+	}
41
+	fresh, err := m.Prepare(1, 2)
42
+	if err != nil {
43
+		t.Fatalf("Prepare fresh: %v", err)
44
+	}
45
+	if err := os.Chtimes(stale, now.Add(-25*time.Hour), now.Add(-25*time.Hour)); err != nil {
46
+		t.Fatalf("Chtimes stale: %v", err)
47
+	}
48
+	if err := os.Chtimes(fresh, now.Add(-time.Hour), now.Add(-time.Hour)); err != nil {
49
+		t.Fatalf("Chtimes fresh: %v", err)
50
+	}
51
+	removed, err := m.Sweep(24*time.Hour, now)
52
+	if err != nil {
53
+		t.Fatalf("Sweep: %v", err)
54
+	}
55
+	if removed != 1 {
56
+		t.Fatalf("removed: %d", removed)
57
+	}
58
+	if _, err := os.Stat(stale); !os.IsNotExist(err) {
59
+		t.Fatalf("stale still exists: %v", err)
60
+	}
61
+	if _, err := os.Stat(fresh); err != nil {
62
+		t.Fatalf("fresh stat: %v", err)
63
+	}
64
+}