@@ -20,6 +20,7 @@ import ( |
| 20 | 20 | "github.com/jackc/pgx/v5" |
| 21 | 21 | "github.com/jackc/pgx/v5/pgtype" |
| 22 | 22 | |
| 23 | + actionsevents "github.com/tenseleyFlow/shithub/internal/actions/events" |
| 23 | 24 | "github.com/tenseleyFlow/shithub/internal/actions/finalize" |
| 24 | 25 | actionslifecycle "github.com/tenseleyFlow/shithub/internal/actions/lifecycle" |
| 25 | 26 | "github.com/tenseleyFlow/shithub/internal/actions/logstream" |
@@ -220,7 +221,20 @@ func (h *Handlers) claimRunnerJob( |
| 220 | 221 | committed = true |
| 221 | 222 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, nil |
| 222 | 223 | } |
| 223 | | - if err := q.MarkWorkflowRunRunning(ctx, tx, job.RunID); err != nil { |
| 224 | + run, err := q.StartWorkflowRun(ctx, tx, job.RunID) |
| 225 | + if err == nil { |
| 226 | + if err := actionsevents.EmitRunTx(ctx, tx, run, actionsevents.ActionRunning); err != nil { |
| 227 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 228 | + } |
| 229 | + } else if errors.Is(err, pgx.ErrNoRows) { |
| 230 | + run, err = q.GetWorkflowRunByID(ctx, tx, job.RunID) |
| 231 | + if err != nil { |
| 232 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 233 | + } |
| 234 | + } else { |
| 235 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 236 | + } |
| 237 | + if err := actionsevents.EmitJobTx(ctx, tx, run, claimRowWorkflowJob(job), actionsevents.ActionRunning); err != nil { |
| 224 | 238 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 225 | 239 | } |
| 226 | 240 | steps, err := q.ListRunnerStepsForJob(ctx, tx, job.ID) |
@@ -733,15 +747,45 @@ func (h *Handlers) applyJobStatus( |
| 733 | 747 | return actionsdb.WorkflowJob{}, false, "", err |
| 734 | 748 | } |
| 735 | 749 | runConclusion, complete := deriveWorkflowRunConclusion(jobs) |
| 750 | + runAfter, err := q.GetWorkflowRunByID(ctx, tx, updated.RunID) |
| 751 | + if err != nil { |
| 752 | + return actionsdb.WorkflowJob{}, false, "", err |
| 753 | + } |
| 754 | + runBefore := runAfter |
| 755 | + runStarted := false |
| 756 | + runTerminalChanged := false |
| 736 | 757 | if complete { |
| 737 | | - if _, err := q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{ |
| 758 | + runAfter, err = q.CompleteWorkflowRun(ctx, tx, actionsdb.CompleteWorkflowRunParams{ |
| 738 | 759 | ID: updated.RunID, |
| 739 | 760 | Conclusion: runConclusion, |
| 740 | | - }); err != nil { |
| 761 | + }) |
| 762 | + if err != nil { |
| 763 | + return actionsdb.WorkflowJob{}, false, "", err |
| 764 | + } |
| 765 | + runTerminalChanged = workflowRunLifecycleChanged(runBefore, runAfter) |
| 766 | + } else { |
| 767 | + startedRun, err := q.StartWorkflowRun(ctx, tx, updated.RunID) |
| 768 | + if err == nil { |
| 769 | + runAfter = startedRun |
| 770 | + runStarted = true |
| 771 | + } else if !errors.Is(err, pgx.ErrNoRows) { |
| 772 | + return actionsdb.WorkflowJob{}, false, "", err |
| 773 | + } |
| 774 | + } |
| 775 | + if jobLifecycleChanged(job, updated) { |
| 776 | + if err := actionsevents.EmitJobTx(ctx, tx, runAfter, updated, workflowJobEventAction(updated.Status)); err != nil { |
| 777 | + return actionsdb.WorkflowJob{}, false, "", err |
| 778 | + } |
| 779 | + } |
| 780 | + if runStarted { |
| 781 | + if err := actionsevents.EmitRunTx(ctx, tx, runAfter, actionsevents.ActionRunning); err != nil { |
| 782 | + return actionsdb.WorkflowJob{}, false, "", err |
| 783 | + } |
| 784 | + } |
| 785 | + if complete && runTerminalChanged { |
| 786 | + if err := actionsevents.EmitRunTx(ctx, tx, runAfter, workflowRunEventAction(runAfter.Status)); err != nil { |
| 741 | 787 | return actionsdb.WorkflowJob{}, false, "", err |
| 742 | 788 | } |
| 743 | | - } else if err := q.MarkWorkflowRunRunning(ctx, tx, updated.RunID); err != nil { |
| 744 | | - return actionsdb.WorkflowJob{}, false, "", err |
| 745 | 789 | } |
| 746 | 790 | if err := tx.Commit(ctx); err != nil { |
| 747 | 791 | return actionsdb.WorkflowJob{}, false, "", err |
@@ -755,6 +799,71 @@ func (h *Handlers) applyJobStatus( |
| 755 | 799 | return updated, complete, runConclusion, nil |
| 756 | 800 | } |
| 757 | 801 | |
| 802 | +func claimRowWorkflowJob(row actionsdb.ClaimQueuedWorkflowJobRow) actionsdb.WorkflowJob { |
| 803 | + return actionsdb.WorkflowJob{ |
| 804 | + ID: row.ID, |
| 805 | + RunID: row.RunID, |
| 806 | + JobIndex: row.JobIndex, |
| 807 | + JobKey: row.JobKey, |
| 808 | + JobName: row.JobName, |
| 809 | + RunsOn: row.RunsOn, |
| 810 | + RunnerID: row.RunnerID, |
| 811 | + NeedsJobs: row.NeedsJobs, |
| 812 | + IfExpr: row.IfExpr, |
| 813 | + TimeoutMinutes: row.TimeoutMinutes, |
| 814 | + Permissions: row.Permissions, |
| 815 | + JobEnv: row.JobEnv, |
| 816 | + Status: row.Status, |
| 817 | + Conclusion: row.Conclusion, |
| 818 | + CancelRequested: row.CancelRequested, |
| 819 | + StartedAt: row.StartedAt, |
| 820 | + CompletedAt: row.CompletedAt, |
| 821 | + Version: row.Version, |
| 822 | + CreatedAt: row.CreatedAt, |
| 823 | + UpdatedAt: row.UpdatedAt, |
| 824 | + } |
| 825 | +} |
| 826 | + |
| 827 | +func jobLifecycleChanged(before, after actionsdb.WorkflowJob) bool { |
| 828 | + if before.Status != after.Status { |
| 829 | + return true |
| 830 | + } |
| 831 | + if before.Conclusion.Valid != after.Conclusion.Valid { |
| 832 | + return true |
| 833 | + } |
| 834 | + return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion |
| 835 | +} |
| 836 | + |
| 837 | +func workflowRunLifecycleChanged(before, after actionsdb.WorkflowRun) bool { |
| 838 | + if before.Status != after.Status { |
| 839 | + return true |
| 840 | + } |
| 841 | + if before.Conclusion.Valid != after.Conclusion.Valid { |
| 842 | + return true |
| 843 | + } |
| 844 | + return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion |
| 845 | +} |
| 846 | + |
| 847 | +func workflowJobEventAction(status actionsdb.WorkflowJobStatus) string { |
| 848 | + switch status { |
| 849 | + case actionsdb.WorkflowJobStatusCancelled: |
| 850 | + return actionsevents.ActionCancelled |
| 851 | + case actionsdb.WorkflowJobStatusCompleted, actionsdb.WorkflowJobStatusSkipped: |
| 852 | + return actionsevents.ActionCompleted |
| 853 | + case actionsdb.WorkflowJobStatusRunning: |
| 854 | + return actionsevents.ActionRunning |
| 855 | + default: |
| 856 | + return actionsevents.ActionQueued |
| 857 | + } |
| 858 | +} |
| 859 | + |
| 860 | +func workflowRunEventAction(status actionsdb.WorkflowRunStatus) string { |
| 861 | + if status == actionsdb.WorkflowRunStatusCancelled { |
| 862 | + return actionsevents.ActionCancelled |
| 863 | + } |
| 864 | + return actionsevents.ActionCompleted |
| 865 | +} |
| 866 | + |
| 758 | 867 | func deriveWorkflowRunConclusion(jobs []actionsdb.ListJobsForRunRow) (actionsdb.CheckConclusion, bool) { |
| 759 | 868 | if len(jobs) == 0 { |
| 760 | 869 | return actionsdb.CheckConclusionFailure, true |