Go · 23831 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package engine
4
5 import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "errors"
10 "fmt"
11 "io"
12 "log/slog"
13 "net/url"
14 "os"
15 "os/exec"
16 "path"
17 "regexp"
18 "sort"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/tenseleyFlow/shithub/internal/actions/expr"
25 runnerexec "github.com/tenseleyFlow/shithub/internal/runner/exec"
26 "github.com/tenseleyFlow/shithub/internal/runner/scrub"
27 )
28
29 var (
30 ErrUnsupportedUses = errors.New("runner engine: unsupported uses step")
31 ErrUnsupported = errors.New("runner engine: unsupported operation")
32 )
33
34 const (
35 defaultSeccompProfile = "/etc/shithubd-runner/seccomp.json"
36 defaultContainerUser = "65534:65534"
37 defaultPidsLimit = 512
38 defaultNofileLimit = "4096:4096"
39 defaultNprocLimit = "512:512"
40
41 // rootPermissionKey is an intentionally shithub-specific escape hatch.
42 // It requires an explicit per-job permissions entry rather than treating
43 // broad write-all permissions as permission to run the container as root.
44 rootPermissionKey = "shithub-runner-root"
45 )
46
47 type CommandRunner interface {
48 Run(ctx context.Context, name string, args []string, env []string, stdout, stderr io.Writer) error
49 }
50
51 type ExecRunner struct{}
52
53 func (ExecRunner) Run(ctx context.Context, name string, args []string, env []string, stdout, stderr io.Writer) error {
54 cmd := exec.CommandContext(ctx, name, args...)
55 if len(env) > 0 {
56 cmd.Env = append(os.Environ(), env...)
57 }
58 cmd.Stdout = stdout
59 cmd.Stderr = stderr
60 return cmd.Run()
61 }
62
63 type DockerConfig struct {
64 Binary string
65 GitBinary string
66 DefaultImage string
67 Network string
68 Memory string
69 CPUs string
70 SeccompProfile string
71 User string
72 PidsLimit int
73 DNSServers []string
74 LogChunkBytes int
75 LogFlushInterval time.Duration
76 StepLogLimit int64
77 TimeoutMinute time.Duration
78 Stdout io.Writer
79 Stderr io.Writer
80 Runner CommandRunner
81 MaskValues []string
82 AllowRoot bool
83 Logger *slog.Logger
84 }
85
86 type Docker struct {
87 cfg DockerConfig
88 streams map[int64]chan LogChunk
89 eventSubs map[int64]chan Event
90 active map[int64]string
91 mu sync.Mutex
92 }
93
94 func NewDocker(cfg DockerConfig) *Docker {
95 if cfg.Binary == "" {
96 cfg.Binary = "docker"
97 }
98 if cfg.GitBinary == "" {
99 cfg.GitBinary = "git"
100 }
101 if cfg.LogChunkBytes <= 0 {
102 cfg.LogChunkBytes = 4 * 1024
103 }
104 if cfg.LogFlushInterval <= 0 {
105 cfg.LogFlushInterval = time.Second
106 }
107 if cfg.StepLogLimit <= 0 {
108 cfg.StepLogLimit = 10 * 1024 * 1024
109 }
110 if cfg.TimeoutMinute <= 0 {
111 cfg.TimeoutMinute = time.Minute
112 }
113 if cfg.Stdout == nil {
114 cfg.Stdout = io.Discard
115 }
116 if cfg.Stderr == nil {
117 cfg.Stderr = io.Discard
118 }
119 if cfg.Runner == nil {
120 cfg.Runner = ExecRunner{}
121 }
122 if cfg.SeccompProfile == "" {
123 cfg.SeccompProfile = defaultSeccompProfile
124 }
125 if cfg.User == "" {
126 cfg.User = defaultContainerUser
127 }
128 if cfg.PidsLimit <= 0 {
129 cfg.PidsLimit = defaultPidsLimit
130 }
131 if cfg.Logger == nil {
132 cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
133 }
134 return &Docker{
135 cfg: cfg,
136 streams: make(map[int64]chan LogChunk),
137 eventSubs: make(map[int64]chan Event),
138 active: make(map[int64]string),
139 }
140 }
141
142 func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
143 started := time.Now().UTC()
144 outcome := Outcome{Conclusion: ConclusionSuccess, StartedAt: started}
145 defer d.closeStream(job.ID)
146 defer d.closeEventStream(job.ID)
147 if job.TimeoutMinutes > 0 {
148 var cancel context.CancelFunc
149 ctx, cancel = context.WithTimeoutCause(ctx, time.Duration(job.TimeoutMinutes)*d.cfg.TimeoutMinute, ErrJobTimedOut)
150 defer cancel()
151 }
152 if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
153 outcome.Conclusion = ConclusionFailure
154 outcome.CompletedAt = time.Now().UTC()
155 return outcome, fmt.Errorf("runner engine: prepare workspace: %w", err)
156 }
157 for _, step := range job.Steps {
158 stepStarted := time.Now().UTC()
159 if err := d.executeStep(ctx, job, step); err != nil {
160 stepCompleted := time.Now().UTC()
161 stepOutcome := StepOutcome{
162 StepID: step.ID,
163 Status: "completed",
164 Conclusion: conclusionForError(err),
165 StartedAt: stepStarted,
166 CompletedAt: stepCompleted,
167 }
168 outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
169 if emitErr := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); emitErr != nil {
170 outcome.Conclusion = conclusionForError(emitErr)
171 outcome.CompletedAt = time.Now().UTC()
172 return outcome, emitErr
173 }
174 if step.ContinueOnError && !errors.Is(err, ErrJobTimedOut) {
175 continue
176 }
177 outcome.Conclusion = conclusionForError(err)
178 outcome.CompletedAt = stepCompleted
179 return outcome, err
180 }
181 stepOutcome := StepOutcome{
182 StepID: step.ID,
183 Status: "completed",
184 Conclusion: ConclusionSuccess,
185 StartedAt: stepStarted,
186 CompletedAt: time.Now().UTC(),
187 }
188 outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
189 if err := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); err != nil {
190 outcome.Conclusion = conclusionForError(err)
191 outcome.CompletedAt = time.Now().UTC()
192 return outcome, err
193 }
194 }
195 outcome.CompletedAt = time.Now().UTC()
196 return outcome, nil
197 }
198
199 func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
200 if uses := strings.TrimSpace(step.Uses); uses != "" {
201 switch uses {
202 case "actions/checkout@v4":
203 return d.executeCheckout(ctx, job, step)
204 default:
205 return fmt.Errorf("%w: %s is not executable until that alias lands", ErrUnsupportedUses, uses)
206 }
207 }
208 if strings.TrimSpace(step.Run) == "" {
209 return nil
210 }
211 invocation, err := d.dockerInvocation(job, step)
212 if err != nil {
213 return err
214 }
215 d.setActiveContainer(job.ID, invocation.containerName)
216 defer d.clearActiveContainer(job.ID, invocation.containerName)
217 d.logStep(ctx, "runner step starting", job, step, invocation, "")
218 writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
219 out := io.MultiWriter(d.cfg.Stdout, writer)
220 errOut := io.MultiWriter(d.cfg.Stderr, writer)
221 if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, invocation.args, invocation.env, out, errOut); err != nil {
222 if isJobTimeout(ctx, err) {
223 killCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
224 killErr := d.killContainer(killCtx, invocation.containerName)
225 cancel()
226 if killErr != nil {
227 err = errors.Join(err, killErr)
228 }
229 err = fmt.Errorf("%w: %w", ErrJobTimedOut, err)
230 }
231 d.logStep(ctx, "runner step completed", job, step, invocation, conclusionForError(err))
232 if closeErr := writer.Close(); closeErr != nil {
233 return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), errors.Join(err, closeErr))
234 }
235 return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), err)
236 }
237 d.logStep(ctx, "runner step completed", job, step, invocation, ConclusionSuccess)
238 if err := writer.Close(); err != nil {
239 return fmt.Errorf("runner engine: flush step %q logs: %w", stepLabel(step), err)
240 }
241 return nil
242 }
243
244 func (d *Docker) executeCheckout(ctx context.Context, job Job, step Step) error {
245 checkoutURL, err := validateCheckoutURL(job.CheckoutURL)
246 if err != nil {
247 return err
248 }
249 if strings.TrimSpace(job.CheckoutToken) == "" {
250 return errors.New("runner engine: checkout token is required")
251 }
252 headSHA := strings.TrimSpace(job.HeadSHA)
253 if !gitObjectIDRE.MatchString(headSHA) {
254 return fmt.Errorf("runner engine: invalid checkout sha %q", headSHA)
255 }
256 if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
257 return fmt.Errorf("runner engine: prepare checkout workspace: %w", err)
258 }
259 depthArgs, err := checkoutDepthArgs(step.With)
260 if err != nil {
261 return err
262 }
263 fetchTarget := headSHA
264
265 writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
266 out := io.MultiWriter(d.cfg.Stdout, writer)
267 errOut := io.MultiWriter(d.cfg.Stderr, writer)
268 closeWriter := func(runErr error) error {
269 if closeErr := writer.Close(); closeErr != nil {
270 if runErr != nil {
271 return errors.Join(runErr, closeErr)
272 }
273 return closeErr
274 }
275 return runErr
276 }
277
278 fmt.Fprintf(out, "Checking out %s at %s\n", checkoutURL, shortObjectID(headSHA))
279 gitEnv := []string{
280 "GIT_TERMINAL_PROMPT=0",
281 "GIT_ASKPASS=/bin/false",
282 "SHITHUB_CHECKOUT_TOKEN=" + job.CheckoutToken,
283 }
284 //nolint:gosec // G101: this helper reads the token from env; it does not hard-code or argv-expose a secret.
285 credentialHelper := `credential.helper=!f() { echo username=shithub-actions; echo password=$SHITHUB_CHECKOUT_TOKEN; }; f`
286 runGit := func(args ...string) error {
287 if err := d.cfg.Runner.Run(ctx, d.cfg.GitBinary, args, gitEnv, out, errOut); err != nil {
288 return fmt.Errorf("git %s: %w", strings.Join(redactCheckoutArgs(args), " "), err)
289 }
290 return nil
291 }
292
293 if err := runGit("-C", job.WorkspaceDir, "init"); err != nil {
294 return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
295 }
296 if err := runGit("-C", job.WorkspaceDir, "remote", "add", "origin", checkoutURL); err != nil {
297 return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
298 }
299 fetchArgs := []string{"-C", job.WorkspaceDir, "-c", credentialHelper, "fetch", "--no-tags"}
300 fetchArgs = append(fetchArgs, depthArgs...)
301 fetchArgs = append(fetchArgs, "origin", fetchTarget)
302 if err := runGit(fetchArgs...); err != nil {
303 return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
304 }
305 if err := runGit("-C", job.WorkspaceDir, "checkout", "--force", "--detach", headSHA); err != nil {
306 return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
307 }
308
309 var rev bytes.Buffer
310 if err := d.cfg.Runner.Run(ctx, d.cfg.GitBinary,
311 []string{"-C", job.WorkspaceDir, "rev-parse", "HEAD"},
312 gitEnv, io.MultiWriter(out, &rev), errOut); err != nil {
313 return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: git rev-parse HEAD: %w", stepLabel(step), err))
314 }
315 if got := strings.TrimSpace(rev.String()); !strings.EqualFold(got, headSHA) {
316 return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: HEAD %s != expected %s", stepLabel(step), got, headSHA))
317 }
318 fmt.Fprintf(out, "Checked out %s\n", shortObjectID(headSHA))
319 return closeWriter(nil)
320 }
321
322 type dockerInvocation struct {
323 args []string
324 env []string
325 containerName string
326 image string
327 network string
328 memory string
329 cpus string
330 user string
331 seccompProfile string
332 pidsLimit int
333 }
334
335 func (d *Docker) dockerInvocation(job Job, step Step) (dockerInvocation, error) {
336 workdir, err := containerWorkdir(step.WorkingDirectory)
337 if err != nil {
338 return dockerInvocation{}, err
339 }
340 image := strings.TrimSpace(job.Image)
341 if image == "" {
342 image = d.cfg.DefaultImage
343 }
344 if image == "" {
345 return dockerInvocation{}, errors.New("runner engine: image is required")
346 }
347 rendered, err := runnerexec.RenderStep(runnerexec.StepInput{
348 Run: step.Run,
349 JobEnv: job.Env,
350 StepEnv: step.Env,
351 Context: expressionContext(job),
352 })
353 if err != nil {
354 return dockerInvocation{}, fmt.Errorf("runner engine: render step %q: %w", stepLabel(step), err)
355 }
356 user := d.cfg.User
357 if d.cfg.AllowRoot && permissionsRequestRoot(job.Permissions) {
358 user = "0:0"
359 }
360 containerName := dockerContainerName(job, step)
361 args := []string{
362 "run",
363 "--rm",
364 "--name", containerName,
365 "--network=" + d.cfg.Network,
366 "--memory=" + d.cfg.Memory,
367 "--cpus=" + d.cfg.CPUs,
368 "--pids-limit=" + strconv.Itoa(d.cfg.PidsLimit),
369 "--read-only",
370 "--tmpfs", "/tmp:rw,exec,nosuid,nodev,size=1g",
371 "--cap-drop=ALL",
372 "--cap-add=DAC_OVERRIDE",
373 "--cap-add=SETGID",
374 "--cap-add=SETUID",
375 "--security-opt=no-new-privileges",
376 "--security-opt=seccomp=" + d.cfg.SeccompProfile,
377 "--ulimit", "nofile=" + defaultNofileLimit,
378 "--ulimit", "nproc=" + defaultNprocLimit,
379 "--user", user,
380 "--workdir=" + workdir,
381 "--mount", "type=bind,src=" + job.WorkspaceDir + ",dst=/workspace,rw",
382 }
383 for _, dns := range d.cfg.DNSServers {
384 dns = strings.TrimSpace(dns)
385 if dns != "" {
386 args = append(args, "--dns", dns)
387 }
388 }
389 env, err := validateEnv(rendered.Env)
390 if err != nil {
391 return dockerInvocation{}, err
392 }
393 processEnv := make([]string, 0, len(env))
394 for _, key := range sortedKeys(env) {
395 args = append(args, "--env", key)
396 processEnv = append(processEnv, key+"="+env[key])
397 }
398 args = append(args, image, "bash", "-c", rendered.Run)
399 return dockerInvocation{
400 args: args,
401 env: processEnv,
402 containerName: containerName,
403 image: image,
404 network: d.cfg.Network,
405 memory: d.cfg.Memory,
406 cpus: d.cfg.CPUs,
407 user: user,
408 seccompProfile: d.cfg.SeccompProfile,
409 pidsLimit: d.cfg.PidsLimit,
410 }, nil
411 }
412
413 func (d *Docker) logStep(ctx context.Context, msg string, job Job, step Step, invocation dockerInvocation, conclusion string) {
414 attrs := []any{
415 "run_id", job.RunID,
416 "job_id", job.ID,
417 "step_id", step.ID,
418 "image", invocation.image,
419 "network", invocation.network,
420 "cpu_limit", invocation.cpus,
421 "memory_limit", invocation.memory,
422 "pids_limit", invocation.pidsLimit,
423 "container_user", invocation.user,
424 "seccomp_profile", invocation.seccompProfile,
425 }
426 if conclusion != "" {
427 attrs = append(attrs, "conclusion", conclusion)
428 }
429 d.cfg.Logger.InfoContext(ctx, msg, attrs...)
430 }
431
432 func expressionContext(job Job) expr.Context {
433 event := job.EventPayload
434 if len(event) == 0 && strings.TrimSpace(job.Event) != "" && json.Valid([]byte(job.Event)) {
435 _ = json.Unmarshal([]byte(job.Event), &event)
436 }
437 return expr.Context{
438 Secrets: job.Secrets,
439 Shithub: expr.ShithubContext{
440 Event: event,
441 RunID: fmt.Sprintf("%d", job.RunID),
442 SHA: job.HeadSHA,
443 Ref: job.HeadRef,
444 },
445 Untrusted: expr.DefaultUntrusted(),
446 }
447 }
448
449 func permissionsRequestRoot(raw json.RawMessage) bool {
450 if len(raw) == 0 || !json.Valid(raw) {
451 return false
452 }
453 var shaped struct {
454 Per map[string]string `json:"per"`
455 }
456 if err := json.Unmarshal(raw, &shaped); err == nil && strings.EqualFold(shaped.Per[rootPermissionKey], "write") {
457 return true
458 }
459 var flat map[string]string
460 if err := json.Unmarshal(raw, &flat); err != nil {
461 return false
462 }
463 return strings.EqualFold(flat[rootPermissionKey], "write")
464 }
465
466 var gitObjectIDRE = regexp.MustCompile(`^[0-9a-fA-F]{40,64}$`)
467
468 func validateCheckoutURL(raw string) (string, error) {
469 raw = strings.TrimSpace(raw)
470 if raw == "" {
471 return "", errors.New("runner engine: checkout url is required")
472 }
473 u, err := url.Parse(raw)
474 if err != nil || u.Scheme == "" || u.Host == "" {
475 return "", fmt.Errorf("runner engine: invalid checkout url %q", raw)
476 }
477 if u.User != nil {
478 return "", errors.New("runner engine: checkout url must not contain credentials")
479 }
480 switch strings.ToLower(u.Scheme) {
481 case "http", "https":
482 default:
483 return "", fmt.Errorf("runner engine: checkout url scheme %q is not allowed", u.Scheme)
484 }
485 return u.String(), nil
486 }
487
488 func checkoutDepthArgs(with map[string]string) ([]string, error) {
489 for key := range with {
490 if key != "fetch-depth" {
491 return nil, fmt.Errorf("runner engine: unsupported checkout input %q", key)
492 }
493 }
494 raw := strings.TrimSpace(with["fetch-depth"])
495 if raw == "" {
496 return []string{"--depth=1"}, nil
497 }
498 depth, err := strconv.Atoi(raw)
499 if err != nil || depth < 0 || depth > 100000 {
500 return nil, fmt.Errorf("runner engine: invalid checkout fetch-depth %q", raw)
501 }
502 if depth == 0 {
503 return nil, nil
504 }
505 return []string{"--depth=" + strconv.Itoa(depth)}, nil
506 }
507
508 func shortObjectID(oid string) string {
509 if len(oid) <= 12 {
510 return oid
511 }
512 return oid[:12]
513 }
514
515 func redactCheckoutArgs(args []string) []string {
516 out := append([]string{}, args...)
517 for i, arg := range out {
518 if strings.Contains(arg, "SHITHUB_CHECKOUT_TOKEN") {
519 out[i] = "credential.helper=<redacted>"
520 }
521 }
522 return out
523 }
524
525 func (d *Docker) StreamLogs(_ context.Context, jobID int64) (<-chan LogChunk, error) {
526 return d.ensureStream(jobID), nil
527 }
528
529 func (d *Docker) StreamEvents(_ context.Context, jobID int64) (<-chan Event, error) {
530 return d.ensureEventStream(jobID), nil
531 }
532
533 func (d *Docker) Cancel(ctx context.Context, jobID int64) error {
534 name := d.activeContainer(jobID)
535 if name == "" {
536 return nil
537 }
538 killCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
539 defer cancel()
540 return d.killContainer(killCtx, name)
541 }
542
543 func (d *Docker) killContainer(ctx context.Context, name string) error {
544 if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, []string{"kill", name}, nil, d.cfg.Stdout, d.cfg.Stderr); err != nil {
545 return fmt.Errorf("runner engine: kill container %s: %w", name, err)
546 }
547 return nil
548 }
549
550 func (d *Docker) setActiveContainer(jobID int64, name string) {
551 if name == "" {
552 return
553 }
554 d.mu.Lock()
555 d.active[jobID] = name
556 d.mu.Unlock()
557 }
558
559 func (d *Docker) clearActiveContainer(jobID int64, name string) {
560 d.mu.Lock()
561 if d.active[jobID] == name {
562 delete(d.active, jobID)
563 }
564 d.mu.Unlock()
565 }
566
567 func (d *Docker) activeContainer(jobID int64) string {
568 d.mu.Lock()
569 defer d.mu.Unlock()
570 return d.active[jobID]
571 }
572
573 func (d *Docker) ensureStream(jobID int64) chan LogChunk {
574 d.mu.Lock()
575 defer d.mu.Unlock()
576 if ch, ok := d.streams[jobID]; ok {
577 return ch
578 }
579 ch := make(chan LogChunk, 128)
580 d.streams[jobID] = ch
581 return ch
582 }
583
584 func (d *Docker) closeStream(jobID int64) {
585 d.mu.Lock()
586 ch, ok := d.streams[jobID]
587 if ok {
588 delete(d.streams, jobID)
589 }
590 d.mu.Unlock()
591 if ok {
592 close(ch)
593 }
594 }
595
596 func (d *Docker) logStream(jobID int64) chan LogChunk {
597 d.mu.Lock()
598 defer d.mu.Unlock()
599 return d.streams[jobID]
600 }
601
602 func (d *Docker) ensureEventStream(jobID int64) chan Event {
603 d.mu.Lock()
604 defer d.mu.Unlock()
605 if ch, ok := d.eventSubs[jobID]; ok {
606 return ch
607 }
608 ch := make(chan Event, 128)
609 d.eventSubs[jobID] = ch
610 return ch
611 }
612
613 func (d *Docker) closeEventStream(jobID int64) {
614 d.mu.Lock()
615 ch, ok := d.eventSubs[jobID]
616 if ok {
617 delete(d.eventSubs, jobID)
618 }
619 d.mu.Unlock()
620 if ok {
621 close(ch)
622 }
623 }
624
625 func (d *Docker) eventStream(jobID int64) chan Event {
626 d.mu.Lock()
627 defer d.mu.Unlock()
628 return d.eventSubs[jobID]
629 }
630
631 func (d *Docker) emitStepOutcome(ctx context.Context, jobID int64, step StepOutcome) error {
632 ch := d.eventStream(jobID)
633 if ch == nil {
634 return nil
635 }
636 copied := step
637 select {
638 case <-ctx.Done():
639 return ctx.Err()
640 case ch <- Event{Step: &copied}:
641 return nil
642 }
643 }
644
645 func (d *Docker) emitStepOutcomeAfterRun(ctx context.Context, jobID int64, step StepOutcome) error {
646 if ctx.Err() == nil {
647 return d.emitStepOutcome(ctx, jobID, step)
648 }
649 emitCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
650 defer cancel()
651 return d.emitStepOutcome(emitCtx, jobID, step)
652 }
653
654 func (d *Docker) newStepLogWriter(ctx context.Context, jobID, stepID int64, jobMasks []string) *stepLogWriter {
655 w := &stepLogWriter{
656 ctx: ctx,
657 ch: d.logStream(jobID),
658 events: d.eventStream(jobID),
659 jobID: jobID,
660 stepID: stepID,
661 maxChunk: d.cfg.LogChunkBytes,
662 interval: d.cfg.LogFlushInterval,
663 limit: d.cfg.StepLogLimit,
664 masker: scrub.New(append(append([]string{}, d.cfg.MaskValues...), jobMasks...)),
665 done: make(chan struct{}),
666 }
667 go w.flushLoop()
668 return w
669 }
670
671 type stepLogWriter struct {
672 ctx context.Context
673 ch chan<- LogChunk
674 events chan<- Event
675 jobID int64
676 stepID int64
677 seq int32
678 maxChunk int
679 interval time.Duration
680 limit int64
681 written int64
682 truncated bool
683 masker *scrub.Scrubber
684 buf []byte
685 done chan struct{}
686 once sync.Once
687 mu sync.Mutex
688 closed bool
689 }
690
691 func (w *stepLogWriter) Write(p []byte) (int, error) {
692 w.mu.Lock()
693 defer w.mu.Unlock()
694 if w.closed {
695 return 0, io.ErrClosedPipe
696 }
697 w.appendWithinLimit(p)
698 for len(w.buf) >= w.maxChunk {
699 if err := w.emitLocked(w.buf[:w.maxChunk]); err != nil {
700 return 0, err
701 }
702 w.buf = w.buf[w.maxChunk:]
703 }
704 return len(p), nil
705 }
706
707 func (w *stepLogWriter) Close() error {
708 w.once.Do(func() {
709 close(w.done)
710 w.mu.Lock()
711 defer w.mu.Unlock()
712 _ = w.flushLocked()
713 _ = w.flushMaskerLocked()
714 w.closed = true
715 })
716 return nil
717 }
718
719 func (w *stepLogWriter) appendWithinLimit(p []byte) {
720 if w.limit <= 0 {
721 w.buf = append(w.buf, p...)
722 return
723 }
724 remaining := w.limit - w.written
725 if remaining > 0 {
726 if int64(len(p)) <= remaining {
727 w.buf = append(w.buf, p...)
728 w.written += int64(len(p))
729 return
730 }
731 w.buf = append(w.buf, p[:int(remaining)]...)
732 w.written += remaining
733 }
734 if !w.truncated {
735 w.buf = append(w.buf, []byte("\n[shithub-runner: step log truncated after 10 MiB]\n")...)
736 w.truncated = true
737 }
738 }
739
740 func (w *stepLogWriter) flushLoop() {
741 ticker := time.NewTicker(w.interval)
742 defer ticker.Stop()
743 for {
744 select {
745 case <-w.done:
746 return
747 case <-w.ctx.Done():
748 return
749 case <-ticker.C:
750 w.mu.Lock()
751 _ = w.flushLocked()
752 w.mu.Unlock()
753 }
754 }
755 }
756
757 func (w *stepLogWriter) flushLocked() error {
758 if len(w.buf) == 0 {
759 return nil
760 }
761 if err := w.emitLocked(w.buf); err != nil {
762 return err
763 }
764 w.buf = nil
765 return nil
766 }
767
768 func (w *stepLogWriter) emitLocked(chunk []byte) error {
769 if w.masker != nil {
770 chunk = w.masker.Scrub(chunk)
771 if len(chunk) == 0 {
772 return nil
773 }
774 }
775 return w.emitChunkLocked(chunk)
776 }
777
778 func (w *stepLogWriter) flushMaskerLocked() error {
779 if w.masker == nil {
780 return nil
781 }
782 chunk := w.masker.Flush()
783 if len(chunk) == 0 {
784 return nil
785 }
786 return w.emitChunkLocked(chunk)
787 }
788
789 func (w *stepLogWriter) emitChunkLocked(chunk []byte) error {
790 copied := LogChunk{JobID: w.jobID, StepID: w.stepID, Seq: w.seq, Chunk: append([]byte(nil), chunk...)}
791 if w.ch != nil {
792 select {
793 case <-w.ctx.Done():
794 return w.ctx.Err()
795 case w.ch <- copied:
796 }
797 }
798 if w.events != nil {
799 eventChunk := copied
800 select {
801 case <-w.ctx.Done():
802 return w.ctx.Err()
803 case w.events <- Event{Log: &eventChunk}:
804 }
805 }
806 w.seq++
807 return nil
808 }
809
810 func conclusionForError(err error) string {
811 if errors.Is(err, ErrJobTimedOut) {
812 return ConclusionTimedOut
813 }
814 if errors.Is(err, context.Canceled) {
815 return ConclusionCancelled
816 }
817 return ConclusionFailure
818 }
819
820 func isJobTimeout(ctx context.Context, err error) bool {
821 if errors.Is(err, ErrJobTimedOut) {
822 return true
823 }
824 if !errors.Is(err, context.DeadlineExceeded) {
825 return false
826 }
827 return errors.Is(context.Cause(ctx), ErrJobTimedOut)
828 }
829
830 func containerWorkdir(wd string) (string, error) {
831 wd = strings.TrimSpace(wd)
832 if wd == "" {
833 return "/workspace", nil
834 }
835 if strings.HasPrefix(wd, "/") {
836 return "", fmt.Errorf("runner engine: working-directory must be relative, got %q", wd)
837 }
838 clean := path.Clean("/workspace/" + wd)
839 if clean != "/workspace" && !strings.HasPrefix(clean, "/workspace/") {
840 return "", fmt.Errorf("runner engine: working-directory escapes workspace: %q", wd)
841 }
842 return clean, nil
843 }
844
845 func dockerContainerName(job Job, step Step) string {
846 stepID := step.ID
847 if stepID == 0 {
848 stepID = int64(step.Index)
849 }
850 return fmt.Sprintf("shithub-job-%d-step-%d", job.ID, stepID)
851 }
852
853 var envNameRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)
854
855 func validateEnv(env map[string]string) (map[string]string, error) {
856 out := make(map[string]string, len(env))
857 for k, v := range env {
858 if !envNameRE.MatchString(k) {
859 return nil, fmt.Errorf("runner engine: invalid env name %q", k)
860 }
861 if strings.ContainsRune(v, '\x00') {
862 return nil, fmt.Errorf("runner engine: invalid env value for %q", k)
863 }
864 out[k] = v
865 }
866 return out, nil
867 }
868
869 func sortedKeys(m map[string]string) []string {
870 keys := make([]string, 0, len(m))
871 for k := range m {
872 keys = append(keys, k)
873 }
874 sort.Strings(keys)
875 return keys
876 }
877
878 func stepLabel(step Step) string {
879 if step.Name != "" {
880 return step.Name
881 }
882 if step.StepID != "" {
883 return step.StepID
884 }
885 return fmt.Sprintf("#%d", step.Index)
886 }
887