@@ -16,6 +16,7 @@ import ( |
| 16 | "strconv" | 16 | "strconv" |
| 17 | "strings" | 17 | "strings" |
| 18 | "time" | 18 | "time" |
| | 19 | + "unicode/utf8" |
| 19 | | 20 | |
| 20 | "github.com/go-chi/chi/v5" | 21 | "github.com/go-chi/chi/v5" |
| 21 | "github.com/jackc/pgx/v5" | 22 | "github.com/jackc/pgx/v5" |
@@ -55,8 +56,14 @@ func (h *Handlers) mountRunners(r chi.Router) { |
| 55 | type runnerHeartbeatRequest struct { | 56 | type runnerHeartbeatRequest struct { |
| 56 | Labels []string `json:"labels"` | 57 | Labels []string `json:"labels"` |
| 57 | Capacity int `json:"capacity"` | 58 | Capacity int `json:"capacity"` |
| | 59 | + HostName string `json:"host_name"` |
| | 60 | + Version string `json:"version"` |
| 58 | } | 61 | } |
| 59 | | 62 | |
| | 63 | +const runnerMetadataMaxBytes = 255 |
| | 64 | + |
| | 65 | +var errRunnerRevoked = errors.New("runner is revoked") |
| | 66 | + |
| 60 | func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) { | 67 | func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) { |
| 61 | if h.d.RunnerJWT == nil { | 68 | if h.d.RunnerJWT == nil { |
| 62 | writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured") | 69 | writeAPIError(w, http.StatusServiceUnavailable, "runner API is not configured") |
@@ -94,9 +101,22 @@ func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) { |
| 94 | writeAPIError(w, http.StatusBadRequest, "capacity must be between 1 and 64") | 101 | writeAPIError(w, http.StatusBadRequest, "capacity must be between 1 and 64") |
| 95 | return | 102 | return |
| 96 | } | 103 | } |
| | 104 | + hostName := cleanRunnerMetadata(body.HostName) |
| | 105 | + if hostName == "" { |
| | 106 | + hostName = runner.HostName |
| | 107 | + } |
| | 108 | + version := cleanRunnerMetadata(body.Version) |
| | 109 | + if version == "" { |
| | 110 | + version = runner.Version |
| | 111 | + } |
| 97 | | 112 | |
| 98 | - job, steps, resolvedSecrets, claimed, err := h.claimRunnerJob(r.Context(), runner.ID, labels, int32(capacity)) | 113 | + job, steps, resolvedSecrets, claimed, err := h.claimRunnerJob(r.Context(), runner.ID, labels, int32(capacity), hostName, version) |
| 99 | if err != nil { | 114 | if err != nil { |
| | 115 | + if errors.Is(err, errRunnerRevoked) { |
| | 116 | + metrics.ActionsRunnerHeartbeatsTotal.WithLabelValues("rejected").Inc() |
| | 117 | + writeAPIError(w, http.StatusUnauthorized, "runner revoked") |
| | 118 | + return |
| | 119 | + } |
| 100 | h.d.Logger.ErrorContext(r.Context(), "runner heartbeat claim failed", "runner_id", runner.ID, "error", err) | 120 | h.d.Logger.ErrorContext(r.Context(), "runner heartbeat claim failed", "runner_id", runner.ID, "error", err) |
| 101 | writeAPIError(w, http.StatusInternalServerError, "runner heartbeat failed") | 121 | writeAPIError(w, http.StatusInternalServerError, "runner heartbeat failed") |
| 102 | return | 122 | return |
@@ -136,6 +156,22 @@ func (h *Handlers) runnerHeartbeat(w http.ResponseWriter, r *http.Request) { |
| 136 | writeJSON(w, http.StatusOK, h.presentRunnerClaim(job, steps, resolvedSecrets, token, checkoutToken, time.Unix(claims.Exp, 0))) | 156 | writeJSON(w, http.StatusOK, h.presentRunnerClaim(job, steps, resolvedSecrets, token, checkoutToken, time.Unix(claims.Exp, 0))) |
| 137 | } | 157 | } |
| 138 | | 158 | |
| | 159 | +func cleanRunnerMetadata(value string) string { |
| | 160 | + value = strings.TrimSpace(value) |
| | 161 | + if len(value) <= runnerMetadataMaxBytes { |
| | 162 | + return value |
| | 163 | + } |
| | 164 | + var b strings.Builder |
| | 165 | + for _, r := range value { |
| | 166 | + runeLen := utf8.RuneLen(r) |
| | 167 | + if runeLen < 0 || b.Len()+runeLen > runnerMetadataMaxBytes { |
| | 168 | + break |
| | 169 | + } |
| | 170 | + b.WriteRune(r) |
| | 171 | + } |
| | 172 | + return strings.TrimSpace(b.String()) |
| | 173 | +} |
| | 174 | + |
| 139 | func (h *Handlers) authenticateRunner(w http.ResponseWriter, r *http.Request) (actionsdb.GetRunnerByTokenHashRow, bool) { | 175 | func (h *Handlers) authenticateRunner(w http.ResponseWriter, r *http.Request) (actionsdb.GetRunnerByTokenHashRow, bool) { |
| 140 | const prefix = "Bearer " | 176 | const prefix = "Bearer " |
| 141 | authz := r.Header.Get("Authorization") | 177 | authz := r.Header.Get("Authorization") |
@@ -178,6 +214,8 @@ func (h *Handlers) claimRunnerJob( |
| 178 | runnerID int64, | 214 | runnerID int64, |
| 179 | labels []string, | 215 | labels []string, |
| 180 | capacity int32, | 216 | capacity int32, |
| | 217 | + hostName string, |
| | 218 | + version string, |
| 181 | ) (actionsdb.ClaimQueuedWorkflowJobRow, []actionsdb.ListRunnerStepsForJobRow, map[string]string, bool, error) { | 219 | ) (actionsdb.ClaimQueuedWorkflowJobRow, []actionsdb.ListRunnerStepsForJobRow, map[string]string, bool, error) { |
| 182 | q := actionsdb.New() | 220 | q := actionsdb.New() |
| 183 | tx, err := h.d.Pool.Begin(ctx) | 221 | tx, err := h.d.Pool.Begin(ctx) |
@@ -191,20 +229,44 @@ func (h *Handlers) claimRunnerJob( |
| 191 | } | 229 | } |
| 192 | }() | 230 | }() |
| 193 | | 231 | |
| 194 | - if _, err := q.LockRunnerByID(ctx, tx, runnerID); err != nil { | 232 | + lockedRunner, err := q.LockRunnerByID(ctx, tx, runnerID) |
| | 233 | + if err != nil { |
| 195 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 234 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 196 | } | 235 | } |
| | 236 | + if lockedRunner.RevokedAt.Valid { |
| | 237 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, errRunnerRevoked |
| | 238 | + } |
| 197 | running, err := q.CountRunningJobsForRunner(ctx, tx, runnerID) | 239 | running, err := q.CountRunningJobsForRunner(ctx, tx, runnerID) |
| 198 | if err != nil { | 240 | if err != nil { |
| 199 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 241 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 200 | } | 242 | } |
| 201 | - if running >= capacity { | 243 | + heartbeat := func(status actionsdb.WorkflowRunnerStatus) error { |
| 202 | - if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{ | 244 | + _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{ |
| 203 | ID: runnerID, | 245 | ID: runnerID, |
| 204 | Labels: labels, | 246 | Labels: labels, |
| 205 | Capacity: capacity, | 247 | Capacity: capacity, |
| 206 | - Status: actionsdb.WorkflowRunnerStatusBusy, | 248 | + Status: status, |
| 207 | - }); err != nil { | 249 | + HostName: hostName, |
| | 250 | + Version: version, |
| | 251 | + }) |
| | 252 | + return err |
| | 253 | + } |
| | 254 | + if lockedRunner.DrainingAt.Valid { |
| | 255 | + status := actionsdb.WorkflowRunnerStatusIdle |
| | 256 | + if running > 0 { |
| | 257 | + status = actionsdb.WorkflowRunnerStatusBusy |
| | 258 | + } |
| | 259 | + if err := heartbeat(status); err != nil { |
| | 260 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| | 261 | + } |
| | 262 | + if err := tx.Commit(ctx); err != nil { |
| | 263 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| | 264 | + } |
| | 265 | + committed = true |
| | 266 | + return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, nil |
| | 267 | + } |
| | 268 | + if running >= capacity { |
| | 269 | + if err := heartbeat(actionsdb.WorkflowRunnerStatusBusy); err != nil { |
| 208 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 270 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 209 | } | 271 | } |
| 210 | if err := tx.Commit(ctx); err != nil { | 272 | if err := tx.Commit(ctx); err != nil { |
@@ -222,12 +284,7 @@ func (h *Handlers) claimRunnerJob( |
| 222 | if !errors.Is(err, pgx.ErrNoRows) { | 284 | if !errors.Is(err, pgx.ErrNoRows) { |
| 223 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 285 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 224 | } | 286 | } |
| 225 | - if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{ | 287 | + if err := heartbeat(actionsdb.WorkflowRunnerStatusIdle); err != nil { |
| 226 | - ID: runnerID, | | |
| 227 | - Labels: labels, | | |
| 228 | - Capacity: capacity, | | |
| 229 | - Status: actionsdb.WorkflowRunnerStatusIdle, | | |
| 230 | - }); err != nil { | | |
| 231 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 288 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 232 | } | 289 | } |
| 233 | if err := tx.Commit(ctx); err != nil { | 290 | if err := tx.Commit(ctx); err != nil { |
@@ -268,18 +325,24 @@ func (h *Handlers) claimRunnerJob( |
| 268 | if running+1 >= capacity { | 325 | if running+1 >= capacity { |
| 269 | status = actionsdb.WorkflowRunnerStatusBusy | 326 | status = actionsdb.WorkflowRunnerStatusBusy |
| 270 | } | 327 | } |
| 271 | - if _, err := q.HeartbeatRunner(ctx, tx, actionsdb.HeartbeatRunnerParams{ | 328 | + if err := heartbeat(status); err != nil { |
| 272 | - ID: runnerID, | | |
| 273 | - Labels: labels, | | |
| 274 | - Capacity: capacity, | | |
| 275 | - Status: status, | | |
| 276 | - }); err != nil { | | |
| 277 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 329 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 278 | } | 330 | } |
| | 331 | + var claimLatencySeconds float64 |
| | 332 | + observeClaimLatency := false |
| | 333 | + if job.CreatedAt.Valid { |
| | 334 | + if latency := time.Since(job.CreatedAt.Time); latency >= 0 { |
| | 335 | + claimLatencySeconds = latency.Seconds() |
| | 336 | + observeClaimLatency = true |
| | 337 | + } |
| | 338 | + } |
| 279 | if err := tx.Commit(ctx); err != nil { | 339 | if err := tx.Commit(ctx); err != nil { |
| 280 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err | 340 | return actionsdb.ClaimQueuedWorkflowJobRow{}, nil, nil, false, err |
| 281 | } | 341 | } |
| 282 | committed = true | 342 | committed = true |
| | 343 | + if observeClaimLatency { |
| | 344 | + metrics.ActionsJobClaimLatencySeconds.Observe(claimLatencySeconds) |
| | 345 | + } |
| 283 | return job, steps, resolvedSecrets, true, nil | 346 | return job, steps, resolvedSecrets, true, nil |
| 284 | } | 347 | } |
| 285 | | 348 | |
@@ -326,7 +389,14 @@ func (h *Handlers) authenticateRunnerJob(w http.ResponseWriter, r *http.Request) |
| 326 | writeAPIError(w, http.StatusUnauthorized, "job token invalid") | 389 | writeAPIError(w, http.StatusUnauthorized, "job token invalid") |
| 327 | return runnerJobAuth{}, false | 390 | return runnerJobAuth{}, false |
| 328 | } | 391 | } |
| 329 | - job, err := actionsdb.New().GetWorkflowJobByID(r.Context(), h.d.Pool, pathJobID) | 392 | + q := actionsdb.New() |
| | 393 | + runner, err := q.GetRunnerByID(r.Context(), h.d.Pool, runnerID) |
| | 394 | + if err != nil || runner.RevokedAt.Valid { |
| | 395 | + metrics.ActionsRunnerJWTTotal.WithLabelValues("rejected").Inc() |
| | 396 | + writeAPIError(w, http.StatusUnauthorized, "job token invalid") |
| | 397 | + return runnerJobAuth{}, false |
| | 398 | + } |
| | 399 | + job, err := q.GetWorkflowJobByID(r.Context(), h.d.Pool, pathJobID) |
| 330 | if err != nil { | 400 | if err != nil { |
| 331 | if errors.Is(err, pgx.ErrNoRows) { | 401 | if errors.Is(err, pgx.ErrNoRows) { |
| 332 | writeAPIError(w, http.StatusNotFound, "job not found") | 402 | writeAPIError(w, http.StatusNotFound, "job not found") |