tenseleyflow/shithub / 11badfd

Browse files

actions: publish observability metrics

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
11badfdfc74c2c0ddbd77ef04a454b4aa831b1be
Parents
17b19ff
Tree
e2b9705

9 changed files

StatusFile+-
M docs/internal/actions-runner-api.md 11 0
M internal/actions/lifecycle/cancel.go 15 0
A internal/actions/telemetry/metrics.go 59 0
A internal/actions/telemetry/metrics_test.go 89 0
A internal/infra/metrics/actionsobserver.go 136 0
A internal/infra/metrics/actionsobserver_test.go 167 0
M internal/infra/metrics/metrics.go 89 0
M internal/web/handlers/api/runners.go 28 2
M internal/web/server.go 1 0
docs/internal/actions-runner-api.mdmodified
@@ -221,8 +221,19 @@ runner posts terminal job status `cancelled`.
221
 - `shithub_actions_runner_registrations_total`
221
 - `shithub_actions_runner_registrations_total`
222
 - `shithub_actions_runner_heartbeats_total{result="claimed|no_job"}`
222
 - `shithub_actions_runner_heartbeats_total{result="claimed|no_job"}`
223
 - `shithub_actions_runner_jwt_total{result="issued|rejected|replay"}`
223
 - `shithub_actions_runner_jwt_total{result="issued|rejected|replay"}`
224
+- `shithub_actions_queue_depth{resource="runs|jobs"}`
225
+- `shithub_actions_active{resource="runs|jobs"}`
226
+- `shithub_actions_runner_heartbeat_age_seconds{runner,status}`
227
+- `shithub_actions_runner_capacity{runner,status}`
228
+- `shithub_actions_runs_completed_total{event,conclusion}`
229
+- `shithub_actions_run_duration_seconds{event,conclusion}`
230
+- `shithub_actions_steps_completed_total{step_type,conclusion}`
224
 - `shithub_actions_jobs_cancelled_total{reason="user|concurrency|timeout"}`
231
 - `shithub_actions_jobs_cancelled_total{reason="user|concurrency|timeout"}`
225
 - `shithub_actions_concurrency_queued_total`
232
 - `shithub_actions_concurrency_queued_total`
226
 - `shithub_actions_log_scrub_replacements_total{location="server"}`
233
 - `shithub_actions_log_scrub_replacements_total{location="server"}`
234
+- `shithub_actions_log_chunks_total{location="server"}`
235
+- `shithub_actions_log_chunk_bytes_total{location="server"}`
227
 - `shithub_actions_runs_pruned_total{kind="chunks|blobs|runs|jwt_used"}`
236
 - `shithub_actions_runs_pruned_total{kind="chunks|blobs|runs|jwt_used"}`
228
 - `shithub_actions_step_timeouts_total`
237
 - `shithub_actions_step_timeouts_total`
238
+- `shithub_actions_storage_objects{kind="artifacts|step_logs|hot_log_chunks"}`
239
+- `shithub_actions_storage_bytes{kind="artifacts|step_logs|hot_log_chunks"}`
internal/actions/lifecycle/cancel.gomodified
@@ -15,6 +15,7 @@ import (
15
 	actionsevents "github.com/tenseleyFlow/shithub/internal/actions/events"
15
 	actionsevents "github.com/tenseleyFlow/shithub/internal/actions/events"
16
 	"github.com/tenseleyFlow/shithub/internal/actions/runstate"
16
 	"github.com/tenseleyFlow/shithub/internal/actions/runstate"
17
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
17
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
18
+	actionstelemetry "github.com/tenseleyFlow/shithub/internal/actions/telemetry"
18
 	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
19
 	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
19
 )
20
 )
20
 
21
 
@@ -68,6 +69,7 @@ func CancelRun(ctx context.Context, deps Deps, runID int64, reason string) (Canc
68
 	var (
69
 	var (
69
 		runCompleted  bool
70
 		runCompleted  bool
70
 		runConclusion actionsdb.CheckConclusion
71
 		runConclusion actionsdb.CheckConclusion
72
+		terminalRun   actionsdb.WorkflowRun
71
 	)
73
 	)
72
 	if len(changed) > 0 {
74
 	if len(changed) > 0 {
73
 		runCompleted, runConclusion, err = runstate.RollupAfterCancel(ctx, q, tx, runID)
75
 		runCompleted, runConclusion, err = runstate.RollupAfterCancel(ctx, q, tx, runID)
@@ -78,6 +80,9 @@ func CancelRun(ctx context.Context, deps Deps, runID int64, reason string) (Canc
78
 		if err != nil {
80
 		if err != nil {
79
 			return CancelResult{}, err
81
 			return CancelResult{}, err
80
 		}
82
 		}
83
+		if runCompleted {
84
+			terminalRun = run
85
+		}
81
 		if err := emitCancelEvents(ctx, tx, run, changed, runCompleted); err != nil {
86
 		if err := emitCancelEvents(ctx, tx, run, changed, runCompleted); err != nil {
82
 			return CancelResult{}, err
87
 			return CancelResult{}, err
83
 		}
88
 		}
@@ -88,6 +93,9 @@ func CancelRun(ctx context.Context, deps Deps, runID int64, reason string) (Canc
88
 	committed = true
93
 	committed = true
89
 
94
 
90
 	recordCancelledJobs(changed, reason)
95
 	recordCancelledJobs(changed, reason)
96
+	if runCompleted {
97
+		actionstelemetry.RecordRunTerminal(terminalRun)
98
+	}
91
 	syncChangedJobChecks(ctx, deps, changed)
99
 	syncChangedJobChecks(ctx, deps, changed)
92
 	return CancelResult{
100
 	return CancelResult{
93
 		RunID:         runID,
101
 		RunID:         runID,
@@ -140,6 +148,7 @@ func CancelJob(ctx context.Context, deps Deps, jobID int64, reason string) (Canc
140
 	var (
148
 	var (
141
 		runCompleted  bool
149
 		runCompleted  bool
142
 		runConclusion actionsdb.CheckConclusion
150
 		runConclusion actionsdb.CheckConclusion
151
+		terminalRun   actionsdb.WorkflowRun
143
 	)
152
 	)
144
 	if len(changed) > 0 {
153
 	if len(changed) > 0 {
145
 		runCompleted, runConclusion, err = runstate.RollupAfterCancel(ctx, q, tx, runID)
154
 		runCompleted, runConclusion, err = runstate.RollupAfterCancel(ctx, q, tx, runID)
@@ -150,6 +159,9 @@ func CancelJob(ctx context.Context, deps Deps, jobID int64, reason string) (Canc
150
 		if err != nil {
159
 		if err != nil {
151
 			return CancelResult{}, err
160
 			return CancelResult{}, err
152
 		}
161
 		}
162
+		if runCompleted {
163
+			terminalRun = run
164
+		}
153
 		if err := emitCancelEvents(ctx, tx, run, changed, runCompleted); err != nil {
165
 		if err := emitCancelEvents(ctx, tx, run, changed, runCompleted); err != nil {
154
 			return CancelResult{}, err
166
 			return CancelResult{}, err
155
 		}
167
 		}
@@ -160,6 +172,9 @@ func CancelJob(ctx context.Context, deps Deps, jobID int64, reason string) (Canc
160
 	committed = true
172
 	committed = true
161
 
173
 
162
 	recordCancelledJobs(changed, reason)
174
 	recordCancelledJobs(changed, reason)
175
+	if runCompleted {
176
+		actionstelemetry.RecordRunTerminal(terminalRun)
177
+	}
163
 	syncChangedJobChecks(ctx, deps, changed)
178
 	syncChangedJobChecks(ctx, deps, changed)
164
 	return CancelResult{
179
 	return CancelResult{
165
 		RunID:         runID,
180
 		RunID:         runID,
internal/actions/telemetry/metrics.goadded
@@ -0,0 +1,59 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package telemetry records bounded-cardinality Actions metrics.
4
+package telemetry
5
+
6
+import (
7
+	"strings"
8
+
9
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
10
+	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
11
+)
12
+
13
+// RecordRunTerminal records terminal workflow-run counters and duration. It is
14
+// idempotency-sensitive: callers must invoke it only when a run first becomes
15
+// terminal.
16
+func RecordRunTerminal(run actionsdb.WorkflowRun) {
17
+	if !run.CompletedAt.Valid || !run.Conclusion.Valid {
18
+		return
19
+	}
20
+	start := run.CreatedAt.Time
21
+	if run.StartedAt.Valid {
22
+		start = run.StartedAt.Time
23
+	}
24
+	duration := run.CompletedAt.Time.Sub(start).Seconds()
25
+	if duration < 0 {
26
+		duration = 0
27
+	}
28
+	event := string(run.Event)
29
+	conclusion := string(run.Conclusion.CheckConclusion)
30
+	metrics.ActionsRunsCompletedTotal.WithLabelValues(event, conclusion).Inc()
31
+	metrics.ActionsRunDurationSeconds.WithLabelValues(event, conclusion).Observe(duration)
32
+}
33
+
34
+// RecordStepTerminal records terminal step outcomes using a bounded step_type
35
+// label. Do not label by user-authored step name; workflow YAML would then be
36
+// able to create unbounded Prometheus series.
37
+func RecordStepTerminal(step actionsdb.WorkflowStep) {
38
+	if !step.Conclusion.Valid {
39
+		return
40
+	}
41
+	metrics.ActionsStepsCompletedTotal.WithLabelValues(stepType(step), string(step.Conclusion.CheckConclusion)).Inc()
42
+}
43
+
44
+func stepType(step actionsdb.WorkflowStep) string {
45
+	uses := strings.TrimSpace(step.UsesAlias)
46
+	if uses != "" {
47
+		switch uses {
48
+		case "actions/checkout@v4":
49
+			return "checkout"
50
+		case "shithub/upload-artifact@v1":
51
+			return "upload-artifact"
52
+		case "shithub/download-artifact@v1":
53
+			return "download-artifact"
54
+		default:
55
+			return "uses"
56
+		}
57
+	}
58
+	return "run"
59
+}
internal/actions/telemetry/metrics_test.goadded
@@ -0,0 +1,89 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package telemetry
4
+
5
+import (
6
+	"testing"
7
+	"time"
8
+
9
+	"github.com/jackc/pgx/v5/pgtype"
10
+	"github.com/prometheus/client_golang/prometheus"
11
+	dto "github.com/prometheus/client_model/go"
12
+
13
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
14
+	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
15
+)
16
+
17
+func TestRecordRunTerminalBoundsLabelsAndDuration(t *testing.T) {
18
+	metrics.ActionsRunsCompletedTotal.Reset()
19
+	metrics.ActionsRunDurationSeconds.Reset()
20
+
21
+	started := time.Date(2026, 5, 12, 10, 0, 0, 0, time.UTC)
22
+	completed := started.Add(75 * time.Second)
23
+	RecordRunTerminal(actionsdb.WorkflowRun{
24
+		Event:       actionsdb.WorkflowRunEvent("pull_request"),
25
+		Status:      actionsdb.WorkflowRunStatusCompleted,
26
+		Conclusion:  actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
27
+		StartedAt:   pgtype.Timestamptz{Time: started, Valid: true},
28
+		CompletedAt: pgtype.Timestamptz{Time: completed, Valid: true},
29
+	})
30
+
31
+	var completedMetric dto.Metric
32
+	if err := metrics.ActionsRunsCompletedTotal.WithLabelValues("pull_request", "success").Write(&completedMetric); err != nil {
33
+		t.Fatalf("read completed counter: %v", err)
34
+	}
35
+	if got := completedMetric.GetCounter().GetValue(); got != 1 {
36
+		t.Fatalf("completed counter = %v, want 1", got)
37
+	}
38
+
39
+	histogram, ok := metrics.ActionsRunDurationSeconds.WithLabelValues("pull_request", "success").(prometheus.Histogram)
40
+	if !ok {
41
+		t.Fatalf("duration metric is not a prometheus.Histogram")
42
+	}
43
+	var durationMetric dto.Metric
44
+	if err := histogram.Write(&durationMetric); err != nil {
45
+		t.Fatalf("read duration histogram: %v", err)
46
+	}
47
+	if got := durationMetric.GetHistogram().GetSampleCount(); got != 1 {
48
+		t.Fatalf("duration sample count = %v, want 1", got)
49
+	}
50
+	if got := durationMetric.GetHistogram().GetSampleSum(); got != 75 {
51
+		t.Fatalf("duration sample sum = %v, want 75", got)
52
+	}
53
+}
54
+
55
+func TestRecordStepTerminalUsesBoundedStepTypes(t *testing.T) {
56
+	metrics.ActionsStepsCompletedTotal.Reset()
57
+
58
+	RecordStepTerminal(actionsdb.WorkflowStep{
59
+		UsesAlias:  "actions/checkout@v4",
60
+		Status:     actionsdb.WorkflowStepStatusCompleted,
61
+		Conclusion: actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
62
+	})
63
+	RecordStepTerminal(actionsdb.WorkflowStep{
64
+		StepName:   "user controlled label with high cardinality potential",
65
+		UsesAlias:  "owner/custom-action@v1",
66
+		Status:     actionsdb.WorkflowStepStatusCompleted,
67
+		Conclusion: actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionFailure, Valid: true},
68
+	})
69
+	RecordStepTerminal(actionsdb.WorkflowStep{
70
+		RunCommand: "go test ./...",
71
+		Status:     actionsdb.WorkflowStepStatusCompleted,
72
+		Conclusion: actionsdb.NullCheckConclusion{CheckConclusion: actionsdb.CheckConclusionSuccess, Valid: true},
73
+	})
74
+
75
+	assertStepCounter(t, "checkout", "success", 1)
76
+	assertStepCounter(t, "uses", "failure", 1)
77
+	assertStepCounter(t, "run", "success", 1)
78
+}
79
+
80
+func assertStepCounter(t *testing.T, stepType, conclusion string, want float64) {
81
+	t.Helper()
82
+	var metric dto.Metric
83
+	if err := metrics.ActionsStepsCompletedTotal.WithLabelValues(stepType, conclusion).Write(&metric); err != nil {
84
+		t.Fatalf("read step counter %s/%s: %v", stepType, conclusion, err)
85
+	}
86
+	if got := metric.GetCounter().GetValue(); got != want {
87
+		t.Fatalf("step counter %s/%s = %v, want %v", stepType, conclusion, got, want)
88
+	}
89
+}
internal/infra/metrics/actionsobserver.goadded
@@ -0,0 +1,136 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package metrics
4
+
5
+import (
6
+	"context"
7
+	"time"
8
+
9
+	"github.com/jackc/pgx/v5/pgxpool"
10
+)
11
+
12
+// ObserveActions starts a goroutine that periodically refreshes DB-backed
13
+// Actions gauges. The goroutine exits when ctx is canceled.
14
+func ObserveActions(ctx context.Context, pool *pgxpool.Pool, interval time.Duration) {
15
+	if pool == nil {
16
+		return
17
+	}
18
+	if interval <= 0 {
19
+		interval = 15 * time.Second
20
+	}
21
+	go func() {
22
+		refreshActions(ctx, pool)
23
+		t := time.NewTicker(interval)
24
+		defer t.Stop()
25
+		for {
26
+			select {
27
+			case <-ctx.Done():
28
+				return
29
+			case <-t.C:
30
+				refreshActions(ctx, pool)
31
+			}
32
+		}
33
+	}()
34
+}
35
+
36
+func refreshActions(ctx context.Context, pool *pgxpool.Pool) {
37
+	if pool == nil {
38
+		return
39
+	}
40
+	refreshActionQueueGauges(ctx, pool)
41
+	refreshActionRunnerGauges(ctx, pool)
42
+	refreshActionStorageGauges(ctx, pool)
43
+}
44
+
45
+func refreshActionQueueGauges(ctx context.Context, pool *pgxpool.Pool) {
46
+	ActionsQueueDepth.WithLabelValues("runs").Set(0)
47
+	ActionsQueueDepth.WithLabelValues("jobs").Set(0)
48
+	ActionsActive.WithLabelValues("runs").Set(0)
49
+	ActionsActive.WithLabelValues("jobs").Set(0)
50
+
51
+	rows, err := pool.Query(ctx, `
52
+SELECT 'runs'::text AS resource, status::text, count(*)::double precision
53
+FROM workflow_runs
54
+WHERE status IN ('queued', 'running')
55
+GROUP BY status
56
+UNION ALL
57
+SELECT 'jobs'::text AS resource, status::text, count(*)::double precision
58
+FROM workflow_jobs
59
+WHERE status IN ('queued', 'running')
60
+GROUP BY status`)
61
+	if err != nil {
62
+		return
63
+	}
64
+	defer rows.Close()
65
+	for rows.Next() {
66
+		var resource, status string
67
+		var count float64
68
+		if err := rows.Scan(&resource, &status, &count); err != nil {
69
+			return
70
+		}
71
+		switch status {
72
+		case "queued":
73
+			ActionsQueueDepth.WithLabelValues(resource).Set(count)
74
+		case "running":
75
+			ActionsActive.WithLabelValues(resource).Set(count)
76
+		}
77
+	}
78
+}
79
+
80
+func refreshActionRunnerGauges(ctx context.Context, pool *pgxpool.Pool) {
81
+	ActionsRunnerHeartbeatAgeSeconds.Reset()
82
+	ActionsRunnerCapacity.Reset()
83
+	rows, err := pool.Query(ctx, `
84
+SELECT name::text,
85
+       status::text,
86
+       capacity::double precision,
87
+       EXTRACT(EPOCH FROM (now() - last_heartbeat_at))::double precision AS heartbeat_age_seconds
88
+FROM workflow_runners
89
+WHERE last_heartbeat_at IS NOT NULL`)
90
+	if err != nil {
91
+		return
92
+	}
93
+	defer rows.Close()
94
+	for rows.Next() {
95
+		var name, status string
96
+		var capacity, age float64
97
+		if err := rows.Scan(&name, &status, &capacity, &age); err != nil {
98
+			return
99
+		}
100
+		ActionsRunnerCapacity.WithLabelValues(name, status).Set(capacity)
101
+		ActionsRunnerHeartbeatAgeSeconds.WithLabelValues(name, status).Set(age)
102
+	}
103
+}
104
+
105
+func refreshActionStorageGauges(ctx context.Context, pool *pgxpool.Pool) {
106
+	ActionsStorageObjects.WithLabelValues("artifacts").Set(0)
107
+	ActionsStorageObjects.WithLabelValues("step_logs").Set(0)
108
+	ActionsStorageObjects.WithLabelValues("hot_log_chunks").Set(0)
109
+	ActionsStorageBytes.WithLabelValues("artifacts").Set(0)
110
+	ActionsStorageBytes.WithLabelValues("step_logs").Set(0)
111
+	ActionsStorageBytes.WithLabelValues("hot_log_chunks").Set(0)
112
+
113
+	rows, err := pool.Query(ctx, `
114
+SELECT 'artifacts'::text AS kind, count(*)::double precision, COALESCE(sum(byte_count), 0)::double precision
115
+FROM workflow_artifacts
116
+UNION ALL
117
+SELECT 'step_logs'::text AS kind, count(*)::double precision, COALESCE(sum(log_byte_count), 0)::double precision
118
+FROM workflow_steps
119
+WHERE log_object_key IS NOT NULL
120
+UNION ALL
121
+SELECT 'hot_log_chunks'::text AS kind, count(*)::double precision, COALESCE(sum(octet_length(chunk)), 0)::double precision
122
+FROM workflow_step_log_chunks`)
123
+	if err != nil {
124
+		return
125
+	}
126
+	defer rows.Close()
127
+	for rows.Next() {
128
+		var kind string
129
+		var objects, bytes float64
130
+		if err := rows.Scan(&kind, &objects, &bytes); err != nil {
131
+			return
132
+		}
133
+		ActionsStorageObjects.WithLabelValues(kind).Set(objects)
134
+		ActionsStorageBytes.WithLabelValues(kind).Set(bytes)
135
+	}
136
+}
internal/infra/metrics/actionsobserver_test.goadded
@@ -0,0 +1,167 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package metrics
4
+
5
+import (
6
+	"context"
7
+	"testing"
8
+	"time"
9
+
10
+	"github.com/jackc/pgx/v5/pgtype"
11
+	"github.com/prometheus/client_golang/prometheus"
12
+	dto "github.com/prometheus/client_model/go"
13
+
14
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
15
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
16
+	"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
17
+	usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
18
+)
19
+
20
+func TestRefreshActionsPublishesQueueRunnerAndStorageGauges(t *testing.T) {
21
+	ctx := context.Background()
22
+	pool := dbtest.NewTestDB(t)
23
+	q := actionsdb.New()
24
+
25
+	user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
26
+		Username:     "metrics-observer",
27
+		DisplayName:  "Metrics Observer",
28
+		PasswordHash: "hash",
29
+	})
30
+	if err != nil {
31
+		t.Fatalf("CreateUser: %v", err)
32
+	}
33
+	repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
34
+		OwnerUserID:   pgtype.Int8{Int64: user.ID, Valid: true},
35
+		Name:          "actions-metrics",
36
+		DefaultBranch: "trunk",
37
+		Visibility:    reposdb.RepoVisibilityPublic,
38
+	})
39
+	if err != nil {
40
+		t.Fatalf("CreateRepo: %v", err)
41
+	}
42
+
43
+	run, err := q.InsertWorkflowRun(ctx, pool, actionsdb.InsertWorkflowRunParams{
44
+		RepoID:       repo.ID,
45
+		RunIndex:     1,
46
+		WorkflowFile: ".shithub/workflows/ci.yml",
47
+		WorkflowName: "CI",
48
+		HeadSha:      "0123456789abcdef0123456789abcdef01234567",
49
+		HeadRef:      "trunk",
50
+		Event:        actionsdb.WorkflowRunEventPush,
51
+		EventPayload: []byte(`{}`),
52
+		ActorUserID:  pgtype.Int8{Int64: user.ID, Valid: true},
53
+	})
54
+	if err != nil {
55
+		t.Fatalf("InsertWorkflowRun: %v", err)
56
+	}
57
+	job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
58
+		RunID:          run.ID,
59
+		JobIndex:       0,
60
+		JobKey:         "build",
61
+		JobName:        "Build",
62
+		RunsOn:         `["ubuntu-latest"]`,
63
+		TimeoutMinutes: 30,
64
+		Permissions:    []byte(`{}`),
65
+		JobEnv:         []byte(`{}`),
66
+	})
67
+	if err != nil {
68
+		t.Fatalf("InsertWorkflowJob: %v", err)
69
+	}
70
+	step, err := q.InsertWorkflowStep(ctx, pool, actionsdb.InsertWorkflowStepParams{
71
+		JobID:            job.ID,
72
+		StepIndex:        0,
73
+		StepID:           "test",
74
+		StepName:         "Test",
75
+		RunCommand:       "go test ./...",
76
+		WorkingDirectory: ".",
77
+		StepEnv:          []byte(`{}`),
78
+		StepWith:         []byte(`{}`),
79
+	})
80
+	if err != nil {
81
+		t.Fatalf("InsertWorkflowStep: %v", err)
82
+	}
83
+	if _, err := pool.Exec(ctx, `UPDATE workflow_steps SET log_object_key = $1, log_byte_count = $2 WHERE id = $3`, "actions/logs/test.log", int64(123), step.ID); err != nil {
84
+		t.Fatalf("mark step log object: %v", err)
85
+	}
86
+	if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
87
+		StepID: step.ID,
88
+		Seq:    0,
89
+		Chunk:  []byte("hello"),
90
+	}); err != nil {
91
+		t.Fatalf("AppendStepLogChunk: %v", err)
92
+	}
93
+	if _, err := q.InsertArtifact(ctx, pool, actionsdb.InsertArtifactParams{
94
+		RunID:     run.ID,
95
+		Name:      "bundle",
96
+		ObjectKey: "actions/artifacts/bundle.zip",
97
+		ByteCount: 2048,
98
+		ExpiresAt: pgtype.Timestamptz{
99
+			Time:  time.Now().UTC().Add(24 * time.Hour),
100
+			Valid: true,
101
+		},
102
+	}); err != nil {
103
+		t.Fatalf("InsertArtifact: %v", err)
104
+	}
105
+	runner, err := q.InsertRunner(ctx, pool, actionsdb.InsertRunnerParams{
106
+		Name:               "runner-a",
107
+		Labels:             []string{"self-hosted", "linux", "ubuntu-latest"},
108
+		Capacity:           3,
109
+		RegisteredByUserID: pgtype.Int8{Int64: user.ID, Valid: true},
110
+	})
111
+	if err != nil {
112
+		t.Fatalf("InsertRunner: %v", err)
113
+	}
114
+	if _, err := pool.Exec(ctx, `UPDATE workflow_runners SET status = 'busy', last_heartbeat_at = now() - interval '75 seconds' WHERE id = $1`, runner.ID); err != nil {
115
+		t.Fatalf("touch runner heartbeat: %v", err)
116
+	}
117
+
118
+	resetActionsObserverGauges()
119
+	refreshActions(ctx, pool)
120
+
121
+	assertGauge(t, ActionsQueueDepth, []string{"runs"}, 1)
122
+	assertGauge(t, ActionsQueueDepth, []string{"jobs"}, 1)
123
+	assertGauge(t, ActionsActive, []string{"runs"}, 0)
124
+	assertGauge(t, ActionsActive, []string{"jobs"}, 0)
125
+	assertGauge(t, ActionsRunnerCapacity, []string{"runner-a", "busy"}, 3)
126
+	if got := gaugeValue(t, ActionsRunnerHeartbeatAgeSeconds, []string{"runner-a", "busy"}); got < 60 {
127
+		t.Fatalf("runner heartbeat age = %v, want >= 60", got)
128
+	}
129
+	assertGauge(t, ActionsStorageObjects, []string{"artifacts"}, 1)
130
+	assertGauge(t, ActionsStorageBytes, []string{"artifacts"}, 2048)
131
+	assertGauge(t, ActionsStorageObjects, []string{"step_logs"}, 1)
132
+	assertGauge(t, ActionsStorageBytes, []string{"step_logs"}, 123)
133
+	assertGauge(t, ActionsStorageObjects, []string{"hot_log_chunks"}, 1)
134
+	assertGauge(t, ActionsStorageBytes, []string{"hot_log_chunks"}, 5)
135
+}
136
+
137
// labeledGauge abstracts the gauge-vector API the test helpers need, so the
// same helpers can read any of the package's labeled gauges.
type labeledGauge interface {
	WithLabelValues(lvs ...string) prometheus.Gauge
}
140
+
141
+func resetActionsObserverGauges() {
142
+	ActionsQueueDepth.Reset()
143
+	ActionsActive.Reset()
144
+	ActionsRunnerHeartbeatAgeSeconds.Reset()
145
+	ActionsRunnerCapacity.Reset()
146
+	ActionsStorageObjects.Reset()
147
+	ActionsStorageBytes.Reset()
148
+}
149
+
150
+func assertGauge(t *testing.T, vec labeledGauge, labels []string, want float64) {
151
+	t.Helper()
152
+	if got := gaugeValue(t, vec, labels); got != want {
153
+		t.Fatalf("gauge %v = %v, want %v", labels, got, want)
154
+	}
155
+}
156
+
157
+func gaugeValue(t *testing.T, vec labeledGauge, labels []string) float64 {
158
+	t.Helper()
159
+	var metric dto.Metric
160
+	if err := vec.WithLabelValues(labels...).Write(&metric); err != nil {
161
+		t.Fatalf("read gauge %v: %v", labels, err)
162
+	}
163
+	if metric.Gauge == nil {
164
+		t.Fatalf("gauge %v missing", labels)
165
+	}
166
+	return metric.Gauge.GetValue()
167
+}
internal/infra/metrics/metrics.gomodified
@@ -156,6 +156,28 @@ var (
156
 		},
156
 		},
157
 		[]string{"reason"},
157
 		[]string{"reason"},
158
 	)
158
 	)
159
+	ActionsRunsCompletedTotal = prometheus.NewCounterVec(
160
+		prometheus.CounterOpts{
161
+			Name: "shithub_actions_runs_completed_total",
162
+			Help: "Total terminal Actions workflow runs by event kind and conclusion.",
163
+		},
164
+		[]string{"event", "conclusion"},
165
+	)
166
+	ActionsRunDurationSeconds = prometheus.NewHistogramVec(
167
+		prometheus.HistogramOpts{
168
+			Name:    "shithub_actions_run_duration_seconds",
169
+			Help:    "Actions workflow run duration from started_at or created_at to completed_at, by event kind and conclusion.",
170
+			Buckets: prometheus.ExponentialBuckets(1, 2.5, 12),
171
+		},
172
+		[]string{"event", "conclusion"},
173
+	)
174
+	ActionsStepsCompletedTotal = prometheus.NewCounterVec(
175
+		prometheus.CounterOpts{
176
+			Name: "shithub_actions_steps_completed_total",
177
+			Help: "Total terminal Actions steps by bounded step type and conclusion.",
178
+		},
179
+		[]string{"step_type", "conclusion"},
180
+	)
159
 	ActionsConcurrencyQueuedTotal = prometheus.NewCounter(
181
 	ActionsConcurrencyQueuedTotal = prometheus.NewCounter(
160
 		prometheus.CounterOpts{
182
 		prometheus.CounterOpts{
161
 			Name: "shithub_actions_concurrency_queued_total",
183
 			Name: "shithub_actions_concurrency_queued_total",
@@ -169,6 +191,20 @@ var (
169
 		},
191
 		},
170
 		[]string{"location"},
192
 		[]string{"location"},
171
 	)
193
 	)
194
+	ActionsLogChunksTotal = prometheus.NewCounterVec(
195
+		prometheus.CounterOpts{
196
+			Name: "shithub_actions_log_chunks_total",
197
+			Help: "Total Actions log chunks accepted by location.",
198
+		},
199
+		[]string{"location"},
200
+	)
201
+	ActionsLogChunkBytesTotal = prometheus.NewCounterVec(
202
+		prometheus.CounterOpts{
203
+			Name: "shithub_actions_log_chunk_bytes_total",
204
+			Help: "Total Actions log chunk bytes accepted by location before durable storage.",
205
+		},
206
+		[]string{"location"},
207
+	)
172
 	ActionsRunsPrunedTotal = prometheus.NewCounterVec(
208
 	ActionsRunsPrunedTotal = prometheus.NewCounterVec(
173
 		prometheus.CounterOpts{
209
 		prometheus.CounterOpts{
174
 			Name: "shithub_actions_runs_pruned_total",
210
 			Name: "shithub_actions_runs_pruned_total",
@@ -182,6 +218,48 @@ var (
182
 			Help: "Total Actions steps reported as timed out by runners.",
218
 			Help: "Total Actions steps reported as timed out by runners.",
183
 		},
219
 		},
184
 	)
220
 	)
221
+	ActionsQueueDepth = prometheus.NewGaugeVec(
222
+		prometheus.GaugeOpts{
223
+			Name: "shithub_actions_queue_depth",
224
+			Help: "Current queued Actions workflow items by resource (runs, jobs).",
225
+		},
226
+		[]string{"resource"},
227
+	)
228
+	ActionsActive = prometheus.NewGaugeVec(
229
+		prometheus.GaugeOpts{
230
+			Name: "shithub_actions_active",
231
+			Help: "Current running Actions workflow items by resource (runs, jobs).",
232
+		},
233
+		[]string{"resource"},
234
+	)
235
+	ActionsRunnerHeartbeatAgeSeconds = prometheus.NewGaugeVec(
236
+		prometheus.GaugeOpts{
237
+			Name: "shithub_actions_runner_heartbeat_age_seconds",
238
+			Help: "Seconds since each registered Actions runner last heartbeated. Offline runners that never heartbeated are omitted.",
239
+		},
240
+		[]string{"runner", "status"},
241
+	)
242
+	ActionsRunnerCapacity = prometheus.NewGaugeVec(
243
+		prometheus.GaugeOpts{
244
+			Name: "shithub_actions_runner_capacity",
245
+			Help: "Configured Actions runner capacity by runner and status.",
246
+		},
247
+		[]string{"runner", "status"},
248
+	)
249
+	ActionsStorageObjects = prometheus.NewGaugeVec(
250
+		prometheus.GaugeOpts{
251
+			Name: "shithub_actions_storage_objects",
252
+			Help: "Current durable Actions storage object count by kind.",
253
+		},
254
+		[]string{"kind"},
255
+	)
256
+	ActionsStorageBytes = prometheus.NewGaugeVec(
257
+		prometheus.GaugeOpts{
258
+			Name: "shithub_actions_storage_bytes",
259
+			Help: "Current durable Actions storage byte count by kind.",
260
+		},
261
+		[]string{"kind"},
262
+	)
185
 )
263
 )
186
 
264
 
187
 func init() {
265
 func init() {
@@ -203,10 +281,21 @@ func init() {
203
 		ActionsRunnerHeartbeatsTotal,
281
 		ActionsRunnerHeartbeatsTotal,
204
 		ActionsRunnerJWTTotal,
282
 		ActionsRunnerJWTTotal,
205
 		ActionsJobsCancelledTotal,
283
 		ActionsJobsCancelledTotal,
284
+		ActionsRunsCompletedTotal,
285
+		ActionsRunDurationSeconds,
286
+		ActionsStepsCompletedTotal,
206
 		ActionsConcurrencyQueuedTotal,
287
 		ActionsConcurrencyQueuedTotal,
207
 		ActionsLogScrubReplacementsTotal,
288
 		ActionsLogScrubReplacementsTotal,
289
+		ActionsLogChunksTotal,
290
+		ActionsLogChunkBytesTotal,
208
 		ActionsRunsPrunedTotal,
291
 		ActionsRunsPrunedTotal,
209
 		ActionsStepTimeoutsTotal,
292
 		ActionsStepTimeoutsTotal,
293
+		ActionsQueueDepth,
294
+		ActionsActive,
295
+		ActionsRunnerHeartbeatAgeSeconds,
296
+		ActionsRunnerCapacity,
297
+		ActionsStorageObjects,
298
+		ActionsStorageBytes,
210
 	)
299
 	)
211
 }
300
 }
212
 
301
 
internal/web/handlers/api/runners.gomodified
@@ -28,6 +28,7 @@ import (
28
 	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
28
 	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
29
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
29
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
30
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
30
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
31
+	actionstelemetry "github.com/tenseleyFlow/shithub/internal/actions/telemetry"
31
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
32
 	"github.com/tenseleyFlow/shithub/internal/auth/runnerjwt"
32
 	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
33
 	"github.com/tenseleyFlow/shithub/internal/infra/metrics"
33
 	"github.com/tenseleyFlow/shithub/internal/ratelimit"
34
 	"github.com/tenseleyFlow/shithub/internal/ratelimit"
@@ -494,6 +495,9 @@ func (h *Handlers) runnerStepStatus(w http.ResponseWriter, r *http.Request) {
494
 		return
495
 		return
495
 	}
496
 	}
496
 	recordStepTimeout(step, updated)
497
 	recordStepTimeout(step, updated)
498
+	if terminal && stepLifecycleChanged(step, updated) {
499
+		actionstelemetry.RecordStepTerminal(updated)
500
+	}
497
 	h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
501
 	h.writeNextTokenResponse(w, r, http.StatusOK, auth, map[string]any{
498
 		"status":     string(updated.Status),
502
 		"status":     string(updated.Status),
499
 		"conclusion": nullableConclusion(updated.Conclusion),
503
 		"conclusion": nullableConclusion(updated.Conclusion),
@@ -816,6 +820,9 @@ func (h *Handlers) applyJobStatus(
816
 			h.d.Logger.WarnContext(ctx, "runner cancelled-step finalizer notify failed", "job_id", updated.ID, "error", err)
820
 			h.d.Logger.WarnContext(ctx, "runner cancelled-step finalizer notify failed", "job_id", updated.ID, "error", err)
817
 		}
821
 		}
818
 	}
822
 	}
823
+	if runTerminalChanged {
824
+		actionstelemetry.RecordRunTerminal(runAfter)
825
+	}
819
 	return updated, complete, runConclusion, nil
826
 	return updated, complete, runConclusion, nil
820
 }
827
 }
821
 
828
 
@@ -854,6 +861,16 @@ func jobLifecycleChanged(before, after actionsdb.WorkflowJob) bool {
854
 	return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion
861
 	return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion
855
 }
862
 }
856
 
863
 
864
+func stepLifecycleChanged(before, after actionsdb.WorkflowStep) bool {
865
+	if before.Status != after.Status {
866
+		return true
867
+	}
868
+	if before.Conclusion.Valid != after.Conclusion.Valid {
869
+		return true
870
+	}
871
+	return before.Conclusion.Valid && before.Conclusion.CheckConclusion != after.Conclusion.CheckConclusion
872
+}
873
+
857
 func workflowRunLifecycleChanged(before, after actionsdb.WorkflowRun) bool {
874
 func workflowRunLifecycleChanged(before, after actionsdb.WorkflowRun) bool {
858
 	if before.Status != after.Status {
875
 	if before.Status != after.Status {
859
 		return true
876
 		return true
@@ -1149,6 +1166,7 @@ func cloneStringMap(in map[string]string) map[string]string {
1149
 
1166
 
1150
 func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq int32, chunk []byte, values []string) error {
1167
 func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq int32, chunk []byte, values []string) error {
1151
 	q := actionsdb.New()
1168
 	q := actionsdb.New()
1169
+	acceptedChunkBytes := len(chunk)
1152
 	if len(values) == 0 {
1170
 	if len(values) == 0 {
1153
 		row, err := q.AppendStepLogChunk(ctx, h.d.Pool, actionsdb.AppendStepLogChunkParams{
1171
 		row, err := q.AppendStepLogChunk(ctx, h.d.Pool, actionsdb.AppendStepLogChunkParams{
1154
 			StepID: stepID,
1172
 			StepID: stepID,
@@ -1161,6 +1179,8 @@ func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq
1161
 		if err != nil {
1179
 		if err != nil {
1162
 			return err
1180
 			return err
1163
 		}
1181
 		}
1182
+		metrics.ActionsLogChunksTotal.WithLabelValues("server").Inc()
1183
+		metrics.ActionsLogChunkBytesTotal.WithLabelValues("server").Add(float64(acceptedChunkBytes))
1164
 		return logstream.NotifyChunk(ctx, h.d.Pool, stepID, row.Seq)
1184
 		return logstream.NotifyChunk(ctx, h.d.Pool, stepID, row.Seq)
1165
 	}
1185
 	}
1166
 
1186
 
@@ -1214,6 +1234,7 @@ func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq
1214
 		return err
1234
 		return err
1215
 	}
1235
 	}
1216
 
1236
 
1237
+	accepted := false
1217
 	row, err := q.AppendStepLogChunk(ctx, tx, actionsdb.AppendStepLogChunkParams{
1238
 	row, err := q.AppendStepLogChunk(ctx, tx, actionsdb.AppendStepLogChunkParams{
1218
 		StepID: stepID,
1239
 		StepID: stepID,
1219
 		Seq:    seq,
1240
 		Seq:    seq,
@@ -1221,6 +1242,7 @@ func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq
1221
 	})
1242
 	})
1222
 	switch {
1243
 	switch {
1223
 	case err == nil:
1244
 	case err == nil:
1245
+		accepted = true
1224
 		if err := logstream.NotifyChunk(ctx, tx, stepID, row.Seq); err != nil {
1246
 		if err := logstream.NotifyChunk(ctx, tx, stepID, row.Seq); err != nil {
1225
 			return err
1247
 			return err
1226
 		}
1248
 		}
@@ -1232,8 +1254,12 @@ func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq
1232
 		return err
1254
 		return err
1233
 	}
1255
 	}
1234
 	committed = true
1256
 	committed = true
1235
-	if replacements > 0 {
1257
+	if accepted {
1236
-		metrics.ActionsLogScrubReplacementsTotal.WithLabelValues("server").Add(float64(replacements))
1258
+		if replacements > 0 {
1259
+			metrics.ActionsLogScrubReplacementsTotal.WithLabelValues("server").Add(float64(replacements))
1260
+		}
1261
+		metrics.ActionsLogChunksTotal.WithLabelValues("server").Inc()
1262
+		metrics.ActionsLogChunkBytesTotal.WithLabelValues("server").Add(float64(acceptedChunkBytes))
1237
 	}
1263
 	}
1238
 	return nil
1264
 	return nil
1239
 }
1265
 }
internal/web/server.gomodified
@@ -115,6 +115,7 @@ func Run(ctx context.Context, opts Options) error {
115
 			pool = p
115
 			pool = p
116
 			defer p.Close()
116
 			defer p.Close()
117
 			metrics.ObserveDBPool(ctx, pool, 10*time.Second)
117
 			metrics.ObserveDBPool(ctx, pool, 10*time.Second)
118
+			metrics.ObserveActions(ctx, pool, 15*time.Second)
118
 		}
119
 		}
119
 	}
120
 	}
120
 
121