Go · 13500 bytes Raw Blame History
1 // Code generated by sqlc. DO NOT EDIT.
2 // versions:
3 // sqlc v1.31.1
4 // source: workflow_jobs.sql
5
6 package actionsdb
7
8 import (
9 "context"
10
11 "github.com/jackc/pgx/v5/pgtype"
12 )
13
// claimQueuedWorkflowJob is the SQL executed by Queries.ClaimQueuedWorkflowJob.
//
// The candidate CTE picks at most one workflow_jobs row (oldest created_at,
// then lowest id) that:
//   - has status 'queued', cancel_requested = false and a NULL runner_id;
//   - belongs to a run whose status is 'queued' or 'running';
//   - has an empty runs_on, or a runs_on contained in the $1 text array;
//   - has no job in the same run, named in needs_jobs, whose status is not
//     'completed' or whose conclusion is not 'success';
//   - is not preceded by another run (ordered by created_at, id) of the same
//     repo and the same non-empty concurrency_group that is still 'queued' or
//     'running' and owns a 'queued' or 'running' job with no cancel request.
//
// The candidate is selected with FOR UPDATE OF j SKIP LOCKED. The claimed CTE
// then stores $2 as runner_id, sets status to 'running', fills started_at
// when it was NULL, increments version and refreshes updated_at. The final
// SELECT returns the updated job columns together with columns of its run,
// the repository name, and the owning user's username or owning org's slug.
//
// NOTE(review): the dependency check relies on `<>`, which does not evaluate
// to true against NULL; a needed job with status 'completed' and a NULL
// conclusion would presumably not block the claim — confirm that state
// cannot occur.
const claimQueuedWorkflowJob = `-- name: ClaimQueuedWorkflowJob :one
WITH candidate AS (
SELECT j.id
FROM workflow_jobs j
JOIN workflow_runs r ON r.id = j.run_id
WHERE j.status = 'queued'
AND r.status IN ('queued', 'running')
AND j.cancel_requested = false
AND j.runner_id IS NULL
AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
AND NOT EXISTS (
SELECT 1
FROM workflow_jobs dep
WHERE dep.run_id = j.run_id
AND dep.job_key = ANY(j.needs_jobs)
AND (dep.status <> 'completed' OR dep.conclusion <> 'success')
)
AND NOT EXISTS (
SELECT 1
FROM workflow_runs blocker
WHERE r.concurrency_group <> ''
AND blocker.repo_id = r.repo_id
AND blocker.concurrency_group = r.concurrency_group
AND blocker.id <> r.id
AND blocker.status IN ('queued', 'running')
AND (blocker.created_at, blocker.id) < (r.created_at, r.id)
AND EXISTS (
SELECT 1
FROM workflow_jobs blocker_job
WHERE blocker_job.run_id = blocker.id
AND blocker_job.status IN ('queued', 'running')
AND blocker_job.cancel_requested = false
)
)
ORDER BY j.created_at ASC, j.id ASC
FOR UPDATE OF j SKIP LOCKED
LIMIT 1
),
claimed AS (
UPDATE workflow_jobs j
SET runner_id = $2::bigint,
status = 'running',
started_at = COALESCE(j.started_at, now()),
version = j.version + 1,
updated_at = now()
FROM candidate c
WHERE j.id = c.id
RETURNING j.id, j.run_id, j.job_index, j.job_key, j.job_name, j.runs_on,
j.runner_id, j.needs_jobs, j.if_expr, j.timeout_minutes,
j.permissions, j.job_env, j.status, j.conclusion,
j.cancel_requested, j.started_at, j.completed_at, j.version,
j.created_at, j.updated_at
)
SELECT c.id, c.run_id, c.job_index, c.job_key, c.job_name, c.runs_on,
c.runner_id, c.needs_jobs, c.if_expr, c.timeout_minutes,
c.permissions, c.job_env, c.status, c.conclusion,
c.cancel_requested, c.started_at, c.completed_at, c.version,
c.created_at, c.updated_at,
r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
r.head_sha, r.head_ref, r.event, r.event_payload,
COALESCE(owner_user.username, owner_org.slug)::text AS repo_owner,
repo.name AS repo_name
FROM claimed c
JOIN workflow_runs r ON r.id = c.run_id
JOIN repos repo ON repo.id = r.repo_id
LEFT JOIN users owner_user ON owner_user.id = repo.owner_user_id
LEFT JOIN orgs owner_org ON owner_org.id = repo.owner_org_id
`
82
// ClaimQueuedWorkflowJobParams carries the arguments of ClaimQueuedWorkflowJob.
type ClaimQueuedWorkflowJobParams struct {
	// Labels is bound to $1; a job is eligible when its runs_on is empty or
	// equal to one of these values.
	Labels []string
	// RunnerID is bound to $2 and written to the claimed job's runner_id.
	RunnerID int64
}
87
// ClaimQueuedWorkflowJobRow is the row produced by the claimQueuedWorkflowJob
// query: the claimed job's columns followed by columns of its run and
// repository. Field order matches the query's SELECT list.
type ClaimQueuedWorkflowJobRow struct {
	// Columns of the claimed workflow_jobs row, as returned after the update.
	ID              int64
	RunID           int64
	JobIndex        int32
	JobKey          string
	JobName         string
	RunsOn          string
	RunnerID        pgtype.Int8
	NeedsJobs       []string
	IfExpr          string
	TimeoutMinutes  int32
	Permissions     []byte
	JobEnv          []byte
	Status          WorkflowJobStatus
	Conclusion      NullCheckConclusion
	CancelRequested bool
	StartedAt       pgtype.Timestamptz
	CompletedAt     pgtype.Timestamptz
	Version         int32
	CreatedAt       pgtype.Timestamptz
	UpdatedAt       pgtype.Timestamptz
	// Columns of the workflow_runs row the job belongs to.
	RepoID       int64
	RunIndex     int64
	WorkflowFile string
	WorkflowName string
	HeadSha      string
	HeadRef      string
	Event        WorkflowRunEvent
	EventPayload []byte
	// RepoOwner is COALESCE(owner_user.username, owner_org.slug) cast to text.
	RepoOwner string
	// RepoName is the name column of the joined repos row.
	RepoName string
}
120
121 func (q *Queries) ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error) {
122 row := db.QueryRow(ctx, claimQueuedWorkflowJob, arg.Labels, arg.RunnerID)
123 var i ClaimQueuedWorkflowJobRow
124 err := row.Scan(
125 &i.ID,
126 &i.RunID,
127 &i.JobIndex,
128 &i.JobKey,
129 &i.JobName,
130 &i.RunsOn,
131 &i.RunnerID,
132 &i.NeedsJobs,
133 &i.IfExpr,
134 &i.TimeoutMinutes,
135 &i.Permissions,
136 &i.JobEnv,
137 &i.Status,
138 &i.Conclusion,
139 &i.CancelRequested,
140 &i.StartedAt,
141 &i.CompletedAt,
142 &i.Version,
143 &i.CreatedAt,
144 &i.UpdatedAt,
145 &i.RepoID,
146 &i.RunIndex,
147 &i.WorkflowFile,
148 &i.WorkflowName,
149 &i.HeadSha,
150 &i.HeadRef,
151 &i.Event,
152 &i.EventPayload,
153 &i.RepoOwner,
154 &i.RepoName,
155 )
156 return i, err
157 }
158
// countRunningJobsForRunner counts the workflow_jobs rows whose runner_id
// equals $1 and whose status is 'running'; the count is cast to integer.
const countRunningJobsForRunner = `-- name: CountRunningJobsForRunner :one
SELECT COUNT(*)::integer
FROM workflow_jobs
WHERE runner_id = $1::bigint AND status = 'running'
`
164
165 func (q *Queries) CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error) {
166 row := db.QueryRow(ctx, countRunningJobsForRunner, runnerID)
167 var column_1 int32
168 err := row.Scan(&column_1)
169 return column_1, err
170 }
171
// getWorkflowJobByID selects the listed workflow_jobs columns for the row
// whose id equals $1.
const getWorkflowJobByID = `-- name: GetWorkflowJobByID :one
SELECT id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
FROM workflow_jobs
WHERE id = $1
`
180
181 func (q *Queries) GetWorkflowJobByID(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
182 row := db.QueryRow(ctx, getWorkflowJobByID, id)
183 var i WorkflowJob
184 err := row.Scan(
185 &i.ID,
186 &i.RunID,
187 &i.JobIndex,
188 &i.JobKey,
189 &i.JobName,
190 &i.RunsOn,
191 &i.RunnerID,
192 &i.NeedsJobs,
193 &i.IfExpr,
194 &i.TimeoutMinutes,
195 &i.Permissions,
196 &i.JobEnv,
197 &i.Status,
198 &i.Conclusion,
199 &i.CancelRequested,
200 &i.StartedAt,
201 &i.CompletedAt,
202 &i.Version,
203 &i.CreatedAt,
204 &i.UpdatedAt,
205 )
206 return i, err
207 }
208
// insertWorkflowJob inserts one workflow_jobs row from parameters $1..$10
// (run_id, job_index, job_key, job_name, runs_on, needs_jobs, if_expr,
// timeout_minutes, permissions, job_env) and returns the stored row. Columns
// not named in the INSERT list are not set by this statement.
const insertWorkflowJob = `-- name: InsertWorkflowJob :one

INSERT INTO workflow_jobs (
run_id, job_index, job_key, job_name,
runs_on, needs_jobs, if_expr, timeout_minutes,
permissions, job_env
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
223
// InsertWorkflowJobParams carries the column values for InsertWorkflowJob.
// Fields are bound to $1..$10 in declaration order.
type InsertWorkflowJobParams struct {
	RunID          int64    // run_id ($1)
	JobIndex       int32    // job_index ($2)
	JobKey         string   // job_key ($3)
	JobName        string   // job_name ($4)
	RunsOn         string   // runs_on ($5)
	NeedsJobs      []string // needs_jobs ($6)
	IfExpr         string   // if_expr ($7)
	TimeoutMinutes int32    // timeout_minutes ($8)
	Permissions    []byte   // permissions ($9)
	JobEnv         []byte   // job_env ($10)
}
236
237 // SPDX-License-Identifier: AGPL-3.0-or-later
238 func (q *Queries) InsertWorkflowJob(ctx context.Context, db DBTX, arg InsertWorkflowJobParams) (WorkflowJob, error) {
239 row := db.QueryRow(ctx, insertWorkflowJob,
240 arg.RunID,
241 arg.JobIndex,
242 arg.JobKey,
243 arg.JobName,
244 arg.RunsOn,
245 arg.NeedsJobs,
246 arg.IfExpr,
247 arg.TimeoutMinutes,
248 arg.Permissions,
249 arg.JobEnv,
250 )
251 var i WorkflowJob
252 err := row.Scan(
253 &i.ID,
254 &i.RunID,
255 &i.JobIndex,
256 &i.JobKey,
257 &i.JobName,
258 &i.RunsOn,
259 &i.RunnerID,
260 &i.NeedsJobs,
261 &i.IfExpr,
262 &i.TimeoutMinutes,
263 &i.Permissions,
264 &i.JobEnv,
265 &i.Status,
266 &i.Conclusion,
267 &i.CancelRequested,
268 &i.StartedAt,
269 &i.CompletedAt,
270 &i.Version,
271 &i.CreatedAt,
272 &i.UpdatedAt,
273 )
274 return i, err
275 }
276
// listJobsForRun selects a subset of workflow_jobs columns for every job whose
// run_id equals $1, ordered by job_index ascending.
const listJobsForRun = `-- name: ListJobsForRun :many
SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
conclusion, cancel_requested, needs_jobs, started_at, completed_at, created_at, updated_at
FROM workflow_jobs
WHERE run_id = $1
ORDER BY job_index ASC
`
284
// ListJobsForRunRow is one row of the listJobsForRun query. It holds only the
// columns that query selects; field order matches the SELECT list.
type ListJobsForRunRow struct {
	ID              int64
	RunID           int64
	JobIndex        int32
	JobKey          string
	JobName         string
	RunsOn          string
	Status          WorkflowJobStatus
	Conclusion      NullCheckConclusion
	CancelRequested bool
	NeedsJobs       []string
	StartedAt       pgtype.Timestamptz
	CompletedAt     pgtype.Timestamptz
	CreatedAt       pgtype.Timestamptz
	UpdatedAt       pgtype.Timestamptz
}
301
302 func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error) {
303 rows, err := db.Query(ctx, listJobsForRun, runID)
304 if err != nil {
305 return nil, err
306 }
307 defer rows.Close()
308 items := []ListJobsForRunRow{}
309 for rows.Next() {
310 var i ListJobsForRunRow
311 if err := rows.Scan(
312 &i.ID,
313 &i.RunID,
314 &i.JobIndex,
315 &i.JobKey,
316 &i.JobName,
317 &i.RunsOn,
318 &i.Status,
319 &i.Conclusion,
320 &i.CancelRequested,
321 &i.NeedsJobs,
322 &i.StartedAt,
323 &i.CompletedAt,
324 &i.CreatedAt,
325 &i.UpdatedAt,
326 ); err != nil {
327 return nil, err
328 }
329 items = append(items, i)
330 }
331 if err := rows.Err(); err != nil {
332 return nil, err
333 }
334 return items, nil
335 }
336
// requestWorkflowJobCancel flags the workflow_jobs row with id $1 for
// cancellation. It only matches rows whose status is 'queued' or 'running',
// and skips a 'running' row that already has cancel_requested set.
//
// For a matching row it sets cancel_requested, increments version and
// refreshes updated_at. When the row's status is 'queued', status and
// conclusion additionally become 'cancelled' and started_at / completed_at
// are filled with now() if they were NULL; otherwise those four columns keep
// their values. The updated row is returned.
const requestWorkflowJobCancel = `-- name: RequestWorkflowJobCancel :one
UPDATE workflow_jobs
SET cancel_requested = true,
status = CASE
WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
ELSE status
END,
conclusion = CASE
WHEN status = 'queued' THEN 'cancelled'::check_conclusion
ELSE conclusion
END,
started_at = CASE
WHEN status = 'queued' THEN COALESCE(started_at, now())
ELSE started_at
END,
completed_at = CASE
WHEN status = 'queued' THEN COALESCE(completed_at, now())
ELSE completed_at
END,
version = version + 1,
updated_at = now()
WHERE id = $1
AND status IN ('queued', 'running')
AND (status = 'queued' OR cancel_requested = false)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
366
367 func (q *Queries) RequestWorkflowJobCancel(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
368 row := db.QueryRow(ctx, requestWorkflowJobCancel, id)
369 var i WorkflowJob
370 err := row.Scan(
371 &i.ID,
372 &i.RunID,
373 &i.JobIndex,
374 &i.JobKey,
375 &i.JobName,
376 &i.RunsOn,
377 &i.RunnerID,
378 &i.NeedsJobs,
379 &i.IfExpr,
380 &i.TimeoutMinutes,
381 &i.Permissions,
382 &i.JobEnv,
383 &i.Status,
384 &i.Conclusion,
385 &i.CancelRequested,
386 &i.StartedAt,
387 &i.CompletedAt,
388 &i.Version,
389 &i.CreatedAt,
390 &i.UpdatedAt,
391 )
392 return i, err
393 }
394
// requestWorkflowRunCancel flags for cancellation every workflow_jobs row with
// run_id $1 whose status is 'queued' or 'running', skipping 'running' rows
// that already have cancel_requested set.
//
// Each matching row gets cancel_requested set, version incremented and
// updated_at refreshed. Rows whose status is 'queued' additionally get status
// and conclusion 'cancelled' and have started_at / completed_at filled with
// now() if they were NULL; other rows keep those four columns. All updated
// rows are returned.
const requestWorkflowRunCancel = `-- name: RequestWorkflowRunCancel :many
UPDATE workflow_jobs
SET cancel_requested = true,
status = CASE
WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
ELSE status
END,
conclusion = CASE
WHEN status = 'queued' THEN 'cancelled'::check_conclusion
ELSE conclusion
END,
started_at = CASE
WHEN status = 'queued' THEN COALESCE(started_at, now())
ELSE started_at
END,
completed_at = CASE
WHEN status = 'queued' THEN COALESCE(completed_at, now())
ELSE completed_at
END,
version = version + 1,
updated_at = now()
WHERE run_id = $1
AND status IN ('queued', 'running')
AND (status = 'queued' OR cancel_requested = false)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
424
425 func (q *Queries) RequestWorkflowRunCancel(ctx context.Context, db DBTX, runID int64) ([]WorkflowJob, error) {
426 rows, err := db.Query(ctx, requestWorkflowRunCancel, runID)
427 if err != nil {
428 return nil, err
429 }
430 defer rows.Close()
431 items := []WorkflowJob{}
432 for rows.Next() {
433 var i WorkflowJob
434 if err := rows.Scan(
435 &i.ID,
436 &i.RunID,
437 &i.JobIndex,
438 &i.JobKey,
439 &i.JobName,
440 &i.RunsOn,
441 &i.RunnerID,
442 &i.NeedsJobs,
443 &i.IfExpr,
444 &i.TimeoutMinutes,
445 &i.Permissions,
446 &i.JobEnv,
447 &i.Status,
448 &i.Conclusion,
449 &i.CancelRequested,
450 &i.StartedAt,
451 &i.CompletedAt,
452 &i.Version,
453 &i.CreatedAt,
454 &i.UpdatedAt,
455 ); err != nil {
456 return nil, err
457 }
458 items = append(items, i)
459 }
460 if err := rows.Err(); err != nil {
461 return nil, err
462 }
463 return items, nil
464 }
465
// updateWorkflowJobStatus overwrites status ($2), conclusion ($3), started_at
// ($4) and completed_at ($5) on the workflow_jobs row with id $1, increments
// version, refreshes updated_at and returns the updated row.
//
// NOTE(review): the WHERE clause filters on id only; version is incremented
// but not compared here, and the four value columns are replaced with
// whatever is passed — confirm callers supply the values they want to keep.
const updateWorkflowJobStatus = `-- name: UpdateWorkflowJobStatus :one
UPDATE workflow_jobs
SET status = $2,
conclusion = $3::check_conclusion,
started_at = $4::timestamptz,
completed_at = $5::timestamptz,
version = version + 1,
updated_at = now()
WHERE id = $1
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
480
// UpdateWorkflowJobStatusParams carries the arguments of
// UpdateWorkflowJobStatus. Fields are bound to $1..$5 in declaration order.
type UpdateWorkflowJobStatusParams struct {
	ID          int64               // id of the row to update ($1)
	Status      WorkflowJobStatus   // new status ($2)
	Conclusion  NullCheckConclusion // new conclusion ($3)
	StartedAt   pgtype.Timestamptz  // new started_at ($4)
	CompletedAt pgtype.Timestamptz  // new completed_at ($5)
}
488
489 func (q *Queries) UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error) {
490 row := db.QueryRow(ctx, updateWorkflowJobStatus,
491 arg.ID,
492 arg.Status,
493 arg.Conclusion,
494 arg.StartedAt,
495 arg.CompletedAt,
496 )
497 var i WorkflowJob
498 err := row.Scan(
499 &i.ID,
500 &i.RunID,
501 &i.JobIndex,
502 &i.JobKey,
503 &i.JobName,
504 &i.RunsOn,
505 &i.RunnerID,
506 &i.NeedsJobs,
507 &i.IfExpr,
508 &i.TimeoutMinutes,
509 &i.Permissions,
510 &i.JobEnv,
511 &i.Status,
512 &i.Conclusion,
513 &i.CancelRequested,
514 &i.StartedAt,
515 &i.CompletedAt,
516 &i.Version,
517 &i.CreatedAt,
518 &i.UpdatedAt,
519 )
520 return i, err
521 }
522