Go · 14883 bytes Raw Blame History
1 // Code generated by sqlc. DO NOT EDIT.
2 // versions:
3 // sqlc v1.31.1
4 // source: workflow_jobs.sql
5
6 package actionsdb
7
8 import (
9 "context"
10
11 "github.com/jackc/pgx/v5/pgtype"
12 )
13
14 const claimQueuedWorkflowJob = `-- name: ClaimQueuedWorkflowJob :one
15 WITH candidate AS (
16 SELECT j.id
17 FROM workflow_jobs j
18 JOIN workflow_runs r ON r.id = j.run_id
19 WHERE j.status = 'queued'
20 AND r.status IN ('queued', 'running')
21 AND j.cancel_requested = false
22 AND j.runner_id IS NULL
23 AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
24 AND NOT EXISTS (
25 SELECT 1
26 FROM workflow_jobs dep
27 WHERE dep.run_id = j.run_id
28 AND dep.job_key = ANY(j.needs_jobs)
29 AND (dep.status <> 'completed' OR dep.conclusion <> 'success')
30 )
31 AND NOT EXISTS (
32 SELECT 1
33 FROM workflow_runs blocker
34 WHERE r.concurrency_group <> ''
35 AND blocker.repo_id = r.repo_id
36 AND blocker.concurrency_group = r.concurrency_group
37 AND blocker.id <> r.id
38 AND blocker.status IN ('queued', 'running')
39 AND (blocker.created_at, blocker.id) < (r.created_at, r.id)
40 AND EXISTS (
41 SELECT 1
42 FROM workflow_jobs blocker_job
43 WHERE blocker_job.run_id = blocker.id
44 AND blocker_job.status IN ('queued', 'running')
45 AND blocker_job.cancel_requested = false
46 )
47 )
48 ORDER BY j.created_at ASC, j.id ASC
49 FOR UPDATE OF j SKIP LOCKED
50 LIMIT 1
51 ),
52 claimed AS (
53 UPDATE workflow_jobs j
54 SET runner_id = $2::bigint,
55 status = 'running',
56 started_at = COALESCE(j.started_at, now()),
57 version = j.version + 1,
58 updated_at = now()
59 FROM candidate c
60 WHERE j.id = c.id
61 RETURNING j.id, j.run_id, j.job_index, j.job_key, j.job_name, j.runs_on,
62 j.runner_id, j.needs_jobs, j.if_expr, j.timeout_minutes,
63 j.permissions, j.job_env, j.status, j.conclusion,
64 j.cancel_requested, j.started_at, j.completed_at, j.version,
65 j.created_at, j.updated_at
66 )
67 SELECT c.id, c.run_id, c.job_index, c.job_key, c.job_name, c.runs_on,
68 c.runner_id, c.needs_jobs, c.if_expr, c.timeout_minutes,
69 c.permissions, c.job_env, c.status, c.conclusion,
70 c.cancel_requested, c.started_at, c.completed_at, c.version,
71 c.created_at, c.updated_at,
72 r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
73 r.head_sha, r.head_ref, r.event, r.event_payload,
74 COALESCE(owner_user.username, owner_org.slug)::text AS repo_owner,
75 repo.name AS repo_name
76 FROM claimed c
77 JOIN workflow_runs r ON r.id = c.run_id
78 JOIN repos repo ON repo.id = r.repo_id
79 LEFT JOIN users owner_user ON owner_user.id = repo.owner_user_id
80 LEFT JOIN orgs owner_org ON owner_org.id = repo.owner_org_id
81 `
82
83 type ClaimQueuedWorkflowJobParams struct {
84 Labels []string
85 RunnerID int64
86 }
87
88 type ClaimQueuedWorkflowJobRow struct {
89 ID int64
90 RunID int64
91 JobIndex int32
92 JobKey string
93 JobName string
94 RunsOn string
95 RunnerID pgtype.Int8
96 NeedsJobs []string
97 IfExpr string
98 TimeoutMinutes int32
99 Permissions []byte
100 JobEnv []byte
101 Status WorkflowJobStatus
102 Conclusion NullCheckConclusion
103 CancelRequested bool
104 StartedAt pgtype.Timestamptz
105 CompletedAt pgtype.Timestamptz
106 Version int32
107 CreatedAt pgtype.Timestamptz
108 UpdatedAt pgtype.Timestamptz
109 RepoID int64
110 RunIndex int64
111 WorkflowFile string
112 WorkflowName string
113 HeadSha string
114 HeadRef string
115 Event WorkflowRunEvent
116 EventPayload []byte
117 RepoOwner string
118 RepoName string
119 }
120
121 func (q *Queries) ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error) {
122 row := db.QueryRow(ctx, claimQueuedWorkflowJob, arg.Labels, arg.RunnerID)
123 var i ClaimQueuedWorkflowJobRow
124 err := row.Scan(
125 &i.ID,
126 &i.RunID,
127 &i.JobIndex,
128 &i.JobKey,
129 &i.JobName,
130 &i.RunsOn,
131 &i.RunnerID,
132 &i.NeedsJobs,
133 &i.IfExpr,
134 &i.TimeoutMinutes,
135 &i.Permissions,
136 &i.JobEnv,
137 &i.Status,
138 &i.Conclusion,
139 &i.CancelRequested,
140 &i.StartedAt,
141 &i.CompletedAt,
142 &i.Version,
143 &i.CreatedAt,
144 &i.UpdatedAt,
145 &i.RepoID,
146 &i.RunIndex,
147 &i.WorkflowFile,
148 &i.WorkflowName,
149 &i.HeadSha,
150 &i.HeadRef,
151 &i.Event,
152 &i.EventPayload,
153 &i.RepoOwner,
154 &i.RepoName,
155 )
156 return i, err
157 }
158
159 const countRunningJobsForRunner = `-- name: CountRunningJobsForRunner :one
160 SELECT COUNT(*)::integer
161 FROM workflow_jobs
162 WHERE runner_id = $1::bigint AND status = 'running'
163 `
164
165 func (q *Queries) CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error) {
166 row := db.QueryRow(ctx, countRunningJobsForRunner, runnerID)
167 var column_1 int32
168 err := row.Scan(&column_1)
169 return column_1, err
170 }
171
172 const getWorkflowJobByID = `-- name: GetWorkflowJobByID :one
173 SELECT id, run_id, job_index, job_key, job_name, runs_on,
174 runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
175 job_env, status, conclusion, cancel_requested,
176 started_at, completed_at, version, created_at, updated_at
177 FROM workflow_jobs
178 WHERE id = $1
179 `
180
181 func (q *Queries) GetWorkflowJobByID(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
182 row := db.QueryRow(ctx, getWorkflowJobByID, id)
183 var i WorkflowJob
184 err := row.Scan(
185 &i.ID,
186 &i.RunID,
187 &i.JobIndex,
188 &i.JobKey,
189 &i.JobName,
190 &i.RunsOn,
191 &i.RunnerID,
192 &i.NeedsJobs,
193 &i.IfExpr,
194 &i.TimeoutMinutes,
195 &i.Permissions,
196 &i.JobEnv,
197 &i.Status,
198 &i.Conclusion,
199 &i.CancelRequested,
200 &i.StartedAt,
201 &i.CompletedAt,
202 &i.Version,
203 &i.CreatedAt,
204 &i.UpdatedAt,
205 )
206 return i, err
207 }
208
209 const insertWorkflowJob = `-- name: InsertWorkflowJob :one
210
211 INSERT INTO workflow_jobs (
212 run_id, job_index, job_key, job_name,
213 runs_on, needs_jobs, if_expr, timeout_minutes,
214 permissions, job_env
215 ) VALUES (
216 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
217 )
218 RETURNING id, run_id, job_index, job_key, job_name, runs_on,
219 runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
220 job_env, status, conclusion, cancel_requested,
221 started_at, completed_at, version, created_at, updated_at
222 `
223
224 type InsertWorkflowJobParams struct {
225 RunID int64
226 JobIndex int32
227 JobKey string
228 JobName string
229 RunsOn string
230 NeedsJobs []string
231 IfExpr string
232 TimeoutMinutes int32
233 Permissions []byte
234 JobEnv []byte
235 }
236
237 // SPDX-License-Identifier: AGPL-3.0-or-later
238 func (q *Queries) InsertWorkflowJob(ctx context.Context, db DBTX, arg InsertWorkflowJobParams) (WorkflowJob, error) {
239 row := db.QueryRow(ctx, insertWorkflowJob,
240 arg.RunID,
241 arg.JobIndex,
242 arg.JobKey,
243 arg.JobName,
244 arg.RunsOn,
245 arg.NeedsJobs,
246 arg.IfExpr,
247 arg.TimeoutMinutes,
248 arg.Permissions,
249 arg.JobEnv,
250 )
251 var i WorkflowJob
252 err := row.Scan(
253 &i.ID,
254 &i.RunID,
255 &i.JobIndex,
256 &i.JobKey,
257 &i.JobName,
258 &i.RunsOn,
259 &i.RunnerID,
260 &i.NeedsJobs,
261 &i.IfExpr,
262 &i.TimeoutMinutes,
263 &i.Permissions,
264 &i.JobEnv,
265 &i.Status,
266 &i.Conclusion,
267 &i.CancelRequested,
268 &i.StartedAt,
269 &i.CompletedAt,
270 &i.Version,
271 &i.CreatedAt,
272 &i.UpdatedAt,
273 )
274 return i, err
275 }
276
277 const listJobsForRun = `-- name: ListJobsForRun :many
278 SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
279 conclusion, cancel_requested, needs_jobs, started_at, completed_at, created_at, updated_at
280 FROM workflow_jobs
281 WHERE run_id = $1
282 ORDER BY job_index ASC
283 `
284
285 type ListJobsForRunRow struct {
286 ID int64
287 RunID int64
288 JobIndex int32
289 JobKey string
290 JobName string
291 RunsOn string
292 Status WorkflowJobStatus
293 Conclusion NullCheckConclusion
294 CancelRequested bool
295 NeedsJobs []string
296 StartedAt pgtype.Timestamptz
297 CompletedAt pgtype.Timestamptz
298 CreatedAt pgtype.Timestamptz
299 UpdatedAt pgtype.Timestamptz
300 }
301
302 func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error) {
303 rows, err := db.Query(ctx, listJobsForRun, runID)
304 if err != nil {
305 return nil, err
306 }
307 defer rows.Close()
308 items := []ListJobsForRunRow{}
309 for rows.Next() {
310 var i ListJobsForRunRow
311 if err := rows.Scan(
312 &i.ID,
313 &i.RunID,
314 &i.JobIndex,
315 &i.JobKey,
316 &i.JobName,
317 &i.RunsOn,
318 &i.Status,
319 &i.Conclusion,
320 &i.CancelRequested,
321 &i.NeedsJobs,
322 &i.StartedAt,
323 &i.CompletedAt,
324 &i.CreatedAt,
325 &i.UpdatedAt,
326 ); err != nil {
327 return nil, err
328 }
329 items = append(items, i)
330 }
331 if err := rows.Err(); err != nil {
332 return nil, err
333 }
334 return items, nil
335 }
336
337 const listQueuedWorkflowJobRunsOn = `-- name: ListQueuedWorkflowJobRunsOn :many
338 SELECT
339 COALESCE(NULLIF(j.runs_on, ''), '(none)')::text AS runs_on,
340 COUNT(*)::integer AS queued_jobs,
341 COUNT(DISTINCT wr.id)::integer AS matching_runner_count,
342 MIN(j.created_at)::timestamptz AS oldest_queued_at
343 FROM workflow_jobs j
344 LEFT JOIN workflow_runners wr
345 ON (j.runs_on = '' OR j.runs_on = ANY(wr.labels))
346 AND wr.status IN ('idle', 'busy')
347 WHERE j.status = 'queued'
348 AND j.cancel_requested = false
349 AND j.runner_id IS NULL
350 GROUP BY COALESCE(NULLIF(j.runs_on, ''), '(none)')
351 ORDER BY queued_jobs DESC, runs_on ASC
352 `
353
354 type ListQueuedWorkflowJobRunsOnRow struct {
355 RunsOn string
356 QueuedJobs int32
357 MatchingRunnerCount int32
358 OldestQueuedAt pgtype.Timestamptz
359 }
360
361 func (q *Queries) ListQueuedWorkflowJobRunsOn(ctx context.Context, db DBTX) ([]ListQueuedWorkflowJobRunsOnRow, error) {
362 rows, err := db.Query(ctx, listQueuedWorkflowJobRunsOn)
363 if err != nil {
364 return nil, err
365 }
366 defer rows.Close()
367 items := []ListQueuedWorkflowJobRunsOnRow{}
368 for rows.Next() {
369 var i ListQueuedWorkflowJobRunsOnRow
370 if err := rows.Scan(
371 &i.RunsOn,
372 &i.QueuedJobs,
373 &i.MatchingRunnerCount,
374 &i.OldestQueuedAt,
375 ); err != nil {
376 return nil, err
377 }
378 items = append(items, i)
379 }
380 if err := rows.Err(); err != nil {
381 return nil, err
382 }
383 return items, nil
384 }
385
386 const requestWorkflowJobCancel = `-- name: RequestWorkflowJobCancel :one
387 UPDATE workflow_jobs
388 SET cancel_requested = true,
389 status = CASE
390 WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
391 ELSE status
392 END,
393 conclusion = CASE
394 WHEN status = 'queued' THEN 'cancelled'::check_conclusion
395 ELSE conclusion
396 END,
397 started_at = CASE
398 WHEN status = 'queued' THEN COALESCE(started_at, now())
399 ELSE started_at
400 END,
401 completed_at = CASE
402 WHEN status = 'queued' THEN COALESCE(completed_at, now())
403 ELSE completed_at
404 END,
405 version = version + 1,
406 updated_at = now()
407 WHERE id = $1
408 AND status IN ('queued', 'running')
409 AND (status = 'queued' OR cancel_requested = false)
410 RETURNING id, run_id, job_index, job_key, job_name, runs_on,
411 runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
412 job_env, status, conclusion, cancel_requested,
413 started_at, completed_at, version, created_at, updated_at
414 `
415
416 func (q *Queries) RequestWorkflowJobCancel(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
417 row := db.QueryRow(ctx, requestWorkflowJobCancel, id)
418 var i WorkflowJob
419 err := row.Scan(
420 &i.ID,
421 &i.RunID,
422 &i.JobIndex,
423 &i.JobKey,
424 &i.JobName,
425 &i.RunsOn,
426 &i.RunnerID,
427 &i.NeedsJobs,
428 &i.IfExpr,
429 &i.TimeoutMinutes,
430 &i.Permissions,
431 &i.JobEnv,
432 &i.Status,
433 &i.Conclusion,
434 &i.CancelRequested,
435 &i.StartedAt,
436 &i.CompletedAt,
437 &i.Version,
438 &i.CreatedAt,
439 &i.UpdatedAt,
440 )
441 return i, err
442 }
443
444 const requestWorkflowRunCancel = `-- name: RequestWorkflowRunCancel :many
445 UPDATE workflow_jobs
446 SET cancel_requested = true,
447 status = CASE
448 WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
449 ELSE status
450 END,
451 conclusion = CASE
452 WHEN status = 'queued' THEN 'cancelled'::check_conclusion
453 ELSE conclusion
454 END,
455 started_at = CASE
456 WHEN status = 'queued' THEN COALESCE(started_at, now())
457 ELSE started_at
458 END,
459 completed_at = CASE
460 WHEN status = 'queued' THEN COALESCE(completed_at, now())
461 ELSE completed_at
462 END,
463 version = version + 1,
464 updated_at = now()
465 WHERE run_id = $1
466 AND status IN ('queued', 'running')
467 AND (status = 'queued' OR cancel_requested = false)
468 RETURNING id, run_id, job_index, job_key, job_name, runs_on,
469 runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
470 job_env, status, conclusion, cancel_requested,
471 started_at, completed_at, version, created_at, updated_at
472 `
473
474 func (q *Queries) RequestWorkflowRunCancel(ctx context.Context, db DBTX, runID int64) ([]WorkflowJob, error) {
475 rows, err := db.Query(ctx, requestWorkflowRunCancel, runID)
476 if err != nil {
477 return nil, err
478 }
479 defer rows.Close()
480 items := []WorkflowJob{}
481 for rows.Next() {
482 var i WorkflowJob
483 if err := rows.Scan(
484 &i.ID,
485 &i.RunID,
486 &i.JobIndex,
487 &i.JobKey,
488 &i.JobName,
489 &i.RunsOn,
490 &i.RunnerID,
491 &i.NeedsJobs,
492 &i.IfExpr,
493 &i.TimeoutMinutes,
494 &i.Permissions,
495 &i.JobEnv,
496 &i.Status,
497 &i.Conclusion,
498 &i.CancelRequested,
499 &i.StartedAt,
500 &i.CompletedAt,
501 &i.Version,
502 &i.CreatedAt,
503 &i.UpdatedAt,
504 ); err != nil {
505 return nil, err
506 }
507 items = append(items, i)
508 }
509 if err := rows.Err(); err != nil {
510 return nil, err
511 }
512 return items, nil
513 }
514
515 const updateWorkflowJobStatus = `-- name: UpdateWorkflowJobStatus :one
516 UPDATE workflow_jobs
517 SET status = $2,
518 conclusion = $3::check_conclusion,
519 started_at = $4::timestamptz,
520 completed_at = $5::timestamptz,
521 version = version + 1,
522 updated_at = now()
523 WHERE id = $1
524 RETURNING id, run_id, job_index, job_key, job_name, runs_on,
525 runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
526 job_env, status, conclusion, cancel_requested,
527 started_at, completed_at, version, created_at, updated_at
528 `
529
530 type UpdateWorkflowJobStatusParams struct {
531 ID int64
532 Status WorkflowJobStatus
533 Conclusion NullCheckConclusion
534 StartedAt pgtype.Timestamptz
535 CompletedAt pgtype.Timestamptz
536 }
537
538 func (q *Queries) UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error) {
539 row := db.QueryRow(ctx, updateWorkflowJobStatus,
540 arg.ID,
541 arg.Status,
542 arg.Conclusion,
543 arg.StartedAt,
544 arg.CompletedAt,
545 )
546 var i WorkflowJob
547 err := row.Scan(
548 &i.ID,
549 &i.RunID,
550 &i.JobIndex,
551 &i.JobKey,
552 &i.JobName,
553 &i.RunsOn,
554 &i.RunnerID,
555 &i.NeedsJobs,
556 &i.IfExpr,
557 &i.TimeoutMinutes,
558 &i.Permissions,
559 &i.JobEnv,
560 &i.Status,
561 &i.Conclusion,
562 &i.CancelRequested,
563 &i.StartedAt,
564 &i.CompletedAt,
565 &i.Version,
566 &i.CreatedAt,
567 &i.UpdatedAt,
568 )
569 return i, err
570 }
571