| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | package engine |
| 4 | |
| 5 | import ( |
| 6 | "bytes" |
| 7 | "context" |
| 8 | "encoding/json" |
| 9 | "errors" |
| 10 | "fmt" |
| 11 | "io" |
| 12 | "log/slog" |
| 13 | "net/url" |
| 14 | "os" |
| 15 | "os/exec" |
| 16 | "path" |
| 17 | "regexp" |
| 18 | "sort" |
| 19 | "strconv" |
| 20 | "strings" |
| 21 | "sync" |
| 22 | "time" |
| 23 | |
| 24 | "github.com/tenseleyFlow/shithub/internal/actions/expr" |
| 25 | runnerexec "github.com/tenseleyFlow/shithub/internal/runner/exec" |
| 26 | "github.com/tenseleyFlow/shithub/internal/runner/scrub" |
| 27 | ) |
| 28 | |
// Sentinel errors returned by the engine; callers match them with errors.Is.
var (
	// ErrUnsupportedUses is returned when a step's `uses:` value names an
	// action alias the engine does not implement (only actions/checkout@v4
	// is handled; see executeStep).
	ErrUnsupportedUses = errors.New("runner engine: unsupported uses step")
	// ErrUnsupported is returned for operations the engine does not support.
	ErrUnsupported = errors.New("runner engine: unsupported operation")
)
| 33 | |
const (
	// defaultSeccompProfile is applied to every step container unless
	// DockerConfig.SeccompProfile overrides it.
	defaultSeccompProfile = "/etc/shithubd-runner/seccomp.json"
	// defaultContainerUser runs steps as nobody:nogroup (uid:gid 65534)
	// unless DockerConfig.User overrides it.
	defaultContainerUser = "65534:65534"
	// defaultPidsLimit caps the number of processes inside a step container.
	defaultPidsLimit = 512
	// defaultNofileLimit and defaultNprocLimit are soft:hard ulimit specs
	// passed to `docker run --ulimit`.
	defaultNofileLimit = "4096:4096"
	defaultNprocLimit  = "512:512"

	// rootPermissionKey is an intentionally shithub-specific escape hatch.
	// It requires an explicit per-job permissions entry rather than treating
	// broad write-all permissions as permission to run the container as root.
	rootPermissionKey = "shithub-runner-root"
)
| 46 | |
// CommandRunner abstracts process execution so tests can substitute a fake
// for the real docker/git binaries. Implementations must respect ctx
// cancellation and stream output to stdout/stderr while the process runs.
type CommandRunner interface {
	Run(ctx context.Context, name string, args []string, env []string, stdout, stderr io.Writer) error
}
| 50 | |
// ExecRunner is the production CommandRunner: it spawns real processes via
// os/exec, inheriting the parent environment plus any extra variables.
type ExecRunner struct{}

// Run executes name with args under ctx, wiring the given writers to the
// child's stdout/stderr. When env is non-empty it is appended to the parent
// environment; otherwise the child inherits the parent environment as-is.
func (ExecRunner) Run(ctx context.Context, name string, args []string, env []string, stdout, stderr io.Writer) error {
	cmd := exec.CommandContext(ctx, name, args...)
	cmd.Stdout, cmd.Stderr = stdout, stderr
	if len(env) > 0 {
		cmd.Env = append(os.Environ(), env...)
	}
	return cmd.Run()
}
| 62 | |
// DockerConfig configures a Docker engine. Zero values are replaced with
// safe defaults by NewDocker.
type DockerConfig struct {
	Binary           string        // docker binary (default "docker")
	GitBinary        string        // git binary (default "git")
	DefaultImage     string        // image used when the job specifies none
	Network          string        // docker --network value
	Memory           string        // docker --memory value
	CPUs             string        // docker --cpus value
	SeccompProfile   string        // seccomp profile path (default defaultSeccompProfile)
	User             string        // container uid:gid (default defaultContainerUser)
	PidsLimit        int           // docker --pids-limit (default defaultPidsLimit)
	DNSServers       []string      // extra --dns entries; blanks are skipped
	LogChunkBytes    int           // max bytes per emitted log chunk (default 4 KiB)
	LogFlushInterval time.Duration // periodic flush of buffered step logs (default 1s)
	StepLogLimit     int64         // per-step log byte cap (default 10 MiB)
	TimeoutMinute    time.Duration // length of one "timeout minute" (default time.Minute; shrink in tests)
	Stdout           io.Writer     // mirror of step stdout (default io.Discard)
	Stderr           io.Writer     // mirror of step stderr (default io.Discard)
	Runner           CommandRunner // process runner (default ExecRunner)
	MaskValues       []string      // engine-wide secret values scrubbed from logs
	AllowRoot        bool          // engine-level opt-in; the job must also request root (see rootPermissionKey)
	Logger           *slog.Logger  // structured logger (default discards)
}
| 85 | |
// Docker executes jobs inside hardened docker containers and fans out log
// chunks and step events to lazily-created per-job channels.
type Docker struct {
	cfg       DockerConfig
	streams   map[int64]chan LogChunk // per-job log subscribers
	eventSubs map[int64]chan Event    // per-job event subscribers
	active    map[int64]string        // job ID -> currently running container name
	mu        sync.Mutex              // guards streams, eventSubs, active
}
| 93 | |
| 94 | func NewDocker(cfg DockerConfig) *Docker { |
| 95 | if cfg.Binary == "" { |
| 96 | cfg.Binary = "docker" |
| 97 | } |
| 98 | if cfg.GitBinary == "" { |
| 99 | cfg.GitBinary = "git" |
| 100 | } |
| 101 | if cfg.LogChunkBytes <= 0 { |
| 102 | cfg.LogChunkBytes = 4 * 1024 |
| 103 | } |
| 104 | if cfg.LogFlushInterval <= 0 { |
| 105 | cfg.LogFlushInterval = time.Second |
| 106 | } |
| 107 | if cfg.StepLogLimit <= 0 { |
| 108 | cfg.StepLogLimit = 10 * 1024 * 1024 |
| 109 | } |
| 110 | if cfg.TimeoutMinute <= 0 { |
| 111 | cfg.TimeoutMinute = time.Minute |
| 112 | } |
| 113 | if cfg.Stdout == nil { |
| 114 | cfg.Stdout = io.Discard |
| 115 | } |
| 116 | if cfg.Stderr == nil { |
| 117 | cfg.Stderr = io.Discard |
| 118 | } |
| 119 | if cfg.Runner == nil { |
| 120 | cfg.Runner = ExecRunner{} |
| 121 | } |
| 122 | if cfg.SeccompProfile == "" { |
| 123 | cfg.SeccompProfile = defaultSeccompProfile |
| 124 | } |
| 125 | if cfg.User == "" { |
| 126 | cfg.User = defaultContainerUser |
| 127 | } |
| 128 | if cfg.PidsLimit <= 0 { |
| 129 | cfg.PidsLimit = defaultPidsLimit |
| 130 | } |
| 131 | if cfg.Logger == nil { |
| 132 | cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) |
| 133 | } |
| 134 | return &Docker{ |
| 135 | cfg: cfg, |
| 136 | streams: make(map[int64]chan LogChunk), |
| 137 | eventSubs: make(map[int64]chan Event), |
| 138 | active: make(map[int64]string), |
| 139 | } |
| 140 | } |
| 141 | |
// Execute runs every step of job in order and reports an overall Outcome.
// A per-job deadline of job.TimeoutMinutes * cfg.TimeoutMinute is attached
// to ctx with ErrJobTimedOut as its cancellation cause, so later error
// classification can tell timeout apart from external cancellation. Any
// log/event streams registered for job.ID are closed when Execute returns.
func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
	started := time.Now().UTC()
	outcome := Outcome{Conclusion: ConclusionSuccess, StartedAt: started}
	// Close both fan-out channels on exit so subscribers observe end-of-job.
	defer d.closeStream(job.ID)
	defer d.closeEventStream(job.ID)
	if job.TimeoutMinutes > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeoutCause(ctx, time.Duration(job.TimeoutMinutes)*d.cfg.TimeoutMinute, ErrJobTimedOut)
		defer cancel()
	}
	if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
		outcome.Conclusion = ConclusionFailure
		outcome.CompletedAt = time.Now().UTC()
		return outcome, fmt.Errorf("runner engine: prepare workspace: %w", err)
	}
	for _, step := range job.Steps {
		stepStarted := time.Now().UTC()
		if err := d.executeStep(ctx, job, step); err != nil {
			stepCompleted := time.Now().UTC()
			stepOutcome := StepOutcome{
				StepID:      step.ID,
				Status:      "completed",
				Conclusion:  conclusionForError(err),
				StartedAt:   stepStarted,
				CompletedAt: stepCompleted,
			}
			outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
			// Emit the terminal step event even when ctx is already done;
			// emitStepOutcomeAfterRun switches to a detached context then.
			if emitErr := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); emitErr != nil {
				outcome.Conclusion = conclusionForError(emitErr)
				outcome.CompletedAt = time.Now().UTC()
				return outcome, emitErr
			}
			// continue-on-error steps do not fail the job, but a job-level
			// timeout always terminates it.
			if step.ContinueOnError && !errors.Is(err, ErrJobTimedOut) {
				continue
			}
			outcome.Conclusion = conclusionForError(err)
			outcome.CompletedAt = stepCompleted
			return outcome, err
		}
		stepOutcome := StepOutcome{
			StepID:      step.ID,
			Status:      "completed",
			Conclusion:  ConclusionSuccess,
			StartedAt:   stepStarted,
			CompletedAt: time.Now().UTC(),
		}
		outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
		if err := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); err != nil {
			outcome.Conclusion = conclusionForError(err)
			outcome.CompletedAt = time.Now().UTC()
			return outcome, err
		}
	}
	outcome.CompletedAt = time.Now().UTC()
	return outcome, nil
}
| 198 | |
// executeStep runs one step: `uses:` steps dispatch to built-in handlers
// (only actions/checkout@v4 exists today), steps with an empty `run:` are
// no-ops, and everything else executes inside a docker container whose
// output is mirrored to cfg.Stdout/Stderr and to the scrubbing log writer.
func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
	if uses := strings.TrimSpace(step.Uses); uses != "" {
		switch uses {
		case "actions/checkout@v4":
			return d.executeCheckout(ctx, job, step)
		default:
			return fmt.Errorf("%w: %s is not executable until that alias lands", ErrUnsupportedUses, uses)
		}
	}
	if strings.TrimSpace(step.Run) == "" {
		return nil
	}
	invocation, err := d.dockerInvocation(job, step)
	if err != nil {
		return err
	}
	// Record the container name so Cancel can `docker kill` it concurrently.
	d.setActiveContainer(job.ID, invocation.containerName)
	defer d.clearActiveContainer(job.ID, invocation.containerName)
	d.logStep(ctx, "runner step starting", job, step, invocation, "")
	writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
	out := io.MultiWriter(d.cfg.Stdout, writer)
	errOut := io.MultiWriter(d.cfg.Stderr, writer)
	if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, invocation.args, invocation.env, out, errOut); err != nil {
		if isJobTimeout(ctx, err) {
			// The job deadline fired: kill the container on a short detached
			// context, since the job ctx itself is already done.
			killCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
			killErr := d.killContainer(killCtx, invocation.containerName)
			cancel()
			if killErr != nil {
				err = errors.Join(err, killErr)
			}
			// Tag the error so errors.Is(err, ErrJobTimedOut) holds upstream.
			err = fmt.Errorf("%w: %w", ErrJobTimedOut, err)
		}
		d.logStep(ctx, "runner step completed", job, step, invocation, conclusionForError(err))
		if closeErr := writer.Close(); closeErr != nil {
			return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), errors.Join(err, closeErr))
		}
		return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), err)
	}
	d.logStep(ctx, "runner step completed", job, step, invocation, ConclusionSuccess)
	// Close flushes any buffered log bytes; surface flush failures.
	if err := writer.Close(); err != nil {
		return fmt.Errorf("runner engine: flush step %q logs: %w", stepLabel(step), err)
	}
	return nil
}
| 243 | |
// executeCheckout implements the built-in actions/checkout@v4 step: it
// validates the checkout URL and SHA, initializes a fresh repository in the
// job workspace, fetches the target commit using an env-supplied credential
// helper (the token never appears on the command line), detaches HEAD at
// the SHA, and verifies that HEAD actually resolves to the expected commit.
func (d *Docker) executeCheckout(ctx context.Context, job Job, step Step) error {
	checkoutURL, err := validateCheckoutURL(job.CheckoutURL)
	if err != nil {
		return err
	}
	if strings.TrimSpace(job.CheckoutToken) == "" {
		return errors.New("runner engine: checkout token is required")
	}
	headSHA := strings.TrimSpace(job.HeadSHA)
	if !gitObjectIDRE.MatchString(headSHA) {
		return fmt.Errorf("runner engine: invalid checkout sha %q", headSHA)
	}
	if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
		return fmt.Errorf("runner engine: prepare checkout workspace: %w", err)
	}
	depthArgs, err := checkoutDepthArgs(step.With)
	if err != nil {
		return err
	}
	fetchTarget := headSHA

	writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
	out := io.MultiWriter(d.cfg.Stdout, writer)
	errOut := io.MultiWriter(d.cfg.Stderr, writer)
	// closeWriter flushes the log writer exactly once per return path and
	// joins any flush error onto the primary error.
	closeWriter := func(runErr error) error {
		if closeErr := writer.Close(); closeErr != nil {
			if runErr != nil {
				return errors.Join(runErr, closeErr)
			}
			return closeErr
		}
		return runErr
	}

	fmt.Fprintf(out, "Checking out %s at %s\n", checkoutURL, shortObjectID(headSHA))
	// Git must never prompt; the token reaches git only via the environment.
	gitEnv := []string{
		"GIT_TERMINAL_PROMPT=0",
		"GIT_ASKPASS=/bin/false",
		"SHITHUB_CHECKOUT_TOKEN=" + job.CheckoutToken,
	}
	//nolint:gosec // G101: this helper reads the token from env; it does not hard-code or argv-expose a secret.
	credentialHelper := `credential.helper=!f() { echo username=shithub-actions; echo password=$SHITHUB_CHECKOUT_TOKEN; }; f`
	// runGit wraps failures with a redacted copy of the argv so the
	// credential helper body never leaks into error messages.
	runGit := func(args ...string) error {
		if err := d.cfg.Runner.Run(ctx, d.cfg.GitBinary, args, gitEnv, out, errOut); err != nil {
			return fmt.Errorf("git %s: %w", strings.Join(redactCheckoutArgs(args), " "), err)
		}
		return nil
	}

	if err := runGit("-C", job.WorkspaceDir, "init"); err != nil {
		return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
	}
	if err := runGit("-C", job.WorkspaceDir, "remote", "add", "origin", checkoutURL); err != nil {
		return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
	}
	// Fetch the commit directly by SHA, no tags, optionally shallow.
	// NOTE(review): this assumes the server permits fetching arbitrary SHAs
	// (uploadpack.allowAnySHA1InWant or equivalent) — confirm server config.
	fetchArgs := []string{"-C", job.WorkspaceDir, "-c", credentialHelper, "fetch", "--no-tags"}
	fetchArgs = append(fetchArgs, depthArgs...)
	fetchArgs = append(fetchArgs, "origin", fetchTarget)
	if err := runGit(fetchArgs...); err != nil {
		return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
	}
	if err := runGit("-C", job.WorkspaceDir, "checkout", "--force", "--detach", headSHA); err != nil {
		return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: %w", stepLabel(step), err))
	}

	// Verify HEAD really points at the requested commit before declaring success.
	var rev bytes.Buffer
	if err := d.cfg.Runner.Run(ctx, d.cfg.GitBinary,
		[]string{"-C", job.WorkspaceDir, "rev-parse", "HEAD"},
		gitEnv, io.MultiWriter(out, &rev), errOut); err != nil {
		return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: git rev-parse HEAD: %w", stepLabel(step), err))
	}
	if got := strings.TrimSpace(rev.String()); !strings.EqualFold(got, headSHA) {
		return closeWriter(fmt.Errorf("runner engine: checkout step %q failed: HEAD %s != expected %s", stepLabel(step), got, headSHA))
	}
	fmt.Fprintf(out, "Checked out %s\n", shortObjectID(headSHA))
	return closeWriter(nil)
}
| 321 | |
// dockerInvocation is a fully-resolved `docker run` command for one step,
// plus the resolved settings that logStep reports.
type dockerInvocation struct {
	args           []string // argv passed to the docker binary
	env            []string // KEY=value pairs for docker's own process env; values reach the container via `--env KEY`
	containerName  string
	image          string
	network        string
	memory         string
	cpus           string
	user           string
	seccompProfile string
	pidsLimit      int
}
| 334 | |
// dockerInvocation builds the hardened `docker run` command line for a run
// step: read-only rootfs, all capabilities dropped except the minimum for
// uid switching, seccomp, no-new-privileges, pids/ulimit caps, and the
// workspace bind-mounted at /workspace. Environment values are passed to
// docker through its own process environment (`--env KEY` only names the
// variable), so secret values never appear in process listings.
func (d *Docker) dockerInvocation(job Job, step Step) (dockerInvocation, error) {
	workdir, err := containerWorkdir(step.WorkingDirectory)
	if err != nil {
		return dockerInvocation{}, err
	}
	// Job-specified image wins; fall back to the engine-wide default.
	image := strings.TrimSpace(job.Image)
	if image == "" {
		image = d.cfg.DefaultImage
	}
	if image == "" {
		return dockerInvocation{}, errors.New("runner engine: image is required")
	}
	rendered, err := runnerexec.RenderStep(runnerexec.StepInput{
		Run:     step.Run,
		JobEnv:  job.Env,
		StepEnv: step.Env,
		Context: expressionContext(job),
	})
	if err != nil {
		return dockerInvocation{}, fmt.Errorf("runner engine: render step %q: %w", stepLabel(step), err)
	}
	// Root requires BOTH the engine-level AllowRoot opt-in AND an explicit
	// per-job permissions entry (see rootPermissionKey).
	user := d.cfg.User
	if d.cfg.AllowRoot && permissionsRequestRoot(job.Permissions) {
		user = "0:0"
	}
	containerName := dockerContainerName(job, step)
	args := []string{
		"run",
		"--rm",
		"--name", containerName,
		"--network=" + d.cfg.Network,
		"--memory=" + d.cfg.Memory,
		"--cpus=" + d.cfg.CPUs,
		"--pids-limit=" + strconv.Itoa(d.cfg.PidsLimit),
		"--read-only",
		// /tmp stays writable (and exec-able, for build tools) on tmpfs.
		"--tmpfs", "/tmp:rw,exec,nosuid,nodev,size=1g",
		"--cap-drop=ALL",
		"--cap-add=DAC_OVERRIDE",
		"--cap-add=SETGID",
		"--cap-add=SETUID",
		"--security-opt=no-new-privileges",
		"--security-opt=seccomp=" + d.cfg.SeccompProfile,
		"--ulimit", "nofile=" + defaultNofileLimit,
		"--ulimit", "nproc=" + defaultNprocLimit,
		"--user", user,
		"--workdir=" + workdir,
		// NOTE(review): --mount parses comma-separated key=value pairs; a
		// WorkspaceDir containing a comma would break parsing — confirm the
		// scheduler guarantees comma-free paths.
		"--mount", "type=bind,src=" + job.WorkspaceDir + ",dst=/workspace,rw",
	}
	for _, dns := range d.cfg.DNSServers {
		dns = strings.TrimSpace(dns)
		if dns != "" {
			args = append(args, "--dns", dns)
		}
	}
	env, err := validateEnv(rendered.Env)
	if err != nil {
		return dockerInvocation{}, err
	}
	// Keys are sorted for deterministic argv ordering across runs.
	processEnv := make([]string, 0, len(env))
	for _, key := range sortedKeys(env) {
		args = append(args, "--env", key)
		processEnv = append(processEnv, key+"="+env[key])
	}
	args = append(args, image, "bash", "-c", rendered.Run)
	return dockerInvocation{
		args:           args,
		env:            processEnv,
		containerName:  containerName,
		image:          image,
		network:        d.cfg.Network,
		memory:         d.cfg.Memory,
		cpus:           d.cfg.CPUs,
		user:           user,
		seccompProfile: d.cfg.SeccompProfile,
		pidsLimit:      d.cfg.PidsLimit,
	}, nil
}
| 412 | |
| 413 | func (d *Docker) logStep(ctx context.Context, msg string, job Job, step Step, invocation dockerInvocation, conclusion string) { |
| 414 | attrs := []any{ |
| 415 | "run_id", job.RunID, |
| 416 | "job_id", job.ID, |
| 417 | "step_id", step.ID, |
| 418 | "image", invocation.image, |
| 419 | "network", invocation.network, |
| 420 | "cpu_limit", invocation.cpus, |
| 421 | "memory_limit", invocation.memory, |
| 422 | "pids_limit", invocation.pidsLimit, |
| 423 | "container_user", invocation.user, |
| 424 | "seccomp_profile", invocation.seccompProfile, |
| 425 | } |
| 426 | if conclusion != "" { |
| 427 | attrs = append(attrs, "conclusion", conclusion) |
| 428 | } |
| 429 | d.cfg.Logger.InfoContext(ctx, msg, attrs...) |
| 430 | } |
| 431 | |
| 432 | func expressionContext(job Job) expr.Context { |
| 433 | event := job.EventPayload |
| 434 | if len(event) == 0 && strings.TrimSpace(job.Event) != "" && json.Valid([]byte(job.Event)) { |
| 435 | _ = json.Unmarshal([]byte(job.Event), &event) |
| 436 | } |
| 437 | return expr.Context{ |
| 438 | Secrets: job.Secrets, |
| 439 | Shithub: expr.ShithubContext{ |
| 440 | Event: event, |
| 441 | RunID: fmt.Sprintf("%d", job.RunID), |
| 442 | SHA: job.HeadSHA, |
| 443 | Ref: job.HeadRef, |
| 444 | }, |
| 445 | Untrusted: expr.DefaultUntrusted(), |
| 446 | } |
| 447 | } |
| 448 | |
| 449 | func permissionsRequestRoot(raw json.RawMessage) bool { |
| 450 | if len(raw) == 0 || !json.Valid(raw) { |
| 451 | return false |
| 452 | } |
| 453 | var shaped struct { |
| 454 | Per map[string]string `json:"per"` |
| 455 | } |
| 456 | if err := json.Unmarshal(raw, &shaped); err == nil && strings.EqualFold(shaped.Per[rootPermissionKey], "write") { |
| 457 | return true |
| 458 | } |
| 459 | var flat map[string]string |
| 460 | if err := json.Unmarshal(raw, &flat); err != nil { |
| 461 | return false |
| 462 | } |
| 463 | return strings.EqualFold(flat[rootPermissionKey], "write") |
| 464 | } |
| 465 | |
| 466 | var gitObjectIDRE = regexp.MustCompile(`^[0-9a-fA-F]{40,64}$`) |
| 467 | |
| 468 | func validateCheckoutURL(raw string) (string, error) { |
| 469 | raw = strings.TrimSpace(raw) |
| 470 | if raw == "" { |
| 471 | return "", errors.New("runner engine: checkout url is required") |
| 472 | } |
| 473 | u, err := url.Parse(raw) |
| 474 | if err != nil || u.Scheme == "" || u.Host == "" { |
| 475 | return "", fmt.Errorf("runner engine: invalid checkout url %q", raw) |
| 476 | } |
| 477 | if u.User != nil { |
| 478 | return "", errors.New("runner engine: checkout url must not contain credentials") |
| 479 | } |
| 480 | switch strings.ToLower(u.Scheme) { |
| 481 | case "http", "https": |
| 482 | default: |
| 483 | return "", fmt.Errorf("runner engine: checkout url scheme %q is not allowed", u.Scheme) |
| 484 | } |
| 485 | return u.String(), nil |
| 486 | } |
| 487 | |
// checkoutDepthArgs translates the checkout step's `with` inputs into git
// fetch depth flags. Only "fetch-depth" is recognized: absent means a
// shallow depth-1 fetch, 0 means full history (no flag), and 1..100000
// produce an explicit --depth flag.
func checkoutDepthArgs(with map[string]string) ([]string, error) {
	for name := range with {
		if name != "fetch-depth" {
			return nil, fmt.Errorf("runner engine: unsupported checkout input %q", name)
		}
	}
	value := strings.TrimSpace(with["fetch-depth"])
	if value == "" {
		return []string{"--depth=1"}, nil
	}
	n, err := strconv.Atoi(value)
	switch {
	case err != nil, n < 0, n > 100000:
		return nil, fmt.Errorf("runner engine: invalid checkout fetch-depth %q", value)
	case n == 0:
		return nil, nil
	default:
		return []string{"--depth=" + strconv.Itoa(n)}, nil
	}
}
| 507 | |
// shortObjectID abbreviates a git object ID to at most 12 characters for
// human-readable log lines.
func shortObjectID(oid string) string {
	const abbrev = 12
	if len(oid) > abbrev {
		return oid[:abbrev]
	}
	return oid
}
| 514 | |
// redactCheckoutArgs returns a copy of a git argv with any argument that
// mentions the checkout-token env variable replaced by a placeholder, so
// error messages never carry the credential helper body. The input slice is
// not modified.
func redactCheckoutArgs(args []string) []string {
	redacted := make([]string, len(args))
	for i, arg := range args {
		if strings.Contains(arg, "SHITHUB_CHECKOUT_TOKEN") {
			arg = "credential.helper=<redacted>"
		}
		redacted[i] = arg
	}
	return redacted
}
| 524 | |
// StreamLogs returns the job's log channel, creating it on first use. The
// channel is closed when Execute for that job returns.
func (d *Docker) StreamLogs(_ context.Context, jobID int64) (<-chan LogChunk, error) {
	return d.ensureStream(jobID), nil
}

// StreamEvents returns the job's event channel, creating it on first use.
// The channel is closed when Execute for that job returns.
func (d *Docker) StreamEvents(_ context.Context, jobID int64) (<-chan Event, error) {
	return d.ensureEventStream(jobID), nil
}
| 532 | |
| 533 | func (d *Docker) Cancel(ctx context.Context, jobID int64) error { |
| 534 | name := d.activeContainer(jobID) |
| 535 | if name == "" { |
| 536 | return nil |
| 537 | } |
| 538 | killCtx, cancel := context.WithTimeout(ctx, 5*time.Second) |
| 539 | defer cancel() |
| 540 | return d.killContainer(killCtx, name) |
| 541 | } |
| 542 | |
| 543 | func (d *Docker) killContainer(ctx context.Context, name string) error { |
| 544 | if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, []string{"kill", name}, nil, d.cfg.Stdout, d.cfg.Stderr); err != nil { |
| 545 | return fmt.Errorf("runner engine: kill container %s: %w", name, err) |
| 546 | } |
| 547 | return nil |
| 548 | } |
| 549 | |
| 550 | func (d *Docker) setActiveContainer(jobID int64, name string) { |
| 551 | if name == "" { |
| 552 | return |
| 553 | } |
| 554 | d.mu.Lock() |
| 555 | d.active[jobID] = name |
| 556 | d.mu.Unlock() |
| 557 | } |
| 558 | |
| 559 | func (d *Docker) clearActiveContainer(jobID int64, name string) { |
| 560 | d.mu.Lock() |
| 561 | if d.active[jobID] == name { |
| 562 | delete(d.active, jobID) |
| 563 | } |
| 564 | d.mu.Unlock() |
| 565 | } |
| 566 | |
| 567 | func (d *Docker) activeContainer(jobID int64) string { |
| 568 | d.mu.Lock() |
| 569 | defer d.mu.Unlock() |
| 570 | return d.active[jobID] |
| 571 | } |
| 572 | |
| 573 | func (d *Docker) ensureStream(jobID int64) chan LogChunk { |
| 574 | d.mu.Lock() |
| 575 | defer d.mu.Unlock() |
| 576 | if ch, ok := d.streams[jobID]; ok { |
| 577 | return ch |
| 578 | } |
| 579 | ch := make(chan LogChunk, 128) |
| 580 | d.streams[jobID] = ch |
| 581 | return ch |
| 582 | } |
| 583 | |
| 584 | func (d *Docker) closeStream(jobID int64) { |
| 585 | d.mu.Lock() |
| 586 | ch, ok := d.streams[jobID] |
| 587 | if ok { |
| 588 | delete(d.streams, jobID) |
| 589 | } |
| 590 | d.mu.Unlock() |
| 591 | if ok { |
| 592 | close(ch) |
| 593 | } |
| 594 | } |
| 595 | |
| 596 | func (d *Docker) logStream(jobID int64) chan LogChunk { |
| 597 | d.mu.Lock() |
| 598 | defer d.mu.Unlock() |
| 599 | return d.streams[jobID] |
| 600 | } |
| 601 | |
| 602 | func (d *Docker) ensureEventStream(jobID int64) chan Event { |
| 603 | d.mu.Lock() |
| 604 | defer d.mu.Unlock() |
| 605 | if ch, ok := d.eventSubs[jobID]; ok { |
| 606 | return ch |
| 607 | } |
| 608 | ch := make(chan Event, 128) |
| 609 | d.eventSubs[jobID] = ch |
| 610 | return ch |
| 611 | } |
| 612 | |
| 613 | func (d *Docker) closeEventStream(jobID int64) { |
| 614 | d.mu.Lock() |
| 615 | ch, ok := d.eventSubs[jobID] |
| 616 | if ok { |
| 617 | delete(d.eventSubs, jobID) |
| 618 | } |
| 619 | d.mu.Unlock() |
| 620 | if ok { |
| 621 | close(ch) |
| 622 | } |
| 623 | } |
| 624 | |
| 625 | func (d *Docker) eventStream(jobID int64) chan Event { |
| 626 | d.mu.Lock() |
| 627 | defer d.mu.Unlock() |
| 628 | return d.eventSubs[jobID] |
| 629 | } |
| 630 | |
| 631 | func (d *Docker) emitStepOutcome(ctx context.Context, jobID int64, step StepOutcome) error { |
| 632 | ch := d.eventStream(jobID) |
| 633 | if ch == nil { |
| 634 | return nil |
| 635 | } |
| 636 | copied := step |
| 637 | select { |
| 638 | case <-ctx.Done(): |
| 639 | return ctx.Err() |
| 640 | case ch <- Event{Step: &copied}: |
| 641 | return nil |
| 642 | } |
| 643 | } |
| 644 | |
| 645 | func (d *Docker) emitStepOutcomeAfterRun(ctx context.Context, jobID int64, step StepOutcome) error { |
| 646 | if ctx.Err() == nil { |
| 647 | return d.emitStepOutcome(ctx, jobID, step) |
| 648 | } |
| 649 | emitCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) |
| 650 | defer cancel() |
| 651 | return d.emitStepOutcome(emitCtx, jobID, step) |
| 652 | } |
| 653 | |
// newStepLogWriter builds the per-step writer that chunks, periodically
// flushes, size-caps, and secret-scrubs step output before fanning it out to
// the job's log and event channels (either may be nil if nobody subscribed).
// Engine-wide and job-supplied mask values are combined into one scrubber.
// A background flush goroutine is started; it exits when the writer is
// closed or ctx is done.
func (d *Docker) newStepLogWriter(ctx context.Context, jobID, stepID int64, jobMasks []string) *stepLogWriter {
	w := &stepLogWriter{
		ctx:      ctx,
		ch:       d.logStream(jobID),
		events:   d.eventStream(jobID),
		jobID:    jobID,
		stepID:   stepID,
		maxChunk: d.cfg.LogChunkBytes,
		interval: d.cfg.LogFlushInterval,
		limit:    d.cfg.StepLogLimit,
		masker:   scrub.New(append(append([]string{}, d.cfg.MaskValues...), jobMasks...)),
		done:     make(chan struct{}),
	}
	go w.flushLoop()
	return w
}
| 670 | |
// stepLogWriter buffers one step's combined output and emits it as
// scrubbed, sequence-numbered LogChunks. Mutable state is guarded by mu;
// both Write callers and the flushLoop goroutine take it.
type stepLogWriter struct {
	ctx       context.Context
	ch        chan<- LogChunk // job log channel; nil when nobody subscribed
	events    chan<- Event    // job event channel; nil when nobody subscribed
	jobID     int64
	stepID    int64
	seq       int32 // next chunk sequence number
	maxChunk  int   // emit whenever buf reaches this many bytes
	interval  time.Duration
	limit     int64 // max accepted bytes; <= 0 disables the cap
	written   int64 // bytes accepted so far, counted against limit
	truncated bool  // truncation notice already appended once
	masker    *scrub.Scrubber
	buf       []byte
	done      chan struct{} // closed by Close to stop flushLoop
	once      sync.Once
	mu        sync.Mutex
	closed    bool
}
| 690 | |
| 691 | func (w *stepLogWriter) Write(p []byte) (int, error) { |
| 692 | w.mu.Lock() |
| 693 | defer w.mu.Unlock() |
| 694 | if w.closed { |
| 695 | return 0, io.ErrClosedPipe |
| 696 | } |
| 697 | w.appendWithinLimit(p) |
| 698 | for len(w.buf) >= w.maxChunk { |
| 699 | if err := w.emitLocked(w.buf[:w.maxChunk]); err != nil { |
| 700 | return 0, err |
| 701 | } |
| 702 | w.buf = w.buf[w.maxChunk:] |
| 703 | } |
| 704 | return len(p), nil |
| 705 | } |
| 706 | |
| 707 | func (w *stepLogWriter) Close() error { |
| 708 | w.once.Do(func() { |
| 709 | close(w.done) |
| 710 | w.mu.Lock() |
| 711 | defer w.mu.Unlock() |
| 712 | _ = w.flushLocked() |
| 713 | _ = w.flushMaskerLocked() |
| 714 | w.closed = true |
| 715 | }) |
| 716 | return nil |
| 717 | } |
| 718 | |
| 719 | func (w *stepLogWriter) appendWithinLimit(p []byte) { |
| 720 | if w.limit <= 0 { |
| 721 | w.buf = append(w.buf, p...) |
| 722 | return |
| 723 | } |
| 724 | remaining := w.limit - w.written |
| 725 | if remaining > 0 { |
| 726 | if int64(len(p)) <= remaining { |
| 727 | w.buf = append(w.buf, p...) |
| 728 | w.written += int64(len(p)) |
| 729 | return |
| 730 | } |
| 731 | w.buf = append(w.buf, p[:int(remaining)]...) |
| 732 | w.written += remaining |
| 733 | } |
| 734 | if !w.truncated { |
| 735 | w.buf = append(w.buf, []byte("\n[shithub-runner: step log truncated after 10 MiB]\n")...) |
| 736 | w.truncated = true |
| 737 | } |
| 738 | } |
| 739 | |
| 740 | func (w *stepLogWriter) flushLoop() { |
| 741 | ticker := time.NewTicker(w.interval) |
| 742 | defer ticker.Stop() |
| 743 | for { |
| 744 | select { |
| 745 | case <-w.done: |
| 746 | return |
| 747 | case <-w.ctx.Done(): |
| 748 | return |
| 749 | case <-ticker.C: |
| 750 | w.mu.Lock() |
| 751 | _ = w.flushLocked() |
| 752 | w.mu.Unlock() |
| 753 | } |
| 754 | } |
| 755 | } |
| 756 | |
| 757 | func (w *stepLogWriter) flushLocked() error { |
| 758 | if len(w.buf) == 0 { |
| 759 | return nil |
| 760 | } |
| 761 | if err := w.emitLocked(w.buf); err != nil { |
| 762 | return err |
| 763 | } |
| 764 | w.buf = nil |
| 765 | return nil |
| 766 | } |
| 767 | |
| 768 | func (w *stepLogWriter) emitLocked(chunk []byte) error { |
| 769 | if w.masker != nil { |
| 770 | chunk = w.masker.Scrub(chunk) |
| 771 | if len(chunk) == 0 { |
| 772 | return nil |
| 773 | } |
| 774 | } |
| 775 | return w.emitChunkLocked(chunk) |
| 776 | } |
| 777 | |
| 778 | func (w *stepLogWriter) flushMaskerLocked() error { |
| 779 | if w.masker == nil { |
| 780 | return nil |
| 781 | } |
| 782 | chunk := w.masker.Flush() |
| 783 | if len(chunk) == 0 { |
| 784 | return nil |
| 785 | } |
| 786 | return w.emitChunkLocked(chunk) |
| 787 | } |
| 788 | |
// emitChunkLocked sends one already-scrubbed chunk to the log channel, then
// to the event channel, blocking on each until the send lands or ctx ends.
// The chunk bytes are copied once and shared read-only by both destinations.
// seq is incremented only after both sends succeed, so a failed send does
// not consume a sequence number. Callers must hold w.mu.
func (w *stepLogWriter) emitChunkLocked(chunk []byte) error {
	copied := LogChunk{JobID: w.jobID, StepID: w.stepID, Seq: w.seq, Chunk: append([]byte(nil), chunk...)}
	if w.ch != nil {
		select {
		case <-w.ctx.Done():
			return w.ctx.Err()
		case w.ch <- copied:
		}
	}
	if w.events != nil {
		// Separate struct so the event's pointer does not alias a value the
		// log-channel consumer might still be reading.
		eventChunk := copied
		select {
		case <-w.ctx.Done():
			return w.ctx.Err()
		case w.events <- Event{Log: &eventChunk}:
		}
	}
	w.seq++
	return nil
}
| 809 | |
| 810 | func conclusionForError(err error) string { |
| 811 | if errors.Is(err, ErrJobTimedOut) { |
| 812 | return ConclusionTimedOut |
| 813 | } |
| 814 | if errors.Is(err, context.Canceled) { |
| 815 | return ConclusionCancelled |
| 816 | } |
| 817 | return ConclusionFailure |
| 818 | } |
| 819 | |
| 820 | func isJobTimeout(ctx context.Context, err error) bool { |
| 821 | if errors.Is(err, ErrJobTimedOut) { |
| 822 | return true |
| 823 | } |
| 824 | if !errors.Is(err, context.DeadlineExceeded) { |
| 825 | return false |
| 826 | } |
| 827 | return errors.Is(context.Cause(ctx), ErrJobTimedOut) |
| 828 | } |
| 829 | |
// containerWorkdir resolves a step's working-directory to an absolute path
// under /workspace. The input must be relative, and any path that cleans to
// a location outside /workspace (e.g. via "..") is rejected.
func containerWorkdir(wd string) (string, error) {
	wd = strings.TrimSpace(wd)
	switch {
	case wd == "":
		return "/workspace", nil
	case strings.HasPrefix(wd, "/"):
		return "", fmt.Errorf("runner engine: working-directory must be relative, got %q", wd)
	}
	resolved := path.Clean("/workspace/" + wd)
	if resolved != "/workspace" && !strings.HasPrefix(resolved, "/workspace/") {
		return "", fmt.Errorf("runner engine: working-directory escapes workspace: %q", wd)
	}
	return resolved, nil
}
| 844 | |
| 845 | func dockerContainerName(job Job, step Step) string { |
| 846 | stepID := step.ID |
| 847 | if stepID == 0 { |
| 848 | stepID = int64(step.Index) |
| 849 | } |
| 850 | return fmt.Sprintf("shithub-job-%d-step-%d", job.ID, stepID) |
| 851 | } |
| 852 | |
// envNameRE matches POSIX-style environment variable names.
var envNameRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateEnv returns a copy of env, rejecting names that are not valid
// identifiers and values that contain NUL bytes (which would corrupt the
// process environment handed to docker).
func validateEnv(env map[string]string) (map[string]string, error) {
	validated := make(map[string]string, len(env))
	for name, value := range env {
		switch {
		case !envNameRE.MatchString(name):
			return nil, fmt.Errorf("runner engine: invalid env name %q", name)
		case strings.ContainsRune(value, '\x00'):
			return nil, fmt.Errorf("runner engine: invalid env value for %q", name)
		}
		validated[name] = value
	}
	return validated, nil
}
| 868 | |
// sortedKeys returns m's keys in ascending order, giving callers a
// deterministic iteration order over the map.
func sortedKeys(m map[string]string) []string {
	out := make([]string, len(m))
	i := 0
	for key := range m {
		out[i] = key
		i++
	}
	sort.Strings(out)
	return out
}
| 877 | |
| 878 | func stepLabel(step Step) string { |
| 879 | if step.Name != "" { |
| 880 | return step.Name |
| 881 | } |
| 882 | if step.StepID != "" { |
| 883 | return step.StepID |
| 884 | } |
| 885 | return fmt.Sprintf("#%d", step.Index) |
| 886 | } |
| 887 |