Go · 2012 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 // Package logstream owns the small Postgres LISTEN/NOTIFY contract used by
4 // Actions step-log live tailing. NOTIFY payloads intentionally carry only the
5 // chunk sequence or terminal marker; the SSE handler reads chunk bytes from
6 // workflow_step_log_chunks so verbose logs never hit Postgres's payload cap.
7 package logstream
8
9 import (
10 "context"
11 "strconv"
12 "strings"
13
14 "github.com/jackc/pgx/v5"
15 "github.com/jackc/pgx/v5/pgconn"
16 )
17
18 const donePayload = "done"
19
20 // DBTX is the Exec-only subset shared by pgxpool.Pool and pgx.Tx.
21 type DBTX interface {
22 Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
23 }
24
25 // Channel returns the per-step NOTIFY channel. stepID comes from postgres, so
26 // the numeric suffix is stable and safe to expose as a channel component.
27 func Channel(stepID int64) string {
28 return "step_log_" + strconv.FormatInt(stepID, 10)
29 }
30
31 // ListenSQL returns a quoted LISTEN statement for the per-step channel.
32 func ListenSQL(stepID int64) string {
33 return "LISTEN " + pgx.Identifier{Channel(stepID)}.Sanitize()
34 }
35
36 // NotifyChunk wakes log tailers for a newly-persisted chunk.
37 func NotifyChunk(ctx context.Context, db DBTX, stepID int64, seq int32) error {
38 return notify(ctx, db, stepID, strconv.FormatInt(int64(seq), 10))
39 }
40
41 // NotifyDone wakes log tailers and tells them to send the final done event.
42 func NotifyDone(ctx context.Context, db DBTX, stepID int64) error {
43 return notify(ctx, db, stepID, donePayload)
44 }
45
46 // ParsePayload parses the NOTIFY payload.
47 func ParsePayload(payload string) (seq int32, done bool, ok bool) {
48 payload = strings.TrimSpace(payload)
49 if payload == donePayload {
50 return 0, true, true
51 }
52 n, err := strconv.ParseInt(payload, 10, 32)
53 if err != nil || n < 0 {
54 return 0, false, false
55 }
56 return int32(n), false, true
57 }
58
59 func notify(ctx context.Context, db DBTX, stepID int64, payload string) error {
60 _, err := db.Exec(ctx, "SELECT pg_notify($1, $2)", Channel(stepID), payload)
61 return err
62 }
63