Go · 16798 bytes Raw Blame History
1 // Code generated by sqlc. DO NOT EDIT.
2 // versions:
3 // sqlc v1.31.1
4 // source: workflow_jobs.sql
5
6 package actionsdb
7
8 import (
9 "context"
10
11 "github.com/jackc/pgx/v5/pgtype"
12 )
13
// claimQueuedWorkflowJob atomically claims at most one queued workflow job for
// a runner and returns it together with data from its run and repository.
//
// Parameters: $1 is a text[] of labels (a job qualifies when its runs_on is
// empty or equals one of them); $2 is the bigint runner id written to the
// claimed row.
//
// The candidate CTE picks the oldest job by (created_at, id) that
//   - is queued, has no runner and no cancel request, and belongs to a
//     queued/running run that needs no approval or has been approved;
//   - passes the Actions policy CASE: a disabled site setting wins, then an
//     explicit repo setting, then an explicit org setting, otherwise the site
//     default;
//   - leaves the number of running jobs for the repo and for the repo owner
//     below the configured limits (falling back to 20 and 100);
//   - has no job in the same run, named in needs_jobs, that is unfinished or
//     did not conclude with 'success';
//   - is not behind an older run in the same non-empty concurrency group that
//     still has queued/running jobs without a cancel request.
//
// FOR UPDATE OF j SKIP LOCKED makes concurrent claimers skip job rows that are
// already locked instead of waiting on them. The claimed CTE then marks the job
// running, keeps an existing started_at, and bumps version.
//
// The dependency check uses IS DISTINCT FROM rather than <>: with <>, a
// completed dependency whose conclusion is NULL evaluates to NULL, is not
// treated as a blocker, and the dependent job could be claimed.
//
// NOTE(review): this file is generated by sqlc from workflow_jobs.sql; apply the
// same change there, otherwise regeneration will revert it.
const claimQueuedWorkflowJob = `-- name: ClaimQueuedWorkflowJob :one
WITH candidate AS (
SELECT j.id
FROM workflow_jobs j
JOIN workflow_runs r ON r.id = j.run_id
JOIN repos repo ON repo.id = r.repo_id
LEFT JOIN actions_site_policy sp ON sp.id = true
LEFT JOIN actions_org_policies op ON op.org_id = repo.owner_org_id
LEFT JOIN actions_repo_policies rp ON rp.repo_id = r.repo_id
WHERE j.status = 'queued'
AND r.status IN ('queued', 'running')
AND (r.need_approval = false OR r.approved_by_user_id IS NOT NULL)
AND j.cancel_requested = false
AND j.runner_id IS NULL
AND CASE
WHEN COALESCE(sp.actions_enabled, true) = false THEN false
WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
WHEN COALESCE(rp.actions_enabled, 'inherit'::actions_policy_state) = 'disabled' THEN false
WHEN COALESCE(op.actions_enabled, 'inherit'::actions_policy_state) = 'enabled' THEN true
WHEN COALESCE(op.actions_enabled, 'inherit'::actions_policy_state) = 'disabled' THEN false
ELSE COALESCE(sp.actions_enabled, true)
END
AND (
SELECT COUNT(*)::integer
FROM workflow_jobs running_job
JOIN workflow_runs running_run ON running_run.id = running_job.run_id
WHERE running_job.status = 'running'
AND running_run.repo_id = r.repo_id
) < COALESCE(rp.max_repo_concurrent_jobs, op.max_repo_concurrent_jobs, sp.max_repo_concurrent_jobs, 20)
AND (
SELECT COUNT(*)::integer
FROM workflow_jobs running_job
JOIN workflow_runs running_run ON running_run.id = running_job.run_id
JOIN repos running_repo ON running_repo.id = running_run.repo_id
WHERE running_job.status = 'running'
AND (
(repo.owner_user_id IS NOT NULL AND running_repo.owner_user_id = repo.owner_user_id)
OR (repo.owner_org_id IS NOT NULL AND running_repo.owner_org_id = repo.owner_org_id)
)
) < COALESCE(rp.max_owner_concurrent_jobs, op.max_owner_concurrent_jobs, sp.max_owner_concurrent_jobs, 100)
AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
AND NOT EXISTS (
SELECT 1
FROM workflow_jobs dep
WHERE dep.run_id = j.run_id
AND dep.job_key = ANY(j.needs_jobs)
AND (dep.status <> 'completed' OR dep.conclusion IS DISTINCT FROM 'success')
)
AND NOT EXISTS (
SELECT 1
FROM workflow_runs blocker
WHERE r.concurrency_group <> ''
AND blocker.repo_id = r.repo_id
AND blocker.concurrency_group = r.concurrency_group
AND blocker.id <> r.id
AND blocker.status IN ('queued', 'running')
AND (blocker.created_at, blocker.id) < (r.created_at, r.id)
AND EXISTS (
SELECT 1
FROM workflow_jobs blocker_job
WHERE blocker_job.run_id = blocker.id
AND blocker_job.status IN ('queued', 'running')
AND blocker_job.cancel_requested = false
)
)
ORDER BY j.created_at ASC, j.id ASC
FOR UPDATE OF j SKIP LOCKED
LIMIT 1
),
claimed AS (
UPDATE workflow_jobs j
SET runner_id = $2::bigint,
status = 'running',
started_at = COALESCE(j.started_at, now()),
version = j.version + 1,
updated_at = now()
FROM candidate c
WHERE j.id = c.id
RETURNING j.id, j.run_id, j.job_index, j.job_key, j.job_name, j.runs_on,
j.runner_id, j.needs_jobs, j.if_expr, j.timeout_minutes,
j.permissions, j.job_env, j.status, j.conclusion,
j.cancel_requested, j.started_at, j.completed_at, j.version,
j.created_at, j.updated_at
)
SELECT c.id, c.run_id, c.job_index, c.job_key, c.job_name, c.runs_on,
c.runner_id, c.needs_jobs, c.if_expr, c.timeout_minutes,
c.permissions, c.job_env, c.status, c.conclusion,
c.cancel_requested, c.started_at, c.completed_at, c.version,
c.created_at, c.updated_at,
r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
r.head_sha, r.head_ref, r.event, r.event_payload,
COALESCE(owner_user.username, owner_org.slug)::text AS repo_owner,
repo.name AS repo_name
FROM claimed c
JOIN workflow_runs r ON r.id = c.run_id
JOIN repos repo ON repo.id = r.repo_id
LEFT JOIN users owner_user ON owner_user.id = repo.owner_user_id
LEFT JOIN orgs owner_org ON owner_org.id = repo.owner_org_id
`
113
// ClaimQueuedWorkflowJobParams holds the arguments bound to the
// claimQueuedWorkflowJob query.
type ClaimQueuedWorkflowJobParams struct {
	Labels   []string // $1 (text[]): a job qualifies when its runs_on is empty or equals one of these
	RunnerID int64    // $2 (bigint): stored as runner_id on the claimed job
}
118
// ClaimQueuedWorkflowJobRow is the single row returned by the
// claimQueuedWorkflowJob query. Fields are declared in the order of the query's
// final SELECT list, which is also the order they are scanned in.
type ClaimQueuedWorkflowJobRow struct {
	// Columns of the claimed workflow_jobs row, as returned by the UPDATE.
	ID              int64
	RunID           int64
	JobIndex        int32
	JobKey          string
	JobName         string
	RunsOn          string
	RunnerID        pgtype.Int8
	NeedsJobs       []string
	IfExpr          string
	TimeoutMinutes  int32
	Permissions     []byte
	JobEnv          []byte
	Status          WorkflowJobStatus
	Conclusion      NullCheckConclusion
	CancelRequested bool
	StartedAt       pgtype.Timestamptz
	CompletedAt     pgtype.Timestamptz
	Version         int32
	CreatedAt       pgtype.Timestamptz
	UpdatedAt       pgtype.Timestamptz

	// Columns of the workflow_runs row the job belongs to.
	RepoID       int64
	RunIndex     int64
	WorkflowFile string
	WorkflowName string
	HeadSha      string
	HeadRef      string
	Event        WorkflowRunEvent
	EventPayload []byte

	// Repository identity: RepoOwner is the owning user's username or, when
	// that is NULL, the owning org's slug; RepoName is repos.name.
	RepoOwner string
	RepoName  string
}
151
152 func (q *Queries) ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error) {
153 row := db.QueryRow(ctx, claimQueuedWorkflowJob, arg.Labels, arg.RunnerID)
154 var i ClaimQueuedWorkflowJobRow
155 err := row.Scan(
156 &i.ID,
157 &i.RunID,
158 &i.JobIndex,
159 &i.JobKey,
160 &i.JobName,
161 &i.RunsOn,
162 &i.RunnerID,
163 &i.NeedsJobs,
164 &i.IfExpr,
165 &i.TimeoutMinutes,
166 &i.Permissions,
167 &i.JobEnv,
168 &i.Status,
169 &i.Conclusion,
170 &i.CancelRequested,
171 &i.StartedAt,
172 &i.CompletedAt,
173 &i.Version,
174 &i.CreatedAt,
175 &i.UpdatedAt,
176 &i.RepoID,
177 &i.RunIndex,
178 &i.WorkflowFile,
179 &i.WorkflowName,
180 &i.HeadSha,
181 &i.HeadRef,
182 &i.Event,
183 &i.EventPayload,
184 &i.RepoOwner,
185 &i.RepoName,
186 )
187 return i, err
188 }
189
// countRunningJobsForRunner counts the workflow_jobs rows whose runner_id is $1
// (bigint) and whose status is 'running'. The count is cast to integer.
const countRunningJobsForRunner = `-- name: CountRunningJobsForRunner :one
SELECT COUNT(*)::integer
FROM workflow_jobs
WHERE runner_id = $1::bigint AND status = 'running'
`
195
196 func (q *Queries) CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error) {
197 row := db.QueryRow(ctx, countRunningJobsForRunner, runnerID)
198 var column_1 int32
199 err := row.Scan(&column_1)
200 return column_1, err
201 }
202
// getWorkflowJobByID selects the listed workflow_jobs columns for the row whose
// id equals $1.
const getWorkflowJobByID = `-- name: GetWorkflowJobByID :one
SELECT id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
FROM workflow_jobs
WHERE id = $1
`
211
212 func (q *Queries) GetWorkflowJobByID(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
213 row := db.QueryRow(ctx, getWorkflowJobByID, id)
214 var i WorkflowJob
215 err := row.Scan(
216 &i.ID,
217 &i.RunID,
218 &i.JobIndex,
219 &i.JobKey,
220 &i.JobName,
221 &i.RunsOn,
222 &i.RunnerID,
223 &i.NeedsJobs,
224 &i.IfExpr,
225 &i.TimeoutMinutes,
226 &i.Permissions,
227 &i.JobEnv,
228 &i.Status,
229 &i.Conclusion,
230 &i.CancelRequested,
231 &i.StartedAt,
232 &i.CompletedAt,
233 &i.Version,
234 &i.CreatedAt,
235 &i.UpdatedAt,
236 )
237 return i, err
238 }
239
// insertWorkflowJob inserts one workflow_jobs row from the ten values $1..$10
// (run_id, job_index, job_key, job_name, runs_on, needs_jobs, if_expr,
// timeout_minutes, permissions, job_env) and returns the stored row. Columns
// not named in the INSERT list are not set by this statement.
const insertWorkflowJob = `-- name: InsertWorkflowJob :one

INSERT INTO workflow_jobs (
run_id, job_index, job_key, job_name,
runs_on, needs_jobs, if_expr, timeout_minutes,
permissions, job_env
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
254
// InsertWorkflowJobParams holds the column values for the insertWorkflowJob
// query. Fields are bound to $1..$10 in declaration order.
type InsertWorkflowJobParams struct {
	RunID          int64    // $1 -> run_id
	JobIndex       int32    // $2 -> job_index
	JobKey         string   // $3 -> job_key
	JobName        string   // $4 -> job_name
	RunsOn         string   // $5 -> runs_on
	NeedsJobs      []string // $6 -> needs_jobs
	IfExpr         string   // $7 -> if_expr
	TimeoutMinutes int32    // $8 -> timeout_minutes
	Permissions    []byte   // $9 -> permissions
	JobEnv         []byte   // $10 -> job_env
}
267
268 // SPDX-License-Identifier: AGPL-3.0-or-later
269 func (q *Queries) InsertWorkflowJob(ctx context.Context, db DBTX, arg InsertWorkflowJobParams) (WorkflowJob, error) {
270 row := db.QueryRow(ctx, insertWorkflowJob,
271 arg.RunID,
272 arg.JobIndex,
273 arg.JobKey,
274 arg.JobName,
275 arg.RunsOn,
276 arg.NeedsJobs,
277 arg.IfExpr,
278 arg.TimeoutMinutes,
279 arg.Permissions,
280 arg.JobEnv,
281 )
282 var i WorkflowJob
283 err := row.Scan(
284 &i.ID,
285 &i.RunID,
286 &i.JobIndex,
287 &i.JobKey,
288 &i.JobName,
289 &i.RunsOn,
290 &i.RunnerID,
291 &i.NeedsJobs,
292 &i.IfExpr,
293 &i.TimeoutMinutes,
294 &i.Permissions,
295 &i.JobEnv,
296 &i.Status,
297 &i.Conclusion,
298 &i.CancelRequested,
299 &i.StartedAt,
300 &i.CompletedAt,
301 &i.Version,
302 &i.CreatedAt,
303 &i.UpdatedAt,
304 )
305 return i, err
306 }
307
// listJobsForRun selects a subset of workflow_jobs columns for every job whose
// run_id equals $1, ordered by job_index ascending.
const listJobsForRun = `-- name: ListJobsForRun :many
SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
conclusion, cancel_requested, needs_jobs, started_at, completed_at, created_at, updated_at
FROM workflow_jobs
WHERE run_id = $1
ORDER BY job_index ASC
`
315
// ListJobsForRunRow is one row of the listJobsForRun query. Fields follow the
// query's SELECT order. Compared with the full column list used by the other
// queries in this file, runner_id, if_expr, timeout_minutes, permissions,
// job_env and version are not selected.
type ListJobsForRunRow struct {
	ID              int64
	RunID           int64
	JobIndex        int32
	JobKey          string
	JobName         string
	RunsOn          string
	Status          WorkflowJobStatus
	Conclusion      NullCheckConclusion
	CancelRequested bool
	NeedsJobs       []string
	StartedAt       pgtype.Timestamptz
	CompletedAt     pgtype.Timestamptz
	CreatedAt       pgtype.Timestamptz
	UpdatedAt       pgtype.Timestamptz
}
332
333 func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error) {
334 rows, err := db.Query(ctx, listJobsForRun, runID)
335 if err != nil {
336 return nil, err
337 }
338 defer rows.Close()
339 items := []ListJobsForRunRow{}
340 for rows.Next() {
341 var i ListJobsForRunRow
342 if err := rows.Scan(
343 &i.ID,
344 &i.RunID,
345 &i.JobIndex,
346 &i.JobKey,
347 &i.JobName,
348 &i.RunsOn,
349 &i.Status,
350 &i.Conclusion,
351 &i.CancelRequested,
352 &i.NeedsJobs,
353 &i.StartedAt,
354 &i.CompletedAt,
355 &i.CreatedAt,
356 &i.UpdatedAt,
357 ); err != nil {
358 return nil, err
359 }
360 items = append(items, i)
361 }
362 if err := rows.Err(); err != nil {
363 return nil, err
364 }
365 return items, nil
366 }
367
// listQueuedWorkflowJobRunsOn summarises the queue per runs_on label. It takes
// no parameters.
//
// Only jobs that are queued, have no cancel request and no runner are
// considered. They are grouped by runs_on, with the empty label reported as
// '(none)'. Each group returns:
//   - queued_jobs: the number of distinct jobs in the group;
//   - matching_runner_count: the number of distinct runners that are idle or
//     busy, not draining and not revoked, and whose labels contain runs_on
//     (any such runner when runs_on is empty);
//   - oldest_queued_at: the earliest created_at in the group.
//
// Rows are ordered by queued_jobs descending, then runs_on ascending.
//
// queued_jobs uses COUNT(DISTINCT j.id): the LEFT JOIN yields one row per
// (job, matching runner) pair, so COUNT(*) multiplied the job count by the
// number of matching runners whenever more than one runner matched.
//
// NOTE(review): this file is generated by sqlc from workflow_jobs.sql; apply the
// same change there, otherwise regeneration will revert it.
const listQueuedWorkflowJobRunsOn = `-- name: ListQueuedWorkflowJobRunsOn :many
SELECT
COALESCE(NULLIF(j.runs_on, ''), '(none)')::text AS runs_on,
COUNT(DISTINCT j.id)::integer AS queued_jobs,
COUNT(DISTINCT wr.id)::integer AS matching_runner_count,
MIN(j.created_at)::timestamptz AS oldest_queued_at
FROM workflow_jobs j
LEFT JOIN workflow_runners wr
ON (j.runs_on = '' OR j.runs_on = ANY(wr.labels))
AND wr.status IN ('idle', 'busy')
AND wr.draining_at IS NULL
AND wr.revoked_at IS NULL
WHERE j.status = 'queued'
AND j.cancel_requested = false
AND j.runner_id IS NULL
GROUP BY COALESCE(NULLIF(j.runs_on, ''), '(none)')
ORDER BY queued_jobs DESC, runs_on ASC
`
386
// ListQueuedWorkflowJobRunsOnRow is one group returned by the
// listQueuedWorkflowJobRunsOn query.
type ListQueuedWorkflowJobRunsOnRow struct {
	RunsOn              string             // runs_on label of the group; '(none)' stands for an empty label
	QueuedJobs          int32              // queued_jobs column
	MatchingRunnerCount int32              // matching_runner_count column: distinct runners joined to the group
	OldestQueuedAt      pgtype.Timestamptz // oldest_queued_at column: MIN(created_at) of the group's jobs
}
393
394 func (q *Queries) ListQueuedWorkflowJobRunsOn(ctx context.Context, db DBTX) ([]ListQueuedWorkflowJobRunsOnRow, error) {
395 rows, err := db.Query(ctx, listQueuedWorkflowJobRunsOn)
396 if err != nil {
397 return nil, err
398 }
399 defer rows.Close()
400 items := []ListQueuedWorkflowJobRunsOnRow{}
401 for rows.Next() {
402 var i ListQueuedWorkflowJobRunsOnRow
403 if err := rows.Scan(
404 &i.RunsOn,
405 &i.QueuedJobs,
406 &i.MatchingRunnerCount,
407 &i.OldestQueuedAt,
408 ); err != nil {
409 return nil, err
410 }
411 items = append(items, i)
412 }
413 if err := rows.Err(); err != nil {
414 return nil, err
415 }
416 return items, nil
417 }
418
// requestWorkflowJobCancel flags the workflow job with id $1 for cancellation
// and returns the updated row.
//
// The WHERE clause only matches a job that is queued, or that is running and
// has no cancel request yet; in every other case no row is updated or returned.
//
// For a queued job the statement also sets status and conclusion to
// 'cancelled' and fills started_at and completed_at with now() where they are
// NULL. For a running job only cancel_requested, version and updated_at
// change. The CASE expressions test the row's status as it was before this
// UPDATE, so all four of them take the same branch.
const requestWorkflowJobCancel = `-- name: RequestWorkflowJobCancel :one
UPDATE workflow_jobs
SET cancel_requested = true,
status = CASE
WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
ELSE status
END,
conclusion = CASE
WHEN status = 'queued' THEN 'cancelled'::check_conclusion
ELSE conclusion
END,
started_at = CASE
WHEN status = 'queued' THEN COALESCE(started_at, now())
ELSE started_at
END,
completed_at = CASE
WHEN status = 'queued' THEN COALESCE(completed_at, now())
ELSE completed_at
END,
version = version + 1,
updated_at = now()
WHERE id = $1
AND status IN ('queued', 'running')
AND (status = 'queued' OR cancel_requested = false)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
448
449 func (q *Queries) RequestWorkflowJobCancel(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
450 row := db.QueryRow(ctx, requestWorkflowJobCancel, id)
451 var i WorkflowJob
452 err := row.Scan(
453 &i.ID,
454 &i.RunID,
455 &i.JobIndex,
456 &i.JobKey,
457 &i.JobName,
458 &i.RunsOn,
459 &i.RunnerID,
460 &i.NeedsJobs,
461 &i.IfExpr,
462 &i.TimeoutMinutes,
463 &i.Permissions,
464 &i.JobEnv,
465 &i.Status,
466 &i.Conclusion,
467 &i.CancelRequested,
468 &i.StartedAt,
469 &i.CompletedAt,
470 &i.Version,
471 &i.CreatedAt,
472 &i.UpdatedAt,
473 )
474 return i, err
475 }
476
// requestWorkflowRunCancel flags the jobs of the run with run_id $1 for
// cancellation and returns every row it updated.
//
// The WHERE clause only matches jobs that are queued, or that are running and
// have no cancel request yet; other jobs of the run are left untouched.
//
// For a queued job the statement also sets status and conclusion to
// 'cancelled' and fills started_at and completed_at with now() where they are
// NULL. For a running job only cancel_requested, version and updated_at
// change. The CASE expressions test each row's status as it was before this
// UPDATE, so all four of them take the same branch for a given row.
const requestWorkflowRunCancel = `-- name: RequestWorkflowRunCancel :many
UPDATE workflow_jobs
SET cancel_requested = true,
status = CASE
WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
ELSE status
END,
conclusion = CASE
WHEN status = 'queued' THEN 'cancelled'::check_conclusion
ELSE conclusion
END,
started_at = CASE
WHEN status = 'queued' THEN COALESCE(started_at, now())
ELSE started_at
END,
completed_at = CASE
WHEN status = 'queued' THEN COALESCE(completed_at, now())
ELSE completed_at
END,
version = version + 1,
updated_at = now()
WHERE run_id = $1
AND status IN ('queued', 'running')
AND (status = 'queued' OR cancel_requested = false)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
506
507 func (q *Queries) RequestWorkflowRunCancel(ctx context.Context, db DBTX, runID int64) ([]WorkflowJob, error) {
508 rows, err := db.Query(ctx, requestWorkflowRunCancel, runID)
509 if err != nil {
510 return nil, err
511 }
512 defer rows.Close()
513 items := []WorkflowJob{}
514 for rows.Next() {
515 var i WorkflowJob
516 if err := rows.Scan(
517 &i.ID,
518 &i.RunID,
519 &i.JobIndex,
520 &i.JobKey,
521 &i.JobName,
522 &i.RunsOn,
523 &i.RunnerID,
524 &i.NeedsJobs,
525 &i.IfExpr,
526 &i.TimeoutMinutes,
527 &i.Permissions,
528 &i.JobEnv,
529 &i.Status,
530 &i.Conclusion,
531 &i.CancelRequested,
532 &i.StartedAt,
533 &i.CompletedAt,
534 &i.Version,
535 &i.CreatedAt,
536 &i.UpdatedAt,
537 ); err != nil {
538 return nil, err
539 }
540 items = append(items, i)
541 }
542 if err := rows.Err(); err != nil {
543 return nil, err
544 }
545 return items, nil
546 }
547
// updateWorkflowJobStatus overwrites status ($2), conclusion ($3), started_at
// ($4) and completed_at ($5) of the workflow job with id $1, increments version,
// sets updated_at to now() and returns the updated row.
//
// All four values are assigned unconditionally: the statement has no guard on
// the job's current status and does not preserve existing timestamps.
const updateWorkflowJobStatus = `-- name: UpdateWorkflowJobStatus :one
UPDATE workflow_jobs
SET status = $2,
conclusion = $3::check_conclusion,
started_at = $4::timestamptz,
completed_at = $5::timestamptz,
version = version + 1,
updated_at = now()
WHERE id = $1
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
562
// UpdateWorkflowJobStatusParams holds the arguments for the
// updateWorkflowJobStatus query. Fields are bound to $1..$5 in declaration
// order.
type UpdateWorkflowJobStatusParams struct {
	ID          int64               // $1: id of the job to update
	Status      WorkflowJobStatus   // $2: new status
	Conclusion  NullCheckConclusion // $3: new conclusion, replaces the stored value
	StartedAt   pgtype.Timestamptz  // $4: new started_at, replaces the stored value
	CompletedAt pgtype.Timestamptz  // $5: new completed_at, replaces the stored value
}
570
571 func (q *Queries) UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error) {
572 row := db.QueryRow(ctx, updateWorkflowJobStatus,
573 arg.ID,
574 arg.Status,
575 arg.Conclusion,
576 arg.StartedAt,
577 arg.CompletedAt,
578 )
579 var i WorkflowJob
580 err := row.Scan(
581 &i.ID,
582 &i.RunID,
583 &i.JobIndex,
584 &i.JobKey,
585 &i.JobName,
586 &i.RunsOn,
587 &i.RunnerID,
588 &i.NeedsJobs,
589 &i.IfExpr,
590 &i.TimeoutMinutes,
591 &i.Permissions,
592 &i.JobEnv,
593 &i.Status,
594 &i.Conclusion,
595 &i.CancelRequested,
596 &i.StartedAt,
597 &i.CompletedAt,
598 &i.Version,
599 &i.CreatedAt,
600 &i.UpdatedAt,
601 )
602 return i, err
603 }
604