Go · 13151 bytes Raw Blame History
1 // Code generated by sqlc. DO NOT EDIT.
2 // versions:
3 // sqlc v1.31.1
4 // source: workflow_jobs.sql
5
6 package actionsdb
7
8 import (
9 "context"
10
11 "github.com/jackc/pgx/v5/pgtype"
12 )
13
// claimQueuedWorkflowJob atomically picks one runnable queued job and assigns
// it to a runner.
//
// The "candidate" CTE selects the oldest job (created_at, then id) that:
//   - is 'queued', has no runner and no pending cancel request,
//   - belongs to a run that is 'queued' or 'running',
//   - has an empty runs_on or one contained in the label array $1,
//   - has no entry in needs_jobs (matched by job_key within the same run)
//     that is anything other than completed with conclusion 'success',
//   - is not held back by an earlier run (ordered by created_at, id) in the
//     same repo and non-empty concurrency group that still has queued or
//     running jobs without a cancel request.
//
// The row is locked with FOR UPDATE ... SKIP LOCKED so concurrent claimers
// skip rows another transaction is already claiming instead of blocking.
//
// The "claimed" CTE marks the candidate as 'running' for runner $2, sets
// started_at if it was unset, and bumps version/updated_at. The final SELECT
// returns the updated job joined with selected columns of its run.
//
// The dependency check uses IS DISTINCT FROM for the conclusion comparison:
// with a plain <>, a completed dependency whose conclusion is NULL makes the
// predicate evaluate to NULL, the row drops out of the NOT EXISTS subquery,
// and the dependent job would be claimable as if the dependency had
// succeeded.
//
// NOTE: this file is generated by sqlc from workflow_jobs.sql; the same
// change has to be made in that source file or regeneration will revert it.
const claimQueuedWorkflowJob = `-- name: ClaimQueuedWorkflowJob :one
WITH candidate AS (
SELECT j.id
FROM workflow_jobs j
JOIN workflow_runs r ON r.id = j.run_id
WHERE j.status = 'queued'
AND r.status IN ('queued', 'running')
AND j.cancel_requested = false
AND j.runner_id IS NULL
AND (j.runs_on = '' OR j.runs_on = ANY($1::text[]))
AND NOT EXISTS (
SELECT 1
FROM workflow_jobs dep
WHERE dep.run_id = j.run_id
AND dep.job_key = ANY(j.needs_jobs)
AND (dep.status <> 'completed' OR dep.conclusion IS DISTINCT FROM 'success')
)
AND NOT EXISTS (
SELECT 1
FROM workflow_runs blocker
WHERE r.concurrency_group <> ''
AND blocker.repo_id = r.repo_id
AND blocker.concurrency_group = r.concurrency_group
AND blocker.id <> r.id
AND blocker.status IN ('queued', 'running')
AND (blocker.created_at, blocker.id) < (r.created_at, r.id)
AND EXISTS (
SELECT 1
FROM workflow_jobs blocker_job
WHERE blocker_job.run_id = blocker.id
AND blocker_job.status IN ('queued', 'running')
AND blocker_job.cancel_requested = false
)
)
ORDER BY j.created_at ASC, j.id ASC
FOR UPDATE OF j SKIP LOCKED
LIMIT 1
),
claimed AS (
UPDATE workflow_jobs j
SET runner_id = $2::bigint,
status = 'running',
started_at = COALESCE(j.started_at, now()),
version = j.version + 1,
updated_at = now()
FROM candidate c
WHERE j.id = c.id
RETURNING j.id, j.run_id, j.job_index, j.job_key, j.job_name, j.runs_on,
j.runner_id, j.needs_jobs, j.if_expr, j.timeout_minutes,
j.permissions, j.job_env, j.status, j.conclusion,
j.cancel_requested, j.started_at, j.completed_at, j.version,
j.created_at, j.updated_at
)
SELECT c.id, c.run_id, c.job_index, c.job_key, c.job_name, c.runs_on,
c.runner_id, c.needs_jobs, c.if_expr, c.timeout_minutes,
c.permissions, c.job_env, c.status, c.conclusion,
c.cancel_requested, c.started_at, c.completed_at, c.version,
c.created_at, c.updated_at,
r.repo_id, r.run_index, r.workflow_file, r.workflow_name,
r.head_sha, r.head_ref, r.event, r.event_payload
FROM claimed c
JOIN workflow_runs r ON r.id = c.run_id
`
77
// ClaimQueuedWorkflowJobParams carries the bind parameters for
// ClaimQueuedWorkflowJob.
type ClaimQueuedWorkflowJobParams struct {
	// Labels is bound to $1 (cast to text[]); a job is eligible when its
	// runs_on is empty or equal to one of these values.
	Labels []string
	// RunnerID is bound to $2 (cast to bigint) and written to the claimed
	// job's runner_id.
	RunnerID int64
}
82
83 type ClaimQueuedWorkflowJobRow struct {
84 ID int64
85 RunID int64
86 JobIndex int32
87 JobKey string
88 JobName string
89 RunsOn string
90 RunnerID pgtype.Int8
91 NeedsJobs []string
92 IfExpr string
93 TimeoutMinutes int32
94 Permissions []byte
95 JobEnv []byte
96 Status WorkflowJobStatus
97 Conclusion NullCheckConclusion
98 CancelRequested bool
99 StartedAt pgtype.Timestamptz
100 CompletedAt pgtype.Timestamptz
101 Version int32
102 CreatedAt pgtype.Timestamptz
103 UpdatedAt pgtype.Timestamptz
104 RepoID int64
105 RunIndex int64
106 WorkflowFile string
107 WorkflowName string
108 HeadSha string
109 HeadRef string
110 Event WorkflowRunEvent
111 EventPayload []byte
112 }
113
114 func (q *Queries) ClaimQueuedWorkflowJob(ctx context.Context, db DBTX, arg ClaimQueuedWorkflowJobParams) (ClaimQueuedWorkflowJobRow, error) {
115 row := db.QueryRow(ctx, claimQueuedWorkflowJob, arg.Labels, arg.RunnerID)
116 var i ClaimQueuedWorkflowJobRow
117 err := row.Scan(
118 &i.ID,
119 &i.RunID,
120 &i.JobIndex,
121 &i.JobKey,
122 &i.JobName,
123 &i.RunsOn,
124 &i.RunnerID,
125 &i.NeedsJobs,
126 &i.IfExpr,
127 &i.TimeoutMinutes,
128 &i.Permissions,
129 &i.JobEnv,
130 &i.Status,
131 &i.Conclusion,
132 &i.CancelRequested,
133 &i.StartedAt,
134 &i.CompletedAt,
135 &i.Version,
136 &i.CreatedAt,
137 &i.UpdatedAt,
138 &i.RepoID,
139 &i.RunIndex,
140 &i.WorkflowFile,
141 &i.WorkflowName,
142 &i.HeadSha,
143 &i.HeadRef,
144 &i.Event,
145 &i.EventPayload,
146 )
147 return i, err
148 }
149
// countRunningJobsForRunner counts the workflow_jobs rows whose runner_id
// equals $1 and whose status is 'running'. The count is cast to integer so it
// scans into an int32.
const countRunningJobsForRunner = `-- name: CountRunningJobsForRunner :one
SELECT COUNT(*)::integer
FROM workflow_jobs
WHERE runner_id = $1::bigint AND status = 'running'
`
155
156 func (q *Queries) CountRunningJobsForRunner(ctx context.Context, db DBTX, runnerID int64) (int32, error) {
157 row := db.QueryRow(ctx, countRunningJobsForRunner, runnerID)
158 var column_1 int32
159 err := row.Scan(&column_1)
160 return column_1, err
161 }
162
// getWorkflowJobByID selects the listed workflow_jobs columns of the row
// whose id equals $1. The column order matches the Scan order used by
// GetWorkflowJobByID.
const getWorkflowJobByID = `-- name: GetWorkflowJobByID :one
SELECT id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
FROM workflow_jobs
WHERE id = $1
`
171
172 func (q *Queries) GetWorkflowJobByID(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
173 row := db.QueryRow(ctx, getWorkflowJobByID, id)
174 var i WorkflowJob
175 err := row.Scan(
176 &i.ID,
177 &i.RunID,
178 &i.JobIndex,
179 &i.JobKey,
180 &i.JobName,
181 &i.RunsOn,
182 &i.RunnerID,
183 &i.NeedsJobs,
184 &i.IfExpr,
185 &i.TimeoutMinutes,
186 &i.Permissions,
187 &i.JobEnv,
188 &i.Status,
189 &i.Conclusion,
190 &i.CancelRequested,
191 &i.StartedAt,
192 &i.CompletedAt,
193 &i.Version,
194 &i.CreatedAt,
195 &i.UpdatedAt,
196 )
197 return i, err
198 }
199
// insertWorkflowJob inserts one workflow_jobs row from ten bound parameters
// ($1..$10, in the order of the column list) and returns the stored row.
// Columns not named in the INSERT (runner_id, status, conclusion,
// cancel_requested, timestamps, version, ...) are not set by this statement;
// presumably they come from column defaults — verify against the schema.
const insertWorkflowJob = `-- name: InsertWorkflowJob :one

INSERT INTO workflow_jobs (
run_id, job_index, job_key, job_name,
runs_on, needs_jobs, if_expr, timeout_minutes,
permissions, job_env
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
214
// InsertWorkflowJobParams carries the bind parameters for InsertWorkflowJob.
// Field order matches the positional placeholders of the INSERT statement.
type InsertWorkflowJobParams struct {
	RunID                   int64    // $1 -> run_id
	JobIndex                int32    // $2 -> job_index
	JobKey, JobName, RunsOn string   // $3, $4, $5 -> job_key, job_name, runs_on
	NeedsJobs               []string // $6 -> needs_jobs
	IfExpr                  string   // $7 -> if_expr
	TimeoutMinutes          int32    // $8 -> timeout_minutes
	Permissions, JobEnv     []byte   // $9, $10 -> permissions, job_env
}
227
228 // SPDX-License-Identifier: AGPL-3.0-or-later
229 func (q *Queries) InsertWorkflowJob(ctx context.Context, db DBTX, arg InsertWorkflowJobParams) (WorkflowJob, error) {
230 row := db.QueryRow(ctx, insertWorkflowJob,
231 arg.RunID,
232 arg.JobIndex,
233 arg.JobKey,
234 arg.JobName,
235 arg.RunsOn,
236 arg.NeedsJobs,
237 arg.IfExpr,
238 arg.TimeoutMinutes,
239 arg.Permissions,
240 arg.JobEnv,
241 )
242 var i WorkflowJob
243 err := row.Scan(
244 &i.ID,
245 &i.RunID,
246 &i.JobIndex,
247 &i.JobKey,
248 &i.JobName,
249 &i.RunsOn,
250 &i.RunnerID,
251 &i.NeedsJobs,
252 &i.IfExpr,
253 &i.TimeoutMinutes,
254 &i.Permissions,
255 &i.JobEnv,
256 &i.Status,
257 &i.Conclusion,
258 &i.CancelRequested,
259 &i.StartedAt,
260 &i.CompletedAt,
261 &i.Version,
262 &i.CreatedAt,
263 &i.UpdatedAt,
264 )
265 return i, err
266 }
267
// listJobsForRun selects a subset of workflow_jobs columns for every job
// whose run_id equals $1, ordered by job_index ascending. Note that the column
// order differs from the full-row queries in this file (status, conclusion and
// cancel_requested precede needs_jobs); ListJobsForRun scans in this order.
const listJobsForRun = `-- name: ListJobsForRun :many
SELECT id, run_id, job_index, job_key, job_name, runs_on, status,
conclusion, cancel_requested, needs_jobs, started_at, completed_at, created_at, updated_at
FROM workflow_jobs
WHERE run_id = $1
ORDER BY job_index ASC
`
275
276 type ListJobsForRunRow struct {
277 ID int64
278 RunID int64
279 JobIndex int32
280 JobKey string
281 JobName string
282 RunsOn string
283 Status WorkflowJobStatus
284 Conclusion NullCheckConclusion
285 CancelRequested bool
286 NeedsJobs []string
287 StartedAt pgtype.Timestamptz
288 CompletedAt pgtype.Timestamptz
289 CreatedAt pgtype.Timestamptz
290 UpdatedAt pgtype.Timestamptz
291 }
292
293 func (q *Queries) ListJobsForRun(ctx context.Context, db DBTX, runID int64) ([]ListJobsForRunRow, error) {
294 rows, err := db.Query(ctx, listJobsForRun, runID)
295 if err != nil {
296 return nil, err
297 }
298 defer rows.Close()
299 items := []ListJobsForRunRow{}
300 for rows.Next() {
301 var i ListJobsForRunRow
302 if err := rows.Scan(
303 &i.ID,
304 &i.RunID,
305 &i.JobIndex,
306 &i.JobKey,
307 &i.JobName,
308 &i.RunsOn,
309 &i.Status,
310 &i.Conclusion,
311 &i.CancelRequested,
312 &i.NeedsJobs,
313 &i.StartedAt,
314 &i.CompletedAt,
315 &i.CreatedAt,
316 &i.UpdatedAt,
317 ); err != nil {
318 return nil, err
319 }
320 items = append(items, i)
321 }
322 if err := rows.Err(); err != nil {
323 return nil, err
324 }
325 return items, nil
326 }
327
// requestWorkflowJobCancel flags the job with id $1 for cancellation and
// returns the updated row.
//
// Behavior depends on the job's status before the update (every CASE branch
// in the SET list evaluates against the pre-update row):
//   - 'queued': the job goes straight to status 'cancelled' with conclusion
//     'cancelled'; started_at and completed_at are filled with now() when
//     they were unset.
//   - 'running': status, conclusion and timestamps are left as they are; only
//     cancel_requested is set.
//
// In both cases version is incremented and updated_at refreshed.
//
// The WHERE clause restricts the update to queued or running jobs and skips a
// running job that already has cancel_requested set, so repeating the request
// for such a job matches no row.
const requestWorkflowJobCancel = `-- name: RequestWorkflowJobCancel :one
UPDATE workflow_jobs
SET cancel_requested = true,
status = CASE
WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
ELSE status
END,
conclusion = CASE
WHEN status = 'queued' THEN 'cancelled'::check_conclusion
ELSE conclusion
END,
started_at = CASE
WHEN status = 'queued' THEN COALESCE(started_at, now())
ELSE started_at
END,
completed_at = CASE
WHEN status = 'queued' THEN COALESCE(completed_at, now())
ELSE completed_at
END,
version = version + 1,
updated_at = now()
WHERE id = $1
AND status IN ('queued', 'running')
AND (status = 'queued' OR cancel_requested = false)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
357
358 func (q *Queries) RequestWorkflowJobCancel(ctx context.Context, db DBTX, id int64) (WorkflowJob, error) {
359 row := db.QueryRow(ctx, requestWorkflowJobCancel, id)
360 var i WorkflowJob
361 err := row.Scan(
362 &i.ID,
363 &i.RunID,
364 &i.JobIndex,
365 &i.JobKey,
366 &i.JobName,
367 &i.RunsOn,
368 &i.RunnerID,
369 &i.NeedsJobs,
370 &i.IfExpr,
371 &i.TimeoutMinutes,
372 &i.Permissions,
373 &i.JobEnv,
374 &i.Status,
375 &i.Conclusion,
376 &i.CancelRequested,
377 &i.StartedAt,
378 &i.CompletedAt,
379 &i.Version,
380 &i.CreatedAt,
381 &i.UpdatedAt,
382 )
383 return i, err
384 }
385
// requestWorkflowRunCancel applies the same cancellation update as
// requestWorkflowJobCancel, but to every matching job of the run whose id is
// $1, and returns all updated rows.
//
// Per job, based on its status before the update:
//   - 'queued': status and conclusion become 'cancelled'; started_at and
//     completed_at are filled with now() when they were unset.
//   - 'running': only cancel_requested is set; status, conclusion and
//     timestamps are kept.
//
// version is incremented and updated_at refreshed on every updated row.
// Running jobs that already have cancel_requested set are excluded by the
// WHERE clause and therefore neither updated nor returned.
const requestWorkflowRunCancel = `-- name: RequestWorkflowRunCancel :many
UPDATE workflow_jobs
SET cancel_requested = true,
status = CASE
WHEN status = 'queued' THEN 'cancelled'::workflow_job_status
ELSE status
END,
conclusion = CASE
WHEN status = 'queued' THEN 'cancelled'::check_conclusion
ELSE conclusion
END,
started_at = CASE
WHEN status = 'queued' THEN COALESCE(started_at, now())
ELSE started_at
END,
completed_at = CASE
WHEN status = 'queued' THEN COALESCE(completed_at, now())
ELSE completed_at
END,
version = version + 1,
updated_at = now()
WHERE run_id = $1
AND status IN ('queued', 'running')
AND (status = 'queued' OR cancel_requested = false)
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
415
416 func (q *Queries) RequestWorkflowRunCancel(ctx context.Context, db DBTX, runID int64) ([]WorkflowJob, error) {
417 rows, err := db.Query(ctx, requestWorkflowRunCancel, runID)
418 if err != nil {
419 return nil, err
420 }
421 defer rows.Close()
422 items := []WorkflowJob{}
423 for rows.Next() {
424 var i WorkflowJob
425 if err := rows.Scan(
426 &i.ID,
427 &i.RunID,
428 &i.JobIndex,
429 &i.JobKey,
430 &i.JobName,
431 &i.RunsOn,
432 &i.RunnerID,
433 &i.NeedsJobs,
434 &i.IfExpr,
435 &i.TimeoutMinutes,
436 &i.Permissions,
437 &i.JobEnv,
438 &i.Status,
439 &i.Conclusion,
440 &i.CancelRequested,
441 &i.StartedAt,
442 &i.CompletedAt,
443 &i.Version,
444 &i.CreatedAt,
445 &i.UpdatedAt,
446 ); err != nil {
447 return nil, err
448 }
449 items = append(items, i)
450 }
451 if err := rows.Err(); err != nil {
452 return nil, err
453 }
454 return items, nil
455 }
456
// updateWorkflowJobStatus overwrites status ($2), conclusion ($3),
// started_at ($4) and completed_at ($5) of the job with id $1, increments
// version, refreshes updated_at, and returns the updated row.
//
// The four values are assigned unconditionally: there is no COALESCE, so a
// NULL parameter clears the corresponding column. version is incremented but
// not compared in the WHERE clause, so this statement performs no optimistic
// concurrency check.
const updateWorkflowJobStatus = `-- name: UpdateWorkflowJobStatus :one
UPDATE workflow_jobs
SET status = $2,
conclusion = $3::check_conclusion,
started_at = $4::timestamptz,
completed_at = $5::timestamptz,
version = version + 1,
updated_at = now()
WHERE id = $1
RETURNING id, run_id, job_index, job_key, job_name, runs_on,
runner_id, needs_jobs, if_expr, timeout_minutes, permissions,
job_env, status, conclusion, cancel_requested,
started_at, completed_at, version, created_at, updated_at
`
471
472 type UpdateWorkflowJobStatusParams struct {
473 ID int64
474 Status WorkflowJobStatus
475 Conclusion NullCheckConclusion
476 StartedAt pgtype.Timestamptz
477 CompletedAt pgtype.Timestamptz
478 }
479
480 func (q *Queries) UpdateWorkflowJobStatus(ctx context.Context, db DBTX, arg UpdateWorkflowJobStatusParams) (WorkflowJob, error) {
481 row := db.QueryRow(ctx, updateWorkflowJobStatus,
482 arg.ID,
483 arg.Status,
484 arg.Conclusion,
485 arg.StartedAt,
486 arg.CompletedAt,
487 )
488 var i WorkflowJob
489 err := row.Scan(
490 &i.ID,
491 &i.RunID,
492 &i.JobIndex,
493 &i.JobKey,
494 &i.JobName,
495 &i.RunsOn,
496 &i.RunnerID,
497 &i.NeedsJobs,
498 &i.IfExpr,
499 &i.TimeoutMinutes,
500 &i.Permissions,
501 &i.JobEnv,
502 &i.Status,
503 &i.Conclusion,
504 &i.CancelRequested,
505 &i.StartedAt,
506 &i.CompletedAt,
507 &i.Version,
508 &i.CreatedAt,
509 &i.UpdatedAt,
510 )
511 return i, err
512 }
513