// SPDX-License-Identifier: AGPL-3.0-or-later

package engine

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/exec"
	"path"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/tenseleyFlow/shithub/internal/actions/expr"
	runnerexec "github.com/tenseleyFlow/shithub/internal/runner/exec"
	"github.com/tenseleyFlow/shithub/internal/runner/scrub"
)
var (
	ErrUnsupportedUses = errors.New("runner engine: unsupported uses step")
	ErrUnsupported     = errors.New("runner engine: unsupported operation")
)

const (
	defaultSeccompProfile = "/etc/shithubd-runner/seccomp.json"
	defaultContainerUser  = "65534:65534"
	defaultPidsLimit      = 512
	defaultNofileLimit    = "4096:4096"
	defaultNprocLimit     = "512:512"

	// rootPermissionKey is an intentionally shithub-specific escape hatch.
	// Running the container as root requires this explicit per-job
	// permissions entry; a broad write-all grant is never treated as
	// implicit permission to run as root.
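	//
	// For illustration, the two permissions payload shapes that
	// permissionsRequestRoot (below) accepts are:
	//
	//	{"per": {"shithub-runner-root": "write"}}
	//	{"shithub-runner-root": "write"}
	//
	// Even then, root is only granted when DockerConfig.AllowRoot is set.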
	rootPermissionKey = "shithub-runner-root"
)

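// CommandRunner abstracts process execution so tests can substitute a fake
// for the real docker binary. Implementations run name with args, appending
// env to the inherited environment, and stream output to stdout and stderr.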
type CommandRunner interface {
	Run(ctx context.Context, name string, args []string, env []string, stdout, stderr io.Writer) error
}

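// ExecRunner is the production CommandRunner; it shells out via os/exec.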
type ExecRunner struct{}

func (ExecRunner) Run(ctx context.Context, name string, args []string, env []string, stdout, stderr io.Writer) error {
	cmd := exec.CommandContext(ctx, name, args...)
	if len(env) > 0 {
		cmd.Env = append(os.Environ(), env...)
	}
	cmd.Stdout = stdout
	cmd.Stderr = stderr
	return cmd.Run()
}

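// DockerConfig configures the Docker engine. Zero values are replaced with
// defaults by NewDocker, so callers only need to set what they care about.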
type DockerConfig struct {
	Binary           string
	DefaultImage     string
	Network          string
	Memory           string
	CPUs             string
	SeccompProfile   string
	User             string
	PidsLimit        int
	DNSServers       []string
	LogChunkBytes    int
	LogFlushInterval time.Duration
	StepLogLimit     int64
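	// TimeoutMinute is the duration of one "minute" of Job.TimeoutMinutes.
	// It defaults to time.Minute; presumably it is overridable so tests can
	// compress job timeouts.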
	TimeoutMinute    time.Duration
	Stdout           io.Writer
	Stderr           io.Writer
	Runner           CommandRunner
	MaskValues       []string
	AllowRoot        bool
	Logger           *slog.Logger
}

type Docker struct {
	cfg       DockerConfig
	streams   map[int64]chan LogChunk
	eventSubs map[int64]chan Event
	active    map[int64]string
	mu        sync.Mutex
}

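// NewDocker fills in defaults for any unset DockerConfig fields and returns
// a ready-to-use engine. A minimal usage sketch (the image and limits here
// are illustrative, not built-in defaults):
//
//	eng := NewDocker(DockerConfig{
//		DefaultImage: "debian:stable-slim",
//		Network:      "none",
//		Memory:       "512m",
//		CPUs:         "1",
//	})
//	outcome, err := eng.Execute(ctx, job)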
func NewDocker(cfg DockerConfig) *Docker {
	if cfg.Binary == "" {
		cfg.Binary = "docker"
	}
	if cfg.LogChunkBytes <= 0 {
		cfg.LogChunkBytes = 4 * 1024
	}
	if cfg.LogFlushInterval <= 0 {
		cfg.LogFlushInterval = time.Second
	}
	if cfg.StepLogLimit <= 0 {
		cfg.StepLogLimit = 10 * 1024 * 1024
	}
	if cfg.TimeoutMinute <= 0 {
		cfg.TimeoutMinute = time.Minute
	}
	if cfg.Stdout == nil {
		cfg.Stdout = io.Discard
	}
	if cfg.Stderr == nil {
		cfg.Stderr = io.Discard
	}
	if cfg.Runner == nil {
		cfg.Runner = ExecRunner{}
	}
	if cfg.SeccompProfile == "" {
		cfg.SeccompProfile = defaultSeccompProfile
	}
	if cfg.User == "" {
		cfg.User = defaultContainerUser
	}
	if cfg.PidsLimit <= 0 {
		cfg.PidsLimit = defaultPidsLimit
	}
	if cfg.Logger == nil {
		cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
	}
	return &Docker{
		cfg:       cfg,
		streams:   make(map[int64]chan LogChunk),
		eventSubs: make(map[int64]chan Event),
		active:    make(map[int64]string),
	}
}

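// Execute runs a job's steps in order, emitting a StepOutcome event after
// each one. A step failure normally ends the job, except when the step sets
// ContinueOnError; a job-level timeout always ends the job. The job's log
// and event streams are closed when Execute returns.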
func (d *Docker) Execute(ctx context.Context, job Job) (Outcome, error) {
	started := time.Now().UTC()
	outcome := Outcome{Conclusion: ConclusionSuccess, StartedAt: started}
	defer d.closeStream(job.ID)
	defer d.closeEventStream(job.ID)
	if job.TimeoutMinutes > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeoutCause(ctx, time.Duration(job.TimeoutMinutes)*d.cfg.TimeoutMinute, ErrJobTimedOut)
		defer cancel()
	}
	if err := os.MkdirAll(job.WorkspaceDir, 0o700); err != nil {
		outcome.Conclusion = ConclusionFailure
		outcome.CompletedAt = time.Now().UTC()
		return outcome, fmt.Errorf("runner engine: prepare workspace: %w", err)
	}
	for _, step := range job.Steps {
		stepStarted := time.Now().UTC()
		if err := d.executeStep(ctx, job, step); err != nil {
			stepCompleted := time.Now().UTC()
			stepOutcome := StepOutcome{
				StepID:      step.ID,
				Status:      "completed",
				Conclusion:  conclusionForError(err),
				StartedAt:   stepStarted,
				CompletedAt: stepCompleted,
			}
			outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
			if emitErr := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); emitErr != nil {
				outcome.Conclusion = conclusionForError(emitErr)
				outcome.CompletedAt = time.Now().UTC()
				return outcome, emitErr
			}
			if step.ContinueOnError && !errors.Is(err, ErrJobTimedOut) {
				continue
			}
			outcome.Conclusion = conclusionForError(err)
			outcome.CompletedAt = stepCompleted
			return outcome, err
		}
		stepOutcome := StepOutcome{
			StepID:      step.ID,
			Status:      "completed",
			Conclusion:  ConclusionSuccess,
			StartedAt:   stepStarted,
			CompletedAt: time.Now().UTC(),
		}
		outcome.StepOutcomes = append(outcome.StepOutcomes, stepOutcome)
		if err := d.emitStepOutcomeAfterRun(ctx, job.ID, stepOutcome); err != nil {
			outcome.Conclusion = conclusionForError(err)
			outcome.CompletedAt = time.Now().UTC()
			return outcome, err
		}
	}
	outcome.CompletedAt = time.Now().UTC()
	return outcome, nil
}

func (d *Docker) executeStep(ctx context.Context, job Job, step Step) error {
	if strings.TrimSpace(step.Uses) != "" {
		return fmt.Errorf("%w: %s is not executable until checkout/artifact support lands", ErrUnsupportedUses, step.Uses)
	}
	if strings.TrimSpace(step.Run) == "" {
		return nil
	}
	invocation, err := d.dockerInvocation(job, step)
	if err != nil {
		return err
	}
	d.setActiveContainer(job.ID, invocation.containerName)
	defer d.clearActiveContainer(job.ID, invocation.containerName)
	d.logStep(ctx, "runner step starting", job, step, invocation, "")
	writer := d.newStepLogWriter(ctx, job.ID, step.ID, job.MaskValues)
	out := io.MultiWriter(d.cfg.Stdout, writer)
	errOut := io.MultiWriter(d.cfg.Stderr, writer)
	if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, invocation.args, invocation.env, out, errOut); err != nil {
		if isJobTimeout(ctx, err) {
			killCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
			killErr := d.killContainer(killCtx, invocation.containerName)
			cancel()
			if killErr != nil {
				err = errors.Join(err, killErr)
			}
			err = fmt.Errorf("%w: %w", ErrJobTimedOut, err)
		}
		d.logStep(ctx, "runner step completed", job, step, invocation, conclusionForError(err))
		if closeErr := writer.Close(); closeErr != nil {
			return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), errors.Join(err, closeErr))
		}
		return fmt.Errorf("runner engine: step %q failed: %w", stepLabel(step), err)
	}
	d.logStep(ctx, "runner step completed", job, step, invocation, ConclusionSuccess)
	if err := writer.Close(); err != nil {
		return fmt.Errorf("runner engine: flush step %q logs: %w", stepLabel(step), err)
	}
	return nil
}

type dockerInvocation struct {
	args           []string
	env            []string
	containerName  string
	image          string
	network        string
	memory         string
	cpus           string
	user           string
	seccompProfile string
	pidsLimit      int
}

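// dockerInvocation builds the docker run argument list for a step. The
// container is deliberately locked down: read-only root filesystem with an
// exec-capable tmpfs at /tmp, all capabilities dropped except
// DAC_OVERRIDE/SETGID/SETUID, no-new-privileges, a seccomp profile, pids and
// ulimit caps, and a non-root user unless the job explicitly opts into root
// via rootPermissionKey (and AllowRoot permits it). Only the workspace is
// bind-mounted read-write.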
func (d *Docker) dockerInvocation(job Job, step Step) (dockerInvocation, error) {
	workdir, err := containerWorkdir(step.WorkingDirectory)
	if err != nil {
		return dockerInvocation{}, err
	}
	image := strings.TrimSpace(job.Image)
	if image == "" {
		image = d.cfg.DefaultImage
	}
	if image == "" {
		return dockerInvocation{}, errors.New("runner engine: image is required")
	}
	rendered, err := runnerexec.RenderStep(runnerexec.StepInput{
		Run:     step.Run,
		JobEnv:  job.Env,
		StepEnv: step.Env,
		Context: expressionContext(job),
	})
	if err != nil {
		return dockerInvocation{}, fmt.Errorf("runner engine: render step %q: %w", stepLabel(step), err)
	}
	user := d.cfg.User
	if d.cfg.AllowRoot && permissionsRequestRoot(job.Permissions) {
		user = "0:0"
	}
	containerName := dockerContainerName(job, step)
	args := []string{
		"run",
		"--rm",
		"--name", containerName,
		"--network=" + d.cfg.Network,
		"--memory=" + d.cfg.Memory,
		"--cpus=" + d.cfg.CPUs,
		"--pids-limit=" + strconv.Itoa(d.cfg.PidsLimit),
		"--read-only",
		"--tmpfs", "/tmp:rw,exec,nosuid,nodev,size=1g",
		"--cap-drop=ALL",
		"--cap-add=DAC_OVERRIDE",
		"--cap-add=SETGID",
		"--cap-add=SETUID",
		"--security-opt=no-new-privileges",
		"--security-opt=seccomp=" + d.cfg.SeccompProfile,
		"--ulimit", "nofile=" + defaultNofileLimit,
		"--ulimit", "nproc=" + defaultNprocLimit,
		"--user", user,
		"--workdir=" + workdir,
		"--mount", "type=bind,src=" + job.WorkspaceDir + ",dst=/workspace,rw",
	}
	for _, dns := range d.cfg.DNSServers {
		dns = strings.TrimSpace(dns)
		if dns != "" {
			args = append(args, "--dns", dns)
		}
	}
	env, err := validateEnv(rendered.Env)
	if err != nil {
		return dockerInvocation{}, err
	}
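	// Pass variables as bare "--env KEY" flags and carry the values in the
	// docker client's own process environment instead, keeping secret values
	// out of the container argv (and out of `ps` output on the host).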
	processEnv := make([]string, 0, len(env))
	for _, key := range sortedKeys(env) {
		args = append(args, "--env", key)
		processEnv = append(processEnv, key+"="+env[key])
	}
	args = append(args, image, "bash", "-c", rendered.Run)
	return dockerInvocation{
		args:           args,
		env:            processEnv,
		containerName:  containerName,
		image:          image,
		network:        d.cfg.Network,
		memory:         d.cfg.Memory,
		cpus:           d.cfg.CPUs,
		user:           user,
		seccompProfile: d.cfg.SeccompProfile,
		pidsLimit:      d.cfg.PidsLimit,
	}, nil
}

func (d *Docker) logStep(ctx context.Context, msg string, job Job, step Step, invocation dockerInvocation, conclusion string) {
	attrs := []any{
		"run_id", job.RunID,
		"job_id", job.ID,
		"step_id", step.ID,
		"image", invocation.image,
		"network", invocation.network,
		"cpu_limit", invocation.cpus,
		"memory_limit", invocation.memory,
		"pids_limit", invocation.pidsLimit,
		"container_user", invocation.user,
		"seccomp_profile", invocation.seccompProfile,
	}
	if conclusion != "" {
		attrs = append(attrs, "conclusion", conclusion)
	}
	d.cfg.Logger.InfoContext(ctx, msg, attrs...)
}

func expressionContext(job Job) expr.Context {
	event := job.EventPayload
	if len(event) == 0 && strings.TrimSpace(job.Event) != "" && json.Valid([]byte(job.Event)) {
		_ = json.Unmarshal([]byte(job.Event), &event)
	}
	return expr.Context{
		Secrets: job.Secrets,
		Shithub: expr.ShithubContext{
			Event: event,
			RunID: fmt.Sprintf("%d", job.RunID),
			SHA:   job.HeadSHA,
			Ref:   job.HeadRef,
		},
		Untrusted: expr.DefaultUntrusted(),
	}
}

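// permissionsRequestRoot reports whether the job's raw permissions JSON
// grants "write" on rootPermissionKey, accepting either the nested
// {"per": {...}} shape or a flat string map.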
func permissionsRequestRoot(raw json.RawMessage) bool {
	if len(raw) == 0 || !json.Valid(raw) {
		return false
	}
	var shaped struct {
		Per map[string]string `json:"per"`
	}
	if err := json.Unmarshal(raw, &shaped); err == nil && strings.EqualFold(shaped.Per[rootPermissionKey], "write") {
		return true
	}
	var flat map[string]string
	if err := json.Unmarshal(raw, &flat); err != nil {
		return false
	}
	return strings.EqualFold(flat[rootPermissionKey], "write")
}

func (d *Docker) StreamLogs(_ context.Context, jobID int64) (<-chan LogChunk, error) {
	return d.ensureStream(jobID), nil
}

func (d *Docker) StreamEvents(_ context.Context, jobID int64) (<-chan Event, error) {
	return d.ensureEventStream(jobID), nil
}

func (d *Docker) Cancel(ctx context.Context, jobID int64) error {
	name := d.activeContainer(jobID)
	if name == "" {
		return nil
	}
	killCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	return d.killContainer(killCtx, name)
}

func (d *Docker) killContainer(ctx context.Context, name string) error {
	if err := d.cfg.Runner.Run(ctx, d.cfg.Binary, []string{"kill", name}, nil, d.cfg.Stdout, d.cfg.Stderr); err != nil {
		return fmt.Errorf("runner engine: kill container %s: %w", name, err)
	}
	return nil
}

func (d *Docker) setActiveContainer(jobID int64, name string) {
	if name == "" {
		return
	}
	d.mu.Lock()
	d.active[jobID] = name
	d.mu.Unlock()
}

func (d *Docker) clearActiveContainer(jobID int64, name string) {
	d.mu.Lock()
	if d.active[jobID] == name {
		delete(d.active, jobID)
	}
	d.mu.Unlock()
}

func (d *Docker) activeContainer(jobID int64) string {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.active[jobID]
}

func (d *Docker) ensureStream(jobID int64) chan LogChunk {
	d.mu.Lock()
	defer d.mu.Unlock()
	if ch, ok := d.streams[jobID]; ok {
		return ch
	}
	ch := make(chan LogChunk, 128)
	d.streams[jobID] = ch
	return ch
}

func (d *Docker) closeStream(jobID int64) {
	d.mu.Lock()
	ch, ok := d.streams[jobID]
	if ok {
		delete(d.streams, jobID)
	}
	d.mu.Unlock()
	if ok {
		close(ch)
	}
}

func (d *Docker) logStream(jobID int64) chan LogChunk {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.streams[jobID]
}

func (d *Docker) ensureEventStream(jobID int64) chan Event {
	d.mu.Lock()
	defer d.mu.Unlock()
	if ch, ok := d.eventSubs[jobID]; ok {
		return ch
	}
	ch := make(chan Event, 128)
	d.eventSubs[jobID] = ch
	return ch
}

func (d *Docker) closeEventStream(jobID int64) {
	d.mu.Lock()
	ch, ok := d.eventSubs[jobID]
	if ok {
		delete(d.eventSubs, jobID)
	}
	d.mu.Unlock()
	if ok {
		close(ch)
	}
}

func (d *Docker) eventStream(jobID int64) chan Event {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.eventSubs[jobID]
}

func (d *Docker) emitStepOutcome(ctx context.Context, jobID int64, step StepOutcome) error {
	ch := d.eventStream(jobID)
	if ch == nil {
		return nil
	}
	copied := step
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- Event{Step: &copied}:
		return nil
	}
}

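// emitStepOutcomeAfterRun delivers a step outcome even when the job context
// has already been cancelled or timed out: it detaches from the dead context
// and retries the send with a short grace period, so subscribers still see
// the terminal step event.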
func (d *Docker) emitStepOutcomeAfterRun(ctx context.Context, jobID int64, step StepOutcome) error {
	if ctx.Err() == nil {
		return d.emitStepOutcome(ctx, jobID, step)
	}
	emitCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
	defer cancel()
	return d.emitStepOutcome(emitCtx, jobID, step)
}

func (d *Docker) newStepLogWriter(ctx context.Context, jobID, stepID int64, jobMasks []string) *stepLogWriter {
	w := &stepLogWriter{
		ctx:      ctx,
		ch:       d.logStream(jobID),
		events:   d.eventStream(jobID),
		jobID:    jobID,
		stepID:   stepID,
		maxChunk: d.cfg.LogChunkBytes,
		interval: d.cfg.LogFlushInterval,
		limit:    d.cfg.StepLogLimit,
		masker:   scrub.New(append(append([]string{}, d.cfg.MaskValues...), jobMasks...)),
		done:     make(chan struct{}),
	}
	go w.flushLoop()
	return w
}

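// stepLogWriter buffers step output, scrubs masked secrets, enforces a
// per-step size limit, and forwards the result as sequenced LogChunks to the
// job's log and event channels. Full chunks are emitted inline from Write;
// a background flushLoop drains partial chunks on a timer so slow-running
// steps still stream.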
type stepLogWriter struct {
	ctx       context.Context
	ch        chan<- LogChunk
	events    chan<- Event
	jobID     int64
	stepID    int64
	seq       int32
	maxChunk  int
	interval  time.Duration
	limit     int64
	written   int64
	truncated bool
	masker    *scrub.Scrubber
	buf       []byte
	done      chan struct{}
	once      sync.Once
	mu        sync.Mutex
	closed    bool
}

func (w *stepLogWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return 0, io.ErrClosedPipe
	}
	w.appendWithinLimit(p)
	for len(w.buf) >= w.maxChunk {
		if err := w.emitLocked(w.buf[:w.maxChunk]); err != nil {
			return 0, err
		}
		w.buf = w.buf[w.maxChunk:]
	}
	return len(p), nil
}

func (w *stepLogWriter) Close() error {
	w.once.Do(func() {
		close(w.done)
		w.mu.Lock()
		defer w.mu.Unlock()
		_ = w.flushLocked()
		_ = w.flushMaskerLocked()
		w.closed = true
	})
	return nil
}

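// appendWithinLimit buffers p, dropping anything past the configured limit
// and marking the stream truncated exactly once. Write still reports len(p)
// as written in that case so the surrounding io.MultiWriter keeps feeding
// the other sinks.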
func (w *stepLogWriter) appendWithinLimit(p []byte) {
	if w.limit <= 0 {
		w.buf = append(w.buf, p...)
		return
	}
	remaining := w.limit - w.written
	if remaining > 0 {
		if int64(len(p)) <= remaining {
			w.buf = append(w.buf, p...)
			w.written += int64(len(p))
			return
		}
		w.buf = append(w.buf, p[:int(remaining)]...)
		w.written += remaining
	}
	if !w.truncated {
		// Report the configured limit rather than hardcoding the 10 MiB default.
		w.buf = append(w.buf, fmt.Sprintf("\n[shithub-runner: step log truncated after %d bytes]\n", w.limit)...)
		w.truncated = true
	}
}

func (w *stepLogWriter) flushLoop() {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	for {
		select {
		case <-w.done:
			return
		case <-w.ctx.Done():
			return
		case <-ticker.C:
			w.mu.Lock()
			_ = w.flushLocked()
			w.mu.Unlock()
		}
	}
}

func (w *stepLogWriter) flushLocked() error {
	if len(w.buf) == 0 {
		return nil
	}
	if err := w.emitLocked(w.buf); err != nil {
		return err
	}
	w.buf = nil
	return nil
}

func (w *stepLogWriter) emitLocked(chunk []byte) error {
	if w.masker != nil {
		chunk = w.masker.Scrub(chunk)
		if len(chunk) == 0 {
			return nil
		}
	}
	return w.emitChunkLocked(chunk)
}

func (w *stepLogWriter) flushMaskerLocked() error {
	if w.masker == nil {
		return nil
	}
	chunk := w.masker.Flush()
	if len(chunk) == 0 {
		return nil
	}
	return w.emitChunkLocked(chunk)
}

func (w *stepLogWriter) emitChunkLocked(chunk []byte) error {
	copied := LogChunk{JobID: w.jobID, StepID: w.stepID, Seq: w.seq, Chunk: append([]byte(nil), chunk...)}
	if w.ch != nil {
		select {
		case <-w.ctx.Done():
			return w.ctx.Err()
		case w.ch <- copied:
		}
	}
	if w.events != nil {
		eventChunk := copied
		select {
		case <-w.ctx.Done():
			return w.ctx.Err()
		case w.events <- Event{Log: &eventChunk}:
		}
	}
	w.seq++
	return nil
}

func conclusionForError(err error) string {
	if errors.Is(err, ErrJobTimedOut) {
		return ConclusionTimedOut
	}
	if errors.Is(err, context.Canceled) {
		return ConclusionCancelled
	}
	return ConclusionFailure
}

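// isJobTimeout distinguishes the job-level timeout from other deadline
// errors by checking the context cause set in Execute; an unrelated
// DeadlineExceeded (for example from a caller's own deadline) is not
// treated as a job timeout.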
func isJobTimeout(ctx context.Context, err error) bool {
	if errors.Is(err, ErrJobTimedOut) {
		return true
	}
	if !errors.Is(err, context.DeadlineExceeded) {
		return false
	}
	return errors.Is(context.Cause(ctx), ErrJobTimedOut)
}

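// containerWorkdir maps a step's working-directory onto the container's
// /workspace mount and rejects absolute paths or traversal out of it. For
// example, "build/out" becomes "/workspace/build/out", while "../etc" and
// "/etc" are errors.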
func containerWorkdir(wd string) (string, error) {
	wd = strings.TrimSpace(wd)
	if wd == "" {
		return "/workspace", nil
	}
	if strings.HasPrefix(wd, "/") {
		return "", fmt.Errorf("runner engine: working-directory must be relative, got %q", wd)
	}
	clean := path.Clean("/workspace/" + wd)
	if clean != "/workspace" && !strings.HasPrefix(clean, "/workspace/") {
		return "", fmt.Errorf("runner engine: working-directory escapes workspace: %q", wd)
	}
	return clean, nil
}

func dockerContainerName(job Job, step Step) string {
	stepID := step.ID
	if stepID == 0 {
		stepID = int64(step.Index)
	}
	return fmt.Sprintf("shithub-job-%d-step-%d", job.ID, stepID)
}

var envNameRE = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

func validateEnv(env map[string]string) (map[string]string, error) {
	out := make(map[string]string, len(env))
	for k, v := range env {
		if !envNameRE.MatchString(k) {
			return nil, fmt.Errorf("runner engine: invalid env name %q", k)
		}
		if strings.ContainsRune(v, '\x00') {
			return nil, fmt.Errorf("runner engine: invalid env value for %q", k)
		}
		out[k] = v
	}
	return out, nil
}

func sortedKeys(m map[string]string) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func stepLabel(step Step) string {
	if step.Name != "" {
		return step.Name
	}
	if step.StepID != "" {
		return step.StepID
	}
	return fmt.Sprintf("#%d", step.Index)
}
| 739 |