Go · 15955 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package engine
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "os"
13 "os/exec"
14 "path"
15 "regexp"
16 "sort"
17 "strconv"
18 "strings"
19 "sync"
20 "time"
21
22 "github.com/tenseleyFlow/shithub/internal/actions/expr"
23 runnerexec "github.com/tenseleyFlow/shithub/internal/runner/exec"
24 "github.com/tenseleyFlow/shithub/internal/runner/scrub"
25 )
26
var (
	// ErrUnsupportedUses is wrapped into the error returned by step execution
	// when a step sets a non-blank `uses` value.
	ErrUnsupportedUses = errors.New("runner engine: unsupported uses step")
	// ErrUnsupported is returned by engine operations that are not
	// implemented, such as Cancel.
	ErrUnsupported = errors.New("runner engine: unsupported operation")
)
31
const (
	// defaultSeccompProfile is used when DockerConfig.SeccompProfile is empty.
	defaultSeccompProfile = "/etc/shithubd-runner/seccomp.json"
	// defaultContainerUser is the uid:gid used when DockerConfig.User is empty.
	defaultContainerUser = "65534:65534"
	// defaultPidsLimit is used when DockerConfig.PidsLimit is not positive.
	defaultPidsLimit = 512
	// defaultNofileLimit is passed to docker as `--ulimit nofile=...`.
	defaultNofileLimit = "4096:4096"
	// defaultNprocLimit is passed to docker as `--ulimit nproc=...`.
	defaultNprocLimit = "512:512"

	// rootPermissionKey is an intentionally shithub-specific escape hatch.
	// It requires an explicit per-job permissions entry rather than treating
	// broad write-all permissions as permission to run the container as root.
	rootPermissionKey = "shithub-runner-root"
)
44
// CommandRunner abstracts running an external command so the engine can be
// driven by something other than a real child process (DockerConfig.Runner).
type CommandRunner interface {
	// Run executes name with args, sending the command's output to stdout and
	// stderr, and returns a non-nil error when the command did not succeed.
	Run(ctx context.Context, name string, args []string, stdout, stderr io.Writer) error
}
48
// ExecRunner is the CommandRunner used when DockerConfig.Runner is nil; it
// runs commands as child processes through os/exec.
type ExecRunner struct{}

// Run starts name with args, wires the child's stdout/stderr to the given
// writers, and waits for it to exit.
//
// It returns nil on success. On failure it returns the os/exec error; when ctx
// is already done at that point the context error is wrapped in as well, so
// callers can use errors.Is(err, context.Canceled/DeadlineExceeded) while
// errors.As(err, **exec.ExitError) keeps working.
func (ExecRunner) Run(ctx context.Context, name string, args []string, stdout, stderr io.Writer) error {
	cmd := exec.CommandContext(ctx, name, args...)
	cmd.Stdout = stdout
	cmd.Stderr = stderr
	err := cmd.Run()
	if err == nil {
		return nil
	}
	// When ctx expires mid-run, os/exec kills the process and prefers the
	// resulting exit error ("signal: killed") over ctx.Err(). Without this
	// wrap a timeout or cancellation is indistinguishable from an ordinary
	// command failure.
	if ctxErr := ctx.Err(); ctxErr != nil && !errors.Is(err, ctxErr) {
		return fmt.Errorf("%w: %w", ctxErr, err)
	}
	return err
}
57
// DockerConfig configures a Docker engine. NewDocker fills in defaults for
// the fields noted below; Network, Memory, CPUs and DefaultImage are used
// exactly as given.
type DockerConfig struct {
	// Binary is the container CLI executable; defaults to "docker".
	Binary string
	// DefaultImage is used for jobs whose Image is blank.
	DefaultImage string
	// Network is passed as `--network=`.
	Network string
	// Memory is passed as `--memory=`.
	Memory string
	// CPUs is passed as `--cpus=`.
	CPUs string
	// SeccompProfile is passed as `--security-opt=seccomp=`; defaults to
	// defaultSeccompProfile.
	SeccompProfile string
	// User is passed as `--user`; defaults to defaultContainerUser. Jobs that
	// request the root permission run as "0:0" instead.
	User string
	// PidsLimit is passed as `--pids-limit=`; defaults to defaultPidsLimit.
	PidsLimit int
	// DNSServers adds one `--dns` flag per non-blank entry.
	DNSServers []string
	// LogChunkBytes is the maximum size of a streamed log chunk; defaults to 4 KiB.
	LogChunkBytes int
	// LogFlushInterval is how often buffered step output is flushed to the
	// log streams; defaults to one second.
	LogFlushInterval time.Duration
	// StepLogLimit caps the bytes of output kept per step; defaults to 10 MiB.
	StepLogLimit int64
	// Stdout and Stderr additionally receive the raw step output; both
	// default to io.Discard.
	Stdout io.Writer
	Stderr io.Writer
	// Runner executes the container CLI; defaults to ExecRunner.
	Runner CommandRunner
	// MaskValues are handed to the log scrubber for every step, together
	// with the job's own mask values.
	MaskValues []string
	// Logger receives step start/completion records; defaults to a logger
	// that discards everything.
	Logger *slog.Logger
}
77
// Docker is an engine that runs each job step through the configured
// container CLI and exposes per-job log and event channels.
type Docker struct {
	cfg DockerConfig
	// streams holds the log channel of each job, keyed by job ID.
	streams map[int64]chan LogChunk
	// eventSubs holds the event channel of each job, keyed by job ID.
	eventSubs map[int64]chan Event
	// mu guards streams and eventSubs.
	mu sync.Mutex
}
84
85 func NewDocker(cfg DockerConfig) *Docker {
86 if cfg.Binary == "" {
87 cfg.Binary = "docker"
88 }
89 if cfg.LogChunkBytes <= 0 {
90 cfg.LogChunkBytes = 4 * 1024
91 }
92 if cfg.LogFlushInterval <= 0 {
93 cfg.LogFlushInterval = time.Second
94 }
95 if cfg.StepLogLimit <= 0 {
96 cfg.StepLogLimit = 10 * 1024 * 1024
97 }
98 if cfg.Stdout == nil {
99 cfg.Stdout = io.Discard
100 }
101 if cfg.Stderr == nil {
102 cfg.Stderr = io.Discard
103 }
104 if cfg.Runner == nil {
105 cfg.Runner = ExecRunner{}
106 }
107 if cfg.SeccompProfile == "" {
108 cfg.SeccompProfile = defaultSeccompProfile
109 }
110 if cfg.User == "" {
111 cfg.User = defaultContainerUser
112 }
113 if cfg.PidsLimit <= 0 {
114 cfg.PidsLimit = defaultPidsLimit
115 }
116 if cfg.Logger == nil {
117 cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
118 }
119 return &Docker{cfg: cfg, streams: make(map[int64]chan LogChunk), eventSubs: make(map[int64]chan Event)}
120 }
121
122 func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
123 started := time.Now().UTC()
124 outcome := Outcome{Conclusion: ConclusionSuccess, StartedAt: started}
125 defer d.closeStream(job.ID)
126 defer d.closeEventStream(job.ID)
127 if job.TimeoutMinutes > 0 {
128 var cancel context.CancelFunc
129 ctx, cancel = context.WithTimeout(ctx, time.Duration(job.TimeoutMinutes)*time.Minute)
130 defer cancel()
131 }
132 if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
133 outcome.Conclusion = ConclusionFailure
134 outcome.CompletedAt = time.Now().UTC()
135 return outcome, fmt.Errorf("runner engine: prepare workspace: %w", err)
136 }
137 for _, step := range job.Steps {
138 stepStarted := time.Now().UTC()
139 if err := d.executeStep(ctx, job, step); err != nil {
140 stepCompleted := time.Now().UTC()
141 stepOutcome := StepOutcome{
142 StepID: step.ID,
143 Status: "completed",
144 Conclusion: conclusionForError(err),
145 StartedAt: stepStarted,
146 CompletedAt: stepCompleted,
147 }
148 outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
149 if emitErr := d.emitStepOutcome(ctx, job.ID, stepOutcome); emitErr != nil {
150 outcome.Conclusion = conclusionForError(emitErr)
151 outcome.CompletedAt = time.Now().UTC()
152 return outcome, emitErr
153 }
154 if step.ContinueOnError {
155 continue
156 }
157 outcome.Conclusion = conclusionForError(err)
158 outcome.CompletedAt = stepCompleted
159 return outcome, err
160 }
161 stepOutcome := StepOutcome{
162 StepID: step.ID,
163 Status: "completed",
164 Conclusion: ConclusionSuccess,
165 StartedAt: stepStarted,
166 CompletedAt: time.Now().UTC(),
167 }
168 outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
169 if err := d.emitStepOutcome(ctx, job.ID, stepOutcome); err != nil {
170 outcome.Conclusion = conclusionForError(err)
171 outcome.CompletedAt = time.Now().UTC()
172 return outcome, err
173 }
174 }
175 outcome.CompletedAt = time.Now().UTC()
176 return outcome, nil
177 }
178
179 func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
180 if strings.TrimSpace(step.Uses) != "" {
181 return fmt.Errorf("%w: %s is not executable until checkout/artifact support lands", ErrUnsupportedUses, step.Uses)
182 }
183 if strings.TrimSpace(step.Run) == "" {
184 return nil
185 }
186 invocation, err := d.dockerInvocation(job, step)
187 if err != nil {
188 return err
189 }
190 d.logStep(ctx, "runner step starting", job, step, invocation, "")
191 writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
192 out := io.MultiWriter(d.cfg.Stdout, writer)
193 errOut := io.MultiWriter(d.cfg.Stderr, writer)
194 if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, invocation.args, out, errOut); err != nil {
195 d.logStep(ctx, "runner step completed", job, step, invocation, conclusionForError(err))
196 if closeErr := writer.Close(); closeErr != nil {
197 return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), errors.Join(err, closeErr))
198 }
199 return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), err)
200 }
201 d.logStep(ctx, "runner step completed", job, step, invocation, ConclusionSuccess)
202 if err := writer.Close(); err != nil {
203 return fmt.Errorf("runner engine: flush step %q logs: %w", stepLabel(step), err)
204 }
205 return nil
206 }
207
// dockerInvocation is the fully resolved container command for one step: the
// CLI argument list plus the individual sandbox settings, which are kept
// separately so they can be logged.
type dockerInvocation struct {
	// args is the complete argument list passed to the container CLI.
	args []string
	// image is the job image, or the configured default when the job has none.
	image string
	network string
	memory string
	cpus string
	// user is the configured container user, or "0:0" when the job requested root.
	user string
	seccompProfile string
	pidsLimit int
}
218
219 func (d *Docker) dockerInvocation(job Job, step Step) (dockerInvocation, error) {
220 workdir, err := containerWorkdir(step.WorkingDirectory)
221 if err != nil {
222 return dockerInvocation{}, err
223 }
224 image := strings.TrimSpace(job.Image)
225 if image == "" {
226 image = d.cfg.DefaultImage
227 }
228 if image == "" {
229 return dockerInvocation{}, errors.New("runner engine: image is required")
230 }
231 rendered, err := runnerexec.RenderStep(runnerexec.StepInput{
232 Run: step.Run,
233 JobEnv: job.Env,
234 StepEnv: step.Env,
235 Context: expressionContext(job),
236 })
237 if err != nil {
238 return dockerInvocation{}, fmt.Errorf("runner engine: render step %q: %w", stepLabel(step), err)
239 }
240 user := d.cfg.User
241 if permissionsRequestRoot(job.Permissions) {
242 user = "0:0"
243 }
244 args := []string{
245 "run",
246 "--rm",
247 "--network=" + d.cfg.Network,
248 "--memory=" + d.cfg.Memory,
249 "--cpus=" + d.cfg.CPUs,
250 "--pids-limit=" + strconv.Itoa(d.cfg.PidsLimit),
251 "--read-only",
252 "--tmpfs", "/tmp:rw,exec,nosuid,nodev,size=1g",
253 "--cap-drop=ALL",
254 "--cap-add=DAC_OVERRIDE",
255 "--cap-add=SETGID",
256 "--cap-add=SETUID",
257 "--security-opt=no-new-privileges",
258 "--security-opt=seccomp=" + d.cfg.SeccompProfile,
259 "--ulimit", "nofile=" + defaultNofileLimit,
260 "--ulimit", "nproc=" + defaultNprocLimit,
261 "--user", user,
262 "--workdir=" + workdir,
263 "--mount", "type=bind,src=" + job.WorkspaceDir + ",dst=/workspace,rw",
264 }
265 for _, dns := range d.cfg.DNSServers {
266 dns = strings.TrimSpace(dns)
267 if dns != "" {
268 args = append(args, "--dns", dns)
269 }
270 }
271 env, err := validateEnv(rendered.Env)
272 if err != nil {
273 return dockerInvocation{}, err
274 }
275 for _, key := range sortedKeys(env) {
276 args = append(args, "-e", key+"="+env[key])
277 }
278 args = append(args, image, "bash", "-c", rendered.Run)
279 return dockerInvocation{
280 args: args,
281 image: image,
282 network: d.cfg.Network,
283 memory: d.cfg.Memory,
284 cpus: d.cfg.CPUs,
285 user: user,
286 seccompProfile: d.cfg.SeccompProfile,
287 pidsLimit: d.cfg.PidsLimit,
288 }, nil
289 }
290
291 func (d *Docker) logStep(ctx context.Context, msg string, job Job, step Step, invocation dockerInvocation, conclusion string) {
292 attrs := []any{
293 "run_id", job.RunID,
294 "job_id", job.ID,
295 "step_id", step.ID,
296 "image", invocation.image,
297 "network", invocation.network,
298 "cpu_limit", invocation.cpus,
299 "memory_limit", invocation.memory,
300 "pids_limit", invocation.pidsLimit,
301 "container_user", invocation.user,
302 "seccomp_profile", invocation.seccompProfile,
303 }
304 if conclusion != "" {
305 attrs = append(attrs, "conclusion", conclusion)
306 }
307 d.cfg.Logger.InfoContext(ctx, msg, attrs...)
308 }
309
310 func expressionContext(job Job) expr.Context {
311 event := job.EventPayload
312 if len(event) == 0 && strings.TrimSpace(job.Event) != "" && json.Valid([]byte(job.Event)) {
313 _ = json.Unmarshal([]byte(job.Event), &event)
314 }
315 return expr.Context{
316 Secrets: job.Secrets,
317 Shithub: expr.ShithubContext{
318 Event: event,
319 RunID: fmt.Sprintf("%d", job.RunID),
320 SHA: job.HeadSHA,
321 Ref: job.HeadRef,
322 },
323 Untrusted: expr.DefaultUntrusted(),
324 }
325 }
326
327 func permissionsRequestRoot(raw json.RawMessage) bool {
328 if len(raw) == 0 || !json.Valid(raw) {
329 return false
330 }
331 var shaped struct {
332 Per map[string]string `json:"per"`
333 }
334 if err := json.Unmarshal(raw, &shaped); err == nil && strings.EqualFold(shaped.Per[rootPermissionKey], "write") {
335 return true
336 }
337 var flat map[string]string
338 if err := json.Unmarshal(raw, &flat); err != nil {
339 return false
340 }
341 return strings.EqualFold(flat[rootPermissionKey], "write")
342 }
343
// StreamLogs returns the log channel for jobID, creating it if it does not
// exist yet. The context is unused and the returned error is always nil.
func (d *Docker) StreamLogs(_ context.Context, jobID int64) (<-chan LogChunk, error) {
	return d.ensureStream(jobID), nil
}
347
// StreamEvents returns the event channel for jobID, creating it if it does
// not exist yet. The context is unused and the returned error is always nil.
func (d *Docker) StreamEvents(_ context.Context, jobID int64) (<-chan Event, error) {
	return d.ensureEventStream(jobID), nil
}
351
// Cancel is not implemented by this engine; it ignores its arguments and
// always returns ErrUnsupported.
func (d *Docker) Cancel(_ context.Context, _ int64) error {
	return ErrUnsupported
}
355
356 func (d *Docker) ensureStream(jobID int64) chan LogChunk {
357 d.mu.Lock()
358 defer d.mu.Unlock()
359 if ch, ok := d.streams[jobID]; ok {
360 return ch
361 }
362 ch := make(chan LogChunk, 128)
363 d.streams[jobID] = ch
364 return ch
365 }
366
367 func (d *Docker) closeStream(jobID int64) {
368 d.mu.Lock()
369 ch, ok := d.streams[jobID]
370 if ok {
371 delete(d.streams, jobID)
372 }
373 d.mu.Unlock()
374 if ok {
375 close(ch)
376 }
377 }
378
// logStream returns the log channel registered for jobID, or nil when none
// is registered. Unlike ensureStream it never creates one.
func (d *Docker) logStream(jobID int64) chan LogChunk {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.streams[jobID]
}
384
385 func (d *Docker) ensureEventStream(jobID int64) chan Event {
386 d.mu.Lock()
387 defer d.mu.Unlock()
388 if ch, ok := d.eventSubs[jobID]; ok {
389 return ch
390 }
391 ch := make(chan Event, 128)
392 d.eventSubs[jobID] = ch
393 return ch
394 }
395
396 func (d *Docker) closeEventStream(jobID int64) {
397 d.mu.Lock()
398 ch, ok := d.eventSubs[jobID]
399 if ok {
400 delete(d.eventSubs, jobID)
401 }
402 d.mu.Unlock()
403 if ok {
404 close(ch)
405 }
406 }
407
// eventStream returns the event channel registered for jobID, or nil when
// none is registered. Unlike ensureEventStream it never creates one.
func (d *Docker) eventStream(jobID int64) chan Event {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.eventSubs[jobID]
}
413
414 func (d *Docker) emitStepOutcome(ctx context.Context, jobID int64, step StepOutcome) error {
415 ch := d.eventStream(jobID)
416 if ch == nil {
417 return nil
418 }
419 copied := step
420 select {
421 case <-ctx.Done():
422 return ctx.Err()
423 case ch <- Event{Step: &copied}:
424 return nil
425 }
426 }
427
428 func (d *Docker) newStepLogWriter(ctx context.Context, jobID, stepID int64, jobMasks []string) *stepLogWriter {
429 w := &stepLogWriter{
430 ctx: ctx,
431 ch: d.logStream(jobID),
432 events: d.eventStream(jobID),
433 jobID: jobID,
434 stepID: stepID,
435 maxChunk: d.cfg.LogChunkBytes,
436 interval: d.cfg.LogFlushInterval,
437 limit: d.cfg.StepLogLimit,
438 masker: scrub.New(append(append([]string{}, d.cfg.MaskValues...), jobMasks...)),
439 done: make(chan struct{}),
440 }
441 go w.flushLoop()
442 return w
443 }
444
// stepLogWriter is an io.Writer that buffers a step's output, masks it, and
// publishes it as sequence-numbered chunks on the job's log and event
// channels. A background flush loop drains the buffer periodically.
type stepLogWriter struct {
	// ctx aborts blocked channel sends and stops the flush loop.
	ctx context.Context
	// ch and events are the destinations; a nil channel is skipped.
	ch     chan<- LogChunk
	events chan<- Event
	jobID  int64
	stepID int64
	// seq is stamped on each chunk and incremented after it is delivered.
	seq int32
	// maxChunk is the size at which Write emits a chunk immediately.
	maxChunk int
	// interval is the period of the flush loop.
	interval time.Duration
	// limit caps the bytes accepted into the buffer; non-positive means no cap.
	limit int64
	// written counts the bytes accepted so far against limit.
	written int64
	// truncated records that the truncation marker has been appended.
	truncated bool
	// masker scrubs each chunk before it is emitted; nil disables scrubbing.
	masker *scrub.Scrubber
	// buf holds output that has not been emitted yet.
	buf []byte
	// done is closed by Close to stop the flush loop.
	done chan struct{}
	// once makes Close idempotent.
	once sync.Once
	// mu serialises Write, Close and the flush loop.
	mu sync.Mutex
	// closed makes Write fail after Close.
	closed bool
}
464
465 func (w *stepLogWriter) Write(p []byte) (int, error) {
466 w.mu.Lock()
467 defer w.mu.Unlock()
468 if w.closed {
469 return 0, io.ErrClosedPipe
470 }
471 w.appendWithinLimit(p)
472 for len(w.buf) >= w.maxChunk {
473 if err := w.emitLocked(w.buf[:w.maxChunk]); err != nil {
474 return 0, err
475 }
476 w.buf = w.buf[w.maxChunk:]
477 }
478 return len(p), nil
479 }
480
481 func (w *stepLogWriter) Close() error {
482 w.once.Do(func() {
483 close(w.done)
484 w.mu.Lock()
485 defer w.mu.Unlock()
486 _ = w.flushLocked()
487 _ = w.flushMaskerLocked()
488 w.closed = true
489 })
490 return nil
491 }
492
493 func (w *stepLogWriter) appendWithinLimit(p []byte) {
494 if w.limit <= 0 {
495 w.buf = append(w.buf, p...)
496 return
497 }
498 remaining := w.limit - w.written
499 if remaining > 0 {
500 if int64(len(p)) <= remaining {
501 w.buf = append(w.buf, p...)
502 w.written += int64(len(p))
503 return
504 }
505 w.buf = append(w.buf, p[:int(remaining)]...)
506 w.written += remaining
507 }
508 if !w.truncated {
509 w.buf = append(w.buf, []byte("\n[shithub-runner: step log truncated after 10 MiB]\n")...)
510 w.truncated = true
511 }
512 }
513
514 func (w *stepLogWriter) flushLoop() {
515 ticker := time.NewTicker(w.interval)
516 defer ticker.Stop()
517 for {
518 select {
519 case <-w.done:
520 return
521 case <-w.ctx.Done():
522 return
523 case <-ticker.C:
524 w.mu.Lock()
525 _ = w.flushLocked()
526 w.mu.Unlock()
527 }
528 }
529 }
530
531 func (w *stepLogWriter) flushLocked() error {
532 if len(w.buf) == 0 {
533 return nil
534 }
535 if err := w.emitLocked(w.buf); err != nil {
536 return err
537 }
538 w.buf = nil
539 return nil
540 }
541
542 func (w *stepLogWriter) emitLocked(chunk []byte) error {
543 if w.masker != nil {
544 chunk = w.masker.Scrub(chunk)
545 if len(chunk) == 0 {
546 return nil
547 }
548 }
549 return w.emitChunkLocked(chunk)
550 }
551
552 func (w *stepLogWriter) flushMaskerLocked() error {
553 if w.masker == nil {
554 return nil
555 }
556 chunk := w.masker.Flush()
557 if len(chunk) == 0 {
558 return nil
559 }
560 return w.emitChunkLocked(chunk)
561 }
562
563 func (w *stepLogWriter) emitChunkLocked(chunk []byte) error {
564 copied := LogChunk{JobID: w.jobID, StepID: w.stepID, Seq: w.seq, Chunk: append([]byte(nil), chunk...)}
565 if w.ch != nil {
566 select {
567 case <-w.ctx.Done():
568 return w.ctx.Err()
569 case w.ch <- copied:
570 }
571 }
572 if w.events != nil {
573 eventChunk := copied
574 select {
575 case <-w.ctx.Done():
576 return w.ctx.Err()
577 case w.events <- Event{Log: &eventChunk}:
578 }
579 }
580 w.seq++
581 return nil
582 }
583
584 func conclusionForError(err error) string {
585 if errors.Is(err, context.Canceled) {
586 return ConclusionCancelled
587 }
588 if errors.Is(err, context.DeadlineExceeded) {
589 return ConclusionTimedOut
590 }
591 return ConclusionFailure
592 }
593
// containerWorkdir resolves a step's working-directory to an absolute path
// inside the container's /workspace mount.
//
// A blank value means /workspace itself. Absolute paths are rejected, as is
// any relative path that resolves to a location outside /workspace once
// "." and ".." elements are cleaned.
func containerWorkdir(wd string) (string, error) {
	const root = "/workspace"

	rel := strings.TrimSpace(wd)
	switch {
	case rel == "":
		return root, nil
	case path.IsAbs(rel):
		return "", fmt.Errorf("runner engine: working-directory must be relative, got %q", rel)
	}

	resolved := path.Join(root, rel)
	inside := resolved == root || strings.HasPrefix(resolved, root+"/")
	if !inside {
		return "", fmt.Errorf("runner engine: working-directory escapes workspace: %q", rel)
	}
	return resolved, nil
}
608
// envNameRE matches environment variable names made of ASCII letters, digits
// and underscores that do not start with a digit.
var envNameRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateEnv returns a copy of env after checking that every variable name
// matches envNameRE. The first offending name found yields a nil map and an
// error; values are not inspected.
func validateEnv(env map[string]string) (map[string]string, error) {
	for name := range env {
		if envNameRE.MatchString(name) {
			continue
		}
		return nil, fmt.Errorf("runner engine: invalid env name %q", name)
	}

	validated := make(map[string]string, len(env))
	for name, value := range env {
		validated[name] = value
	}
	return validated, nil
}
621
// sortedKeys returns the keys of m in ascending order. The result is a new,
// non-nil slice even when m is empty or nil.
func sortedKeys(m map[string]string) []string {
	names := make([]string, len(m))
	next := 0
	for name := range m {
		names[next] = name
		next++
	}
	sort.Strings(names)
	return names
}
630
631 func stepLabel(step Step) string {
632 if step.Name != "" {
633 return step.Name
634 }
635 if step.StepID != "" {
636 return step.StepID
637 }
638 return fmt.Sprintf("#%d", step.Index)
639 }
640