| 1 | // SPDX-License-Identifier: AGPL-3.0-or-later |
| 2 | |
| 3 | // Package logstream owns the small Postgres LISTEN/NOTIFY contract used by |
| 4 | // Actions step-log live tailing. NOTIFY payloads intentionally carry only the |
| 5 | // chunk sequence or terminal marker; the SSE handler reads chunk bytes from |
| 6 | // workflow_step_log_chunks so verbose logs never hit Postgres's payload cap. |
| 7 | package logstream |
| 8 | |
| 9 | import ( |
| 10 | "context" |
| 11 | "strconv" |
| 12 | "strings" |
| 13 | |
| 14 | "github.com/jackc/pgx/v5" |
| 15 | "github.com/jackc/pgx/v5/pgconn" |
| 16 | ) |
| 17 | |
| 18 | const donePayload = "done" |
| 19 | |
| 20 | // DBTX is the Exec-only subset shared by pgxpool.Pool and pgx.Tx. |
| 21 | type DBTX interface { |
| 22 | Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) |
| 23 | } |
| 24 | |
| 25 | // Channel returns the per-step NOTIFY channel. stepID comes from postgres, so |
| 26 | // the numeric suffix is stable and safe to expose as a channel component. |
| 27 | func Channel(stepID int64) string { |
| 28 | return "step_log_" + strconv.FormatInt(stepID, 10) |
| 29 | } |
| 30 | |
| 31 | // ListenSQL returns a quoted LISTEN statement for the per-step channel. |
| 32 | func ListenSQL(stepID int64) string { |
| 33 | return "LISTEN " + pgx.Identifier{Channel(stepID)}.Sanitize() |
| 34 | } |
| 35 | |
| 36 | // UnlistenSQL returns a quoted UNLISTEN statement for the per-step channel. |
| 37 | func UnlistenSQL(stepID int64) string { |
| 38 | return "UNLISTEN " + pgx.Identifier{Channel(stepID)}.Sanitize() |
| 39 | } |
| 40 | |
| 41 | // NotifyChunk wakes log tailers for a newly-persisted chunk. |
| 42 | func NotifyChunk(ctx context.Context, db DBTX, stepID int64, seq int32) error { |
| 43 | return notify(ctx, db, stepID, strconv.FormatInt(int64(seq), 10)) |
| 44 | } |
| 45 | |
| 46 | // NotifyDone wakes log tailers and tells them to send the final done event. |
| 47 | func NotifyDone(ctx context.Context, db DBTX, stepID int64) error { |
| 48 | return notify(ctx, db, stepID, donePayload) |
| 49 | } |
| 50 | |
| 51 | // ParsePayload parses the NOTIFY payload. |
| 52 | func ParsePayload(payload string) (seq int32, done bool, ok bool) { |
| 53 | payload = strings.TrimSpace(payload) |
| 54 | if payload == donePayload { |
| 55 | return 0, true, true |
| 56 | } |
| 57 | n, err := strconv.ParseInt(payload, 10, 32) |
| 58 | if err != nil || n < 0 { |
| 59 | return 0, false, false |
| 60 | } |
| 61 | return int32(n), false, true |
| 62 | } |
| 63 | |
| 64 | func notify(ctx context.Context, db DBTX, stepID int64, payload string) error { |
| 65 | _, err := db.Exec(ctx, "SELECT pg_notify($1, $2)", Channel(stepID), payload) |
| 66 | return err |
| 67 | } |
| 68 |