tenseleyflow/shithub / 1fba1c8

Browse files

actions: live log tail and dispatch UI (S41f)

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
1fba1c88b6f434c2058e257e390a342b23263c90
Parents
1d3f3ad
Tree
4824f90

21 changed files

StatusFile+-
M deploy/Caddyfile.j2 18 1
A docs/internal/runbooks/actions.md 41 0
A internal/actions/logstream/logstream.go 62 0
A internal/actions/logstream/logstream_test.go 41 0
M internal/ratelimit/bucket.go 85 0
M internal/ratelimit/bucket_test.go 69 0
M internal/ratelimit/queries/rate_limits.sql 27 0
M internal/ratelimit/sqlc/querier.go 5 0
M internal/ratelimit/sqlc/rate_limits.sql.go 66 0
M internal/web/handlers/api/runners.go 20 4
M internal/web/handlers/repo/actions.go 21 0
M internal/web/handlers/repo/actions_dispatch.go 130 15
A internal/web/handlers/repo/actions_dispatch_ui.go 130 0
A internal/web/handlers/repo/actions_log_stream.go 292 0
M internal/web/handlers/repo/actions_test.go 226 0
M internal/web/handlers/repo/repo.go 6 0
M internal/web/handlers/repo/repo_test.go 2 2
M internal/web/repo_wiring.go 2 0
M internal/web/static/css/shithub.css 96 0
M internal/web/templates/repo/action_step_log.html 38 0
M internal/web/templates/repo/actions.html 43 1
deploy/Caddyfile.j2modified
@@ -17,7 +17,24 @@
1717
 }
1818
 
1919
 {{ shithub_domain }} {
20
-    encode gzip
20
+    @compressible {
21
+        not path_regexp actions_log_stream_for_compression ^/[^/]+/[^/]+/actions/runs/[0-9]+/jobs/[0-9]+/steps/[0-9]+/log/stream$
22
+    }
23
+    encode @compressible gzip
24
+
25
+    # Actions step-log SSE must flush each event immediately and must
26
+    # bypass gzip; buffering here makes logs appear in delayed chunks.
27
+    @actions_log_stream path_regexp actions_log_stream ^/[^/]+/[^/]+/actions/runs/[0-9]+/jobs/[0-9]+/steps/[0-9]+/log/stream$
28
+    handle @actions_log_stream {
29
+        reverse_proxy 127.0.0.1:8080 {
30
+            transport http {
31
+                read_timeout 30m
32
+                write_timeout 30m
33
+                response_header_timeout 30m
34
+            }
35
+            flush_interval -1
36
+        }
37
+    }
2138
 
2239
     # Long-timeout git smart-HTTP routes (S12). The fetch + push
2340
     # subprotocols stream and benefit from the buffer being off.
docs/internal/runbooks/actions.mdadded
@@ -0,0 +1,41 @@
1
+# Actions runbook
2
+
3
+## Live log tail
4
+
5
+Step log pages open an SSE stream at:
6
+
7
+```text
8
+/{owner}/{repo}/actions/runs/{run}/jobs/{job}/steps/{step}/log/stream
9
+```
10
+
11
+The stream sends `event: chunk` records with the chunk sequence as the SSE
12
+`id`. Browsers reconnect with `Last-Event-ID`; the handler also accepts
13
+`?after=<seq>` for the first connection from a rendered log page. A terminal
14
+step sends `event: done` and closes the stream.
15
+
16
+Log chunks are never sent through Postgres `NOTIFY`. Runner log writes append
17
+to `workflow_step_log_chunks`, then `NOTIFY step_log_<step_id>` with only the
18
+sequence number. Step completion notifies `done`.
19
+
20
+## Rate limit
21
+
22
+Live tails use `internal/ratelimit` scope `actions:logtail` with five
23
+concurrent streams per viewer. Authenticated viewers key by user id; anonymous
24
+public-repo viewers key by client IP. The limiter uses a short lease TTL so a
25
+dropped connection cannot hold a slot permanently.
26
+
27
+## Caddy
28
+
29
+The production Caddy template has a dedicated Actions log-stream route with:
30
+
31
+```caddy
32
+flush_interval -1
33
+```
34
+
35
+The same route is excluded from gzip compression. If logs arrive only after
36
+several kilobytes accumulate, verify the deployed `/etc/caddy/Caddyfile`
37
+contains that route and reload Caddy:
38
+
39
+```sh
40
+sudo caddy reload --config /etc/caddy/Caddyfile
41
+```
internal/actions/logstream/logstream.goadded
@@ -0,0 +1,62 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package logstream owns the small Postgres LISTEN/NOTIFY contract used by
4
+// Actions step-log live tailing. NOTIFY payloads intentionally carry only the
5
+// chunk sequence or terminal marker; the SSE handler reads chunk bytes from
6
+// workflow_step_log_chunks so verbose logs never hit Postgres's payload cap.
7
+package logstream
8
+
9
+import (
10
+	"context"
11
+	"strconv"
12
+	"strings"
13
+
14
+	"github.com/jackc/pgx/v5"
15
+	"github.com/jackc/pgx/v5/pgconn"
16
+)
17
+
18
+const donePayload = "done"
19
+
20
+// DBTX is the Exec-only subset shared by pgxpool.Pool and pgx.Tx.
21
+type DBTX interface {
22
+	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
23
+}
24
+
25
+// Channel returns the per-step NOTIFY channel. stepID comes from postgres, so
26
+// the numeric suffix is stable and safe to expose as a channel component.
27
+func Channel(stepID int64) string {
28
+	return "step_log_" + strconv.FormatInt(stepID, 10)
29
+}
30
+
31
+// ListenSQL returns a quoted LISTEN statement for the per-step channel.
32
+func ListenSQL(stepID int64) string {
33
+	return "LISTEN " + pgx.Identifier{Channel(stepID)}.Sanitize()
34
+}
35
+
36
+// NotifyChunk wakes log tailers for a newly-persisted chunk.
37
+func NotifyChunk(ctx context.Context, db DBTX, stepID int64, seq int32) error {
38
+	return notify(ctx, db, stepID, strconv.FormatInt(int64(seq), 10))
39
+}
40
+
41
+// NotifyDone wakes log tailers and tells them to send the final done event.
42
+func NotifyDone(ctx context.Context, db DBTX, stepID int64) error {
43
+	return notify(ctx, db, stepID, donePayload)
44
+}
45
+
46
+// ParsePayload parses the NOTIFY payload.
47
+func ParsePayload(payload string) (seq int32, done bool, ok bool) {
48
+	payload = strings.TrimSpace(payload)
49
+	if payload == donePayload {
50
+		return 0, true, true
51
+	}
52
+	n, err := strconv.ParseInt(payload, 10, 32)
53
+	if err != nil || n < 0 {
54
+		return 0, false, false
55
+	}
56
+	return int32(n), false, true
57
+}
58
+
59
+func notify(ctx context.Context, db DBTX, stepID int64, payload string) error {
60
+	_, err := db.Exec(ctx, "SELECT pg_notify($1, $2)", Channel(stepID), payload)
61
+	return err
62
+}
internal/actions/logstream/logstream_test.goadded
@@ -0,0 +1,41 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package logstream
4
+
5
+import "testing"
6
+
7
+func TestChannelAndListenSQL(t *testing.T) {
8
+	t.Parallel()
9
+	if got := Channel(42); got != "step_log_42" {
10
+		t.Fatalf("Channel=%q", got)
11
+	}
12
+	if got := ListenSQL(42); got != `LISTEN "step_log_42"` {
13
+		t.Fatalf("ListenSQL=%q", got)
14
+	}
15
+}
16
+
17
+func TestParsePayload(t *testing.T) {
18
+	t.Parallel()
19
+	tests := []struct {
20
+		name     string
21
+		payload  string
22
+		wantSeq  int32
23
+		wantDone bool
24
+		wantOK   bool
25
+	}{
26
+		{name: "chunk", payload: "7", wantSeq: 7, wantOK: true},
27
+		{name: "done", payload: "done", wantDone: true, wantOK: true},
28
+		{name: "trim", payload: " 8 ", wantSeq: 8, wantOK: true},
29
+		{name: "negative", payload: "-1"},
30
+		{name: "invalid", payload: "chunk:1"},
31
+	}
32
+	for _, tt := range tests {
33
+		t.Run(tt.name, func(t *testing.T) {
34
+			t.Parallel()
35
+			gotSeq, gotDone, gotOK := ParsePayload(tt.payload)
36
+			if gotSeq != tt.wantSeq || gotDone != tt.wantDone || gotOK != tt.wantOK {
37
+				t.Fatalf("ParsePayload(%q)=(%d,%v,%v)", tt.payload, gotSeq, gotDone, gotOK)
38
+			}
39
+		})
40
+	}
41
+}
internal/ratelimit/bucket.gomodified
@@ -16,8 +16,10 @@ import (
1616
 	"errors"
1717
 	"fmt"
1818
 	"net/netip"
19
+	"sync/atomic"
1920
 	"time"
2021
 
22
+	"github.com/jackc/pgx/v5"
2123
 	"github.com/jackc/pgx/v5/pgtype"
2224
 	"github.com/jackc/pgx/v5/pgxpool"
2325
 
@@ -57,6 +59,14 @@ type Decision struct {
5759
 	RetryAfter time.Duration // 0 when Allowed; otherwise the wait the client should respect
5860
 }
5961
 
62
// Lease represents one held concurrent slot. Release is idempotent.
type Lease struct {
	limiter  *Limiter    // issuing limiter; nil in the zero Lease, which makes Release a no-op
	policy   Policy      // policy the slot was acquired under (scope, max, TTL window)
	key      string      // per-viewer key the slot counts against
	released atomic.Bool // flipped by the first Release so later calls skip the DB write
}
69
+
6070
 // Allow increments the (scope, key) counter and reports whether the
6171
 // caller is under or over the configured Max. Returns the post-
6272
 // increment Remaining + the time until the current window rolls.
@@ -100,6 +110,81 @@ func (l *Limiter) Allow(ctx context.Context, p Policy, key string) (Decision, er
100110
 	return d, nil
101111
 }
102112
 
113
+// AcquireLease holds one concurrent slot for long-lived requests such as SSE
114
+// streams. Callers must Release when the request exits. Policy.Window is the
115
+// stale-lease TTL: it bounds leak duration if a process exits without release.
116
+//
117
+// Like Allow, transient Postgres errors fail open. The returned lease is nil in
118
+// that case, so callers can continue without a release hook while still logging
119
+// the error.
120
+func (l *Limiter) AcquireLease(ctx context.Context, p Policy, key string) (*Lease, Decision, error) {
121
+	if p.Max <= 0 || p.Window <= 0 {
122
+		return nil, Decision{}, errors.New("ratelimit: Policy.Max and Window must be positive")
123
+	}
124
+	if p.Scope == "" || key == "" {
125
+		return nil, Decision{}, errors.New("ratelimit: Policy.Scope and key must be non-empty")
126
+	}
127
+	row, err := l.q.AcquireRateLimitLease(ctx, l.pool, ratelimitdb.AcquireRateLimitLeaseParams{
128
+		Scope:   p.Scope,
129
+		Key:     key,
130
+		Ttl:     pgtype.Interval{Microseconds: int64(p.Window / time.Microsecond), Valid: true},
131
+		MaxHits: int32(p.Max),
132
+	})
133
+	if errors.Is(err, pgx.ErrNoRows) {
134
+		return nil, l.blockedLeaseDecision(ctx, p, key), nil
135
+	}
136
+	if err != nil {
137
+		return nil, Decision{Allowed: true, Remaining: p.Max, Limit: p.Max, ResetIn: p.Window}, fmt.Errorf("ratelimit: acquire lease: %w", err)
138
+	}
139
+	hits := int(row.Hits)
140
+	resetIn := time.Until(row.WindowStartedAt.Time.Add(p.Window))
141
+	if resetIn < 0 {
142
+		resetIn = 0
143
+	}
144
+	return &Lease{limiter: l, policy: p, key: key}, Decision{
145
+		Allowed:   true,
146
+		Limit:     p.Max,
147
+		Remaining: max0(p.Max - hits),
148
+		ResetIn:   resetIn,
149
+	}, nil
150
+}
151
+
152
+// Release returns the held slot to the limiter. It is safe to call multiple
153
+// times; only the first call touches postgres.
154
+func (l *Lease) Release(ctx context.Context) error {
155
+	if l == nil || l.limiter == nil {
156
+		return nil
157
+	}
158
+	if !l.released.CompareAndSwap(false, true) {
159
+		return nil
160
+	}
161
+	_, err := l.limiter.q.ReleaseRateLimitLease(ctx, l.limiter.pool, ratelimitdb.ReleaseRateLimitLeaseParams{
162
+		Scope: l.policy.Scope,
163
+		Key:   l.key,
164
+	})
165
+	if err != nil {
166
+		return fmt.Errorf("ratelimit: release lease: %w", err)
167
+	}
168
+	return nil
169
+}
170
+
171
+func (l *Limiter) blockedLeaseDecision(ctx context.Context, p Policy, key string) Decision {
172
+	resetIn := p.Window
173
+	if row, err := l.q.PeekRateLimit(ctx, l.pool, ratelimitdb.PeekRateLimitParams{Scope: p.Scope, Key: key}); err == nil {
174
+		resetIn = time.Until(row.WindowStartedAt.Time.Add(p.Window))
175
+		if resetIn <= 0 {
176
+			resetIn = time.Second
177
+		}
178
+	}
179
+	return Decision{
180
+		Allowed:    false,
181
+		Limit:      p.Max,
182
+		Remaining:  0,
183
+		ResetIn:    resetIn,
184
+		RetryAfter: resetIn,
185
+	}
186
+}
187
+
103188
 // AllowSignupIP is the inet-keyed sibling of Allow against the
104189
 // signup_ip_throttle table. ip is masked to /24 (v4) or /48 (v6)
105190
 // so a single residential allocation shares one counter — matches
internal/ratelimit/bucket_test.gomodified
@@ -62,6 +62,75 @@ func TestAllow_DistinctKeysAreIndependent(t *testing.T) {
6262
 	}
6363
 }
6464
 
65
+func TestAcquireLease_BlocksUntilRelease(t *testing.T) {
66
+	t.Parallel()
67
+	l := New(dbtest.NewTestDB(t))
68
+	ctx := context.Background()
69
+	p := Policy{Scope: "test:lease", Max: 2, Window: time.Hour}
70
+
71
+	lease1, d, err := l.AcquireLease(ctx, p, "alice")
72
+	if err != nil {
73
+		t.Fatalf("lease1 err: %v", err)
74
+	}
75
+	if !d.Allowed || d.Remaining != 1 {
76
+		t.Fatalf("lease1 decision=%+v", d)
77
+	}
78
+	lease2, d, err := l.AcquireLease(ctx, p, "alice")
79
+	if err != nil {
80
+		t.Fatalf("lease2 err: %v", err)
81
+	}
82
+	if !d.Allowed || d.Remaining != 0 {
83
+		t.Fatalf("lease2 decision=%+v", d)
84
+	}
85
+	lease3, d, err := l.AcquireLease(ctx, p, "alice")
86
+	if err != nil {
87
+		t.Fatalf("lease3 err: %v", err)
88
+	}
89
+	if lease3 != nil || d.Allowed {
90
+		t.Fatalf("lease3=(%v,%+v), want blocked without lease", lease3, d)
91
+	}
92
+	if err := lease1.Release(ctx); err != nil {
93
+		t.Fatalf("release lease1: %v", err)
94
+	}
95
+	lease4, d, err := l.AcquireLease(ctx, p, "alice")
96
+	if err != nil {
97
+		t.Fatalf("lease4 err: %v", err)
98
+	}
99
+	if lease4 == nil || !d.Allowed {
100
+		t.Fatalf("lease4=(%v,%+v), want allowed", lease4, d)
101
+	}
102
+	if err := lease1.Release(ctx); err != nil {
103
+		t.Fatalf("second release lease1: %v", err)
104
+	}
105
+	_ = lease2.Release(ctx)
106
+	_ = lease4.Release(ctx)
107
+}
108
+
109
+func TestAcquireLease_RollsStaleWindow(t *testing.T) {
110
+	t.Parallel()
111
+	l := New(dbtest.NewTestDB(t))
112
+	ctx := context.Background()
113
+	p := Policy{Scope: "test:lease:ttl", Max: 1, Window: time.Millisecond}
114
+
115
+	lease1, d, err := l.AcquireLease(ctx, p, "alice")
116
+	if err != nil {
117
+		t.Fatalf("lease1 err: %v", err)
118
+	}
119
+	if !d.Allowed {
120
+		t.Fatalf("lease1 blocked: %+v", d)
121
+	}
122
+	time.Sleep(5 * time.Millisecond)
123
+	lease2, d, err := l.AcquireLease(ctx, p, "alice")
124
+	if err != nil {
125
+		t.Fatalf("lease2 err: %v", err)
126
+	}
127
+	if lease2 == nil || !d.Allowed {
128
+		t.Fatalf("stale lease did not roll forward: lease=%v decision=%+v", lease2, d)
129
+	}
130
+	_ = lease1.Release(ctx)
131
+	_ = lease2.Release(ctx)
132
+}
133
+
65134
 func TestAllow_RejectsBadPolicy(t *testing.T) {
66135
 	t.Parallel()
67136
 	l := New(dbtest.NewTestDB(t))
internal/ratelimit/queries/rate_limits.sqlmodified
@@ -38,6 +38,33 @@ SELECT scope, key, hits, window_started_at
3838
 FROM rate_limits
3939
 WHERE scope = $1 AND key = $2;
4040
 
41
+-- name: AcquireRateLimitLease :one
42
+-- Concurrent-lease variant for long-lived streams. `hits` is the
43
+-- currently-held lease count. The ttl rolls stale rows forward so a process
44
+-- crash or severed TCP connection cannot consume capacity indefinitely.
45
+INSERT INTO rate_limits (scope, key, hits, window_started_at)
46
+VALUES (sqlc.arg(scope), sqlc.arg(key), 1, now())
47
+ON CONFLICT (scope, key)
48
+DO UPDATE SET
49
+    hits              = CASE
50
+                          WHEN rate_limits.window_started_at < now() - sqlc.arg(ttl)::interval
51
+                          THEN 1
52
+                          ELSE rate_limits.hits + 1
53
+                        END,
54
+    window_started_at = CASE
55
+                          WHEN rate_limits.window_started_at < now() - sqlc.arg(ttl)::interval
56
+                          THEN now()
57
+                          ELSE rate_limits.window_started_at
58
+                        END
59
+WHERE rate_limits.window_started_at < now() - sqlc.arg(ttl)::interval
60
+   OR rate_limits.hits < sqlc.arg(max_hits)::integer
61
+RETURNING hits, window_started_at;
62
+
63
+-- name: ReleaseRateLimitLease :execrows
64
+UPDATE rate_limits
65
+SET hits = GREATEST(hits - 1, 0)
66
+WHERE scope = $1 AND key = $2;
67
+
4168
 -- name: PruneRateLimits :execrows
4269
 DELETE FROM rate_limits
4370
 WHERE window_started_at < now() - sqlc.arg(retention)::interval;
internal/ratelimit/sqlc/querier.gomodified
@@ -11,6 +11,10 @@ import (
1111
 )
1212
 
1313
 type Querier interface {
14
+	// Concurrent-lease variant for long-lived streams. `hits` is the
15
+	// currently-held lease count. The ttl rolls stale rows forward so a process
16
+	// crash or severed TCP connection cannot consume capacity indefinitely.
17
+	AcquireRateLimitLease(ctx context.Context, db DBTX, arg AcquireRateLimitLeaseParams) (AcquireRateLimitLeaseRow, error)
1418
 	// SPDX-License-Identifier: AGPL-3.0-or-later
1519
 	//
1620
 	// Generic rate-limit counter queries (S35). Two write paths:
@@ -34,6 +38,7 @@ type Querier interface {
3438
 	PeekRateLimit(ctx context.Context, db DBTX, arg PeekRateLimitParams) (RateLimit, error)
3539
 	PruneRateLimits(ctx context.Context, db DBTX, retention pgtype.Interval) (int64, error)
3640
 	PruneSignupIPThrottle(ctx context.Context, db DBTX, retention pgtype.Interval) (int64, error)
41
+	ReleaseRateLimitLease(ctx context.Context, db DBTX, arg ReleaseRateLimitLeaseParams) (int64, error)
3742
 }
3843
 
3944
 var _ Querier = (*Queries)(nil)
internal/ratelimit/sqlc/rate_limits.sql.gomodified
@@ -12,6 +12,53 @@ import (
1212
 	"github.com/jackc/pgx/v5/pgtype"
1313
 )
1414
 
15
+const acquireRateLimitLease = `-- name: AcquireRateLimitLease :one
16
+INSERT INTO rate_limits (scope, key, hits, window_started_at)
17
+VALUES ($1, $2, 1, now())
18
+ON CONFLICT (scope, key)
19
+DO UPDATE SET
20
+    hits              = CASE
21
+                          WHEN rate_limits.window_started_at < now() - $3::interval
22
+                          THEN 1
23
+                          ELSE rate_limits.hits + 1
24
+                        END,
25
+    window_started_at = CASE
26
+                          WHEN rate_limits.window_started_at < now() - $3::interval
27
+                          THEN now()
28
+                          ELSE rate_limits.window_started_at
29
+                        END
30
+WHERE rate_limits.window_started_at < now() - $3::interval
31
+   OR rate_limits.hits < $4::integer
32
+RETURNING hits, window_started_at
33
+`
34
+
35
+type AcquireRateLimitLeaseParams struct {
36
+	Scope   string
37
+	Key     string
38
+	Ttl     pgtype.Interval
39
+	MaxHits int32
40
+}
41
+
42
+type AcquireRateLimitLeaseRow struct {
43
+	Hits            int32
44
+	WindowStartedAt pgtype.Timestamptz
45
+}
46
+
47
// Concurrent-lease variant for long-lived streams. `hits` is the
// currently-held lease count. The ttl rolls stale rows forward so a process
// crash or severed TCP connection cannot consume capacity indefinitely.
//
// NOTE(review): this file is sqlc output — change queries/rate_limits.sql and
// regenerate rather than editing by hand.
func (q *Queries) AcquireRateLimitLease(ctx context.Context, db DBTX, arg AcquireRateLimitLeaseParams) (AcquireRateLimitLeaseRow, error) {
	row := db.QueryRow(ctx, acquireRateLimitLease,
		arg.Scope,
		arg.Key,
		arg.Ttl,
		arg.MaxHits,
	)
	var i AcquireRateLimitLeaseRow
	err := row.Scan(&i.Hits, &i.WindowStartedAt)
	return i, err
}
61
+
1562
 const bumpRateLimit = `-- name: BumpRateLimit :one
1663
 
1764
 INSERT INTO rate_limits (scope, key, hits, window_started_at)
@@ -150,3 +197,22 @@ func (q *Queries) PruneSignupIPThrottle(ctx context.Context, db DBTX, retention
150197
 	}
151198
 	return result.RowsAffected(), nil
152199
 }
200
+
201
+const releaseRateLimitLease = `-- name: ReleaseRateLimitLease :execrows
202
+UPDATE rate_limits
203
+SET hits = GREATEST(hits - 1, 0)
204
+WHERE scope = $1 AND key = $2
205
+`
206
+
207
+type ReleaseRateLimitLeaseParams struct {
208
+	Scope string
209
+	Key   string
210
+}
211
+
212
+func (q *Queries) ReleaseRateLimitLease(ctx context.Context, db DBTX, arg ReleaseRateLimitLeaseParams) (int64, error) {
213
+	result, err := db.Exec(ctx, releaseRateLimitLease, arg.Scope, arg.Key)
214
+	if err != nil {
215
+		return 0, err
216
+	}
217
+	return result.RowsAffected(), nil
218
+}
internal/web/handlers/api/runners.gomodified
@@ -21,6 +21,7 @@ import (
2121
 	"github.com/jackc/pgx/v5/pgtype"
2222
 
2323
 	"github.com/tenseleyFlow/shithub/internal/actions/finalize"
24
+	"github.com/tenseleyFlow/shithub/internal/actions/logstream"
2425
 	"github.com/tenseleyFlow/shithub/internal/actions/runnerlabels"
2526
 	"github.com/tenseleyFlow/shithub/internal/actions/runnertoken"
2627
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
@@ -656,6 +657,11 @@ func (h *Handlers) applyStepStatus(
656657
 		}
657658
 		shouldNotify = true
658659
 	}
660
+	if terminal {
661
+		if err := logstream.NotifyDone(ctx, tx, step.ID); err != nil {
662
+			return actionsdb.WorkflowStep{}, err
663
+		}
664
+	}
659665
 	if err := tx.Commit(ctx); err != nil {
660666
 		return actionsdb.WorkflowStep{}, err
661667
 	}
@@ -1022,7 +1028,7 @@ func cloneStringMap(in map[string]string) map[string]string {
10221028
 func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq int32, chunk []byte, values []string) error {
10231029
 	q := actionsdb.New()
10241030
 	if len(values) == 0 {
1025
-		_, err := q.AppendStepLogChunk(ctx, h.d.Pool, actionsdb.AppendStepLogChunkParams{
1031
+		row, err := q.AppendStepLogChunk(ctx, h.d.Pool, actionsdb.AppendStepLogChunkParams{
10261032
 			StepID: stepID,
10271033
 			Seq:    seq,
10281034
 			Chunk:  chunk,
@@ -1030,7 +1036,10 @@ func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq
10301036
 		if errors.Is(err, pgx.ErrNoRows) {
10311037
 			return nil
10321038
 		}
1033
-		return err
1039
+		if err != nil {
1040
+			return err
1041
+		}
1042
+		return logstream.NotifyChunk(ctx, h.d.Pool, stepID, row.Seq)
10341043
 	}
10351044
 
10361045
 	tx, err := h.d.Pool.Begin(ctx)
@@ -1083,11 +1092,18 @@ func (h *Handlers) appendScrubbedLogChunk(ctx context.Context, stepID int64, seq
10831092
 		return err
10841093
 	}
10851094
 
1086
-	if _, err := q.AppendStepLogChunk(ctx, tx, actionsdb.AppendStepLogChunkParams{
1095
+	row, err := q.AppendStepLogChunk(ctx, tx, actionsdb.AppendStepLogChunkParams{
10871096
 		StepID: stepID,
10881097
 		Seq:    seq,
10891098
 		Chunk:  chunk,
1090
-	}); err != nil && !errors.Is(err, pgx.ErrNoRows) {
1099
+	})
1100
+	switch {
1101
+	case err == nil:
1102
+		if err := logstream.NotifyChunk(ctx, tx, stepID, row.Seq); err != nil {
1103
+			return err
1104
+		}
1105
+	case errors.Is(err, pgx.ErrNoRows):
1106
+	default:
10911107
 		return err
10921108
 	}
10931109
 	if err := tx.Commit(ctx); err != nil {
internal/web/handlers/repo/actions.gomodified
@@ -145,6 +145,7 @@ type actionsStepDetailView struct {
145145
 	StateClass   string
146146
 	StateIcon    string
147147
 	Duration     string
148
+	IsTerminal   bool
148149
 	LogByteCount int64
149150
 	LogHref      string
150151
 }
@@ -162,6 +163,7 @@ type actionsStepLogView struct {
162163
 	LogSource    string
163164
 	LogError     string
164165
 	LogTruncated bool
166
+	StreamHref   string
165167
 	DownloadURL  string
166168
 	BackHref     string
167169
 }
@@ -204,11 +206,16 @@ func (h *Handlers) repoTabActions(w http.ResponseWriter, r *http.Request) {
204206
 	for _, run := range runs {
205207
 		runViews = append(runViews, actionsListRunViewFromRow(run, owner.Username, row.Name, now))
206208
 	}
209
+	dispatchWorkflows, err := h.actionsDispatchWorkflowViews(r.Context(), row, owner.Username)
210
+	if err != nil {
211
+		h.d.Logger.WarnContext(r.Context(), "repo actions: discover dispatch workflows", "repo_id", row.ID, "error", err)
212
+	}
207213
 
208214
 	data := h.repoHeaderData(r, row, owner.Username, "actions")
209215
 	data["Title"] = "Actions · " + row.Name
210216
 	data["Runs"] = runViews
211217
 	data["Workflows"] = workflows
218
+	data["DispatchWorkflows"] = dispatchWorkflows
212219
 	data["RunCount"] = allRunCount
213220
 	data["FilteredRunCount"] = filteredCount
214221
 	data["ActiveWorkflowName"] = activeWorkflowName
@@ -694,6 +701,9 @@ func (h *Handlers) repoActionStepLog(w http.ResponseWriter, r *http.Request) {
694701
 		DownloadURL:  logContent.DownloadURL,
695702
 		BackHref:     run.ActionsHref + "/runs/" + strconv.FormatInt(run.RunIndex, 10) + "#job-" + strconv.FormatInt(int64(job.JobIndex), 10),
696703
 	}
704
+	if !step.IsTerminal && logContent.Error == "" && logContent.DownloadURL == "" {
705
+		view.StreamHref = step.LogHref + "/log/stream?after=" + strconv.FormatInt(int64(logContent.LastSeq), 10)
706
+	}
697707
 	data := h.repoHeaderData(r, row, owner.Username, "actions")
698708
 	data["Title"] = step.Name + " · " + run.Title + " #" + strconv.FormatInt(run.RunIndex, 10)
699709
 	data["Log"] = view
@@ -811,6 +821,7 @@ func actionsStepDetailViewFromRow(row actionsdb.ListStepsForJobRow, owner, repoN
811821
 		StateClass:   stateClass,
812822
 		StateIcon:    stateIcon,
813823
 		Duration:     actionItemDuration(string(row.Status), string(actionsdb.WorkflowStepStatusQueued), row.StartedAt, row.CompletedAt, row.CreatedAt, row.UpdatedAt, now),
824
+		IsTerminal:   workflowStepTerminal(row.Status),
814825
 		LogByteCount: row.LogByteCount,
815826
 		LogHref: "/" + owner + "/" + repoName + "/actions/runs/" + strconv.FormatInt(runIndex, 10) +
816827
 			"/jobs/" + strconv.FormatInt(int64(jobIndex), 10) +
@@ -962,6 +973,12 @@ func workflowRunTerminal(status actionsdb.WorkflowRunStatus) bool {
962973
 	return status == actionsdb.WorkflowRunStatusCompleted || status == actionsdb.WorkflowRunStatusCancelled
963974
 }
964975
 
976
+func workflowStepTerminal(status actionsdb.WorkflowStepStatus) bool {
977
+	return status == actionsdb.WorkflowStepStatusCompleted ||
978
+		status == actionsdb.WorkflowStepStatusCancelled ||
979
+		status == actionsdb.WorkflowStepStatusSkipped
980
+}
981
+
965982
 func actionItemDuration(status string, queuedStatus string, startedAt, completedAt, createdAt, updatedAt pgtype.Timestamptz, now time.Time) string {
966983
 	if status == queuedStatus {
967984
 		return "—"
@@ -1013,6 +1030,7 @@ type actionsStepLogContent struct {
10131030
 	Source      string
10141031
 	Error       string
10151032
 	Truncated   bool
1033
+	LastSeq     int32
10161034
 	DownloadURL string
10171035
 }
10181036
 
@@ -1031,7 +1049,9 @@ func (h *Handlers) loadStepLogContent(ctx context.Context, stepID int64) (action
10311049
 	}
10321050
 	buf := bytes.NewBuffer(make([]byte, 0, minInt(actionsStepLogRenderLimit, int(step.LogByteCount)+1)))
10331051
 	truncated := false
1052
+	lastSeq := int32(-1)
10341053
 	for _, chunk := range chunks {
1054
+		lastSeq = chunk.Seq
10351055
 		if buf.Len() >= actionsStepLogRenderLimit {
10361056
 			truncated = true
10371057
 			break
@@ -1048,6 +1068,7 @@ func (h *Handlers) loadStepLogContent(ctx context.Context, stepID int64) (action
10481068
 		Text:      strings.ToValidUTF8(buf.String(), "\uFFFD"),
10491069
 		Source:    "SQL chunks",
10501070
 		Truncated: truncated,
1071
+		LastSeq:   lastSeq,
10511072
 	}, nil
10521073
 }
10531074
 
internal/web/handlers/repo/actions_dispatch.gomodified
@@ -9,7 +9,9 @@ import (
99
 	"errors"
1010
 	"fmt"
1111
 	"io"
12
+	"mime"
1213
 	"net/http"
14
+	"net/url"
1315
 	"strings"
1416
 
1517
 	"github.com/go-chi/chi/v5"
@@ -76,22 +78,10 @@ func (h *Handlers) repoActionsDispatch(w http.ResponseWriter, r *http.Request) {
7678
 		return
7779
 	}
7880
 
79
-	body, err := io.ReadAll(io.LimitReader(r.Body, dispatchMaxBody+1))
80
-	if err != nil {
81
-		h.d.Render.HTTPError(w, r, http.StatusBadRequest, "read body: "+err.Error())
82
-		return
83
-	}
84
-	if len(body) > dispatchMaxBody {
85
-		h.d.Render.HTTPError(w, r, http.StatusRequestEntityTooLarge, "body exceeds 64 KiB")
81
+	req, formPost, ok := h.parseDispatchRequest(w, r)
82
+	if !ok {
8683
 		return
8784
 	}
88
-	var req dispatchRequest
89
-	if len(body) > 0 {
90
-		if err := json.Unmarshal(body, &req); err != nil {
91
-			h.d.Render.HTTPError(w, r, http.StatusBadRequest, "invalid JSON body: "+err.Error())
92
-			return
93
-		}
94
-	}
9585
 
9686
 	ref := req.Ref
9787
 	if ref == "" {
@@ -142,6 +132,11 @@ func (h *Handlers) repoActionsDispatch(w http.ResponseWriter, r *http.Request) {
142132
 			"workflow does not declare on.workflow_dispatch")
143133
 		return
144134
 	}
135
+	inputs, err := normalizeDispatchInputs(req.Inputs, wf.On.WorkflowDispatch.Inputs)
136
+	if err != nil {
137
+		h.d.Render.HTTPError(w, r, http.StatusBadRequest, err.Error())
138
+		return
139
+	}
145140
 
146141
 	// Each dispatch click produces a fresh trigger_event_id with a
147142
 	// unique random suffix — the same workflow file at the same SHA
@@ -158,7 +153,7 @@ func (h *Handlers) repoActionsDispatch(w http.ResponseWriter, r *http.Request) {
158153
 	viewer := middleware.CurrentUserFromContext(r.Context())
159154
 	actorID := viewer.ID // 0 if anonymous, but RequireUser is in front of this route
160155
 
161
-	payload := actionsevent.WorkflowDispatch(req.Inputs)
156
+	payload := actionsevent.WorkflowDispatch(inputs)
162157
 	if _, err := trigger.Enqueue(r.Context(), trigger.Deps{Pool: h.d.Pool, Logger: h.d.Logger}, trigger.EnqueueParams{
163158
 		RepoID:         row.ID,
164159
 		WorkflowFile:   file,
@@ -174,9 +169,129 @@ func (h *Handlers) repoActionsDispatch(w http.ResponseWriter, r *http.Request) {
174169
 		h.d.Render.HTTPError(w, r, http.StatusInternalServerError, "")
175170
 		return
176171
 	}
172
+	if formPost {
173
+		redirectTo := "/" + owner.Username + "/" + row.Name + "/actions?workflow=" + url.QueryEscape(file) + "&event=workflow_dispatch"
174
+		http.Redirect(w, r, redirectTo, http.StatusSeeOther)
175
+		return
176
+	}
177177
 	w.WriteHeader(http.StatusNoContent)
178178
 }
179179
 
180
+func (h *Handlers) parseDispatchRequest(w http.ResponseWriter, r *http.Request) (dispatchRequest, bool, bool) {
181
+	if mediaType := dispatchFormMediaType(r); mediaType != "" {
182
+		r.Body = http.MaxBytesReader(w, r.Body, dispatchMaxBody)
183
+		if err := r.ParseForm(); err != nil {
184
+			h.d.Render.HTTPError(w, r, http.StatusBadRequest, "invalid form body: "+err.Error())
185
+			return dispatchRequest{}, true, false
186
+		}
187
+		return dispatchRequest{
188
+			Ref:    strings.TrimSpace(r.PostFormValue("ref")),
189
+			Inputs: dispatchInputsFromForm(r.PostForm),
190
+		}, true, true
191
+	}
192
+
193
+	body, err := io.ReadAll(io.LimitReader(r.Body, dispatchMaxBody+1))
194
+	if err != nil {
195
+		h.d.Render.HTTPError(w, r, http.StatusBadRequest, "read body: "+err.Error())
196
+		return dispatchRequest{}, false, false
197
+	}
198
+	if len(body) > dispatchMaxBody {
199
+		h.d.Render.HTTPError(w, r, http.StatusRequestEntityTooLarge, "body exceeds 64 KiB")
200
+		return dispatchRequest{}, false, false
201
+	}
202
+	var req dispatchRequest
203
+	if len(body) > 0 {
204
+		if err := json.Unmarshal(body, &req); err != nil {
205
+			h.d.Render.HTTPError(w, r, http.StatusBadRequest, "invalid JSON body: "+err.Error())
206
+			return dispatchRequest{}, false, false
207
+		}
208
+	}
209
+	return req, false, true
210
+}
211
+
212
+func dispatchFormMediaType(r *http.Request) string {
213
+	mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
214
+	if err != nil {
215
+		return ""
216
+	}
217
+	switch mediaType {
218
+	case "application/x-www-form-urlencoded":
219
+		return mediaType
220
+	default:
221
+		return ""
222
+	}
223
+}
224
+
225
+func dispatchInputsFromForm(values url.Values) map[string]string {
226
+	inputs := make(map[string]string)
227
+	for key, vals := range values {
228
+		name, ok := strings.CutPrefix(key, "inputs.")
229
+		if !ok || name == "" || len(vals) == 0 {
230
+			continue
231
+		}
232
+		inputs[name] = vals[len(vals)-1]
233
+	}
234
+	if len(inputs) == 0 {
235
+		return nil
236
+	}
237
+	return inputs
238
+}
239
+
240
+func normalizeDispatchInputs(raw map[string]string, specs []workflow.DispatchInput) (map[string]string, error) {
241
+	if raw == nil {
242
+		raw = map[string]string{}
243
+	}
244
+	known := make(map[string]workflow.DispatchInput, len(specs))
245
+	for _, spec := range specs {
246
+		known[spec.Name] = spec
247
+	}
248
+	for name := range raw {
249
+		if _, ok := known[name]; !ok {
250
+			return nil, fmt.Errorf("unknown workflow_dispatch input %q", name)
251
+		}
252
+	}
253
+
254
+	out := make(map[string]string, len(specs))
255
+	for _, spec := range specs {
256
+		value, provided := raw[spec.Name]
257
+		if !provided || value == "" {
258
+			value = spec.Default
259
+		}
260
+		if spec.Type == "boolean" && !provided && value == "" {
261
+			value = "false"
262
+		}
263
+		if spec.Required && spec.Type != "boolean" && strings.TrimSpace(value) == "" {
264
+			return nil, fmt.Errorf("workflow_dispatch input %q is required", spec.Name)
265
+		}
266
+		switch spec.Type {
267
+		case "boolean":
268
+			if value != "true" && value != "false" {
269
+				return nil, fmt.Errorf("workflow_dispatch input %q must be true or false", spec.Name)
270
+			}
271
+		case "choice":
272
+			if value != "" && !dispatchChoiceAllowed(value, spec.Options) {
273
+				return nil, fmt.Errorf("workflow_dispatch input %q must be one of the declared options", spec.Name)
274
+			}
275
+		}
276
+		if value != "" || spec.Type == "boolean" {
277
+			out[spec.Name] = value
278
+		}
279
+	}
280
+	if len(out) == 0 {
281
+		return nil, nil
282
+	}
283
+	return out, nil
284
+}
285
+
286
+func dispatchChoiceAllowed(value string, options []string) bool {
287
+	for _, option := range options {
288
+		if value == option {
289
+			return true
290
+		}
291
+	}
292
+	return false
293
+}
294
+
180295
 // validWorkflowName guards against URL parameter shenanigans by
181296
 // requiring the resolved file path to look like
182297
 // `.shithub/workflows/<basename>.{yml,yaml}` with no path traversal.
internal/web/handlers/repo/actions_dispatch_ui.goadded
@@ -0,0 +1,130 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package repo
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"net/url"
9
+	"os"
10
+	"path"
11
+	"sort"
12
+
13
+	"github.com/tenseleyFlow/shithub/internal/actions/trigger"
14
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
15
+	"github.com/tenseleyFlow/shithub/internal/auth/policy"
16
+	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
17
+	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
18
+	"github.com/tenseleyFlow/shithub/internal/web/middleware"
19
+)
20
+
21
// actionsDispatchWorkflowView is the template model for one manually
// dispatchable workflow in the Actions tab "Run workflow" menu.
type actionsDispatchWorkflowView struct {
	File         string // repo-relative workflow file path
	Name         string // display name (workflow name or file basename)
	DispatchHref string // POST target for the dispatch form
	Inputs       []actionsDispatchInputView
}

// actionsDispatchInputView is the template model for a single
// workflow_dispatch input field.
type actionsDispatchInputView struct {
	Name        string
	Description string
	Type        string // raw declared input type (e.g. "boolean", "choice")
	Default     string
	Required    bool
	IsBoolean   bool // convenience flag for template branching
	IsChoice    bool // convenience flag for template branching
	Options     []actionsDispatchOptionView
}

// actionsDispatchOptionView is one selectable option of a choice input.
type actionsDispatchOptionView struct {
	Value    string
	Selected bool // true when the option equals the input's default
}
43
+
44
// actionsDispatchWorkflowViews builds the "Run workflow" menu models for
// the Actions tab. It returns (nil, nil) — i.e. no menu, not an error —
// for anonymous viewers, viewers without repo write access, repos with
// no default branch, a missing git directory, or an unborn default
// branch ref; only genuine failures are surfaced as errors. Workflows
// are discovered at the default branch head and kept only when they
// parse without error diagnostics and declare workflow_dispatch.
func (h *Handlers) actionsDispatchWorkflowViews(ctx context.Context, row reposdb.Repo, owner string) ([]actionsDispatchWorkflowView, error) {
	viewer := middleware.CurrentUserFromContext(ctx)
	if viewer.IsAnonymous() {
		return nil, nil
	}
	// Dispatching is a write-level action; hide the UI from read-only viewers.
	dec := policy.Can(ctx, policy.Deps{Pool: h.d.Pool}, viewer.PolicyActor(), policy.ActionRepoWrite, policy.NewRepoRefFromRepo(row))
	if !dec.Allow {
		return nil, nil
	}
	if row.DefaultBranch == "" {
		return nil, nil
	}
	gitDir, err := h.d.RepoFS.RepoPath(owner, row.Name)
	if err != nil {
		return nil, err
	}
	// The repo row can exist before its git directory does; treat that as
	// "nothing to dispatch" rather than an error.
	if _, err := os.Stat(gitDir); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return nil, nil
		}
		return nil, err
	}
	headSHA, err := repogit.ResolveRefOID(ctx, gitDir, row.DefaultBranch)
	if err != nil {
		if errors.Is(err, repogit.ErrRefNotFound) {
			return nil, nil
		}
		return nil, err
	}
	files, _, err := trigger.Discover(ctx, gitDir, headSHA)
	if err != nil {
		return nil, err
	}

	views := make([]actionsDispatchWorkflowView, 0, len(files))
	for _, file := range files {
		wf, diags, err := workflow.Parse(file.Bytes)
		// Skip unparseable workflows and ones without a workflow_dispatch
		// trigger; they simply don't appear in the menu.
		if err != nil || workflowHasErrorDiagnostics(diags) || wf.On.WorkflowDispatch == nil {
			continue
		}
		views = append(views, actionsDispatchWorkflowView{
			File:         file.Path,
			Name:         workflowDisplayName(wf.Name, file.Path),
			DispatchHref: "/" + owner + "/" + row.Name + "/actions/workflows/" + url.PathEscape(path.Base(file.Path)) + "/dispatches",
			Inputs:       actionsDispatchInputViews(wf.On.WorkflowDispatch.Inputs),
		})
	}
	// Stable render order: by display name, ties broken by file path.
	sort.SliceStable(views, func(i, j int) bool {
		if views[i].Name == views[j].Name {
			return views[i].File < views[j].File
		}
		return views[i].Name < views[j].Name
	})
	return views, nil
}
99
+
100
+func workflowHasErrorDiagnostics(diags []workflow.Diagnostic) bool {
101
+	for _, d := range diags {
102
+		if d.Severity == workflow.Error {
103
+			return true
104
+		}
105
+	}
106
+	return false
107
+}
108
+
109
+func actionsDispatchInputViews(inputs []workflow.DispatchInput) []actionsDispatchInputView {
110
+	views := make([]actionsDispatchInputView, 0, len(inputs))
111
+	for _, input := range inputs {
112
+		view := actionsDispatchInputView{
113
+			Name:        input.Name,
114
+			Description: input.Description,
115
+			Type:        input.Type,
116
+			Default:     input.Default,
117
+			Required:    input.Required,
118
+			IsBoolean:   input.Type == "boolean",
119
+			IsChoice:    input.Type == "choice",
120
+		}
121
+		for _, option := range input.Options {
122
+			view.Options = append(view.Options, actionsDispatchOptionView{
123
+				Value:    option,
124
+				Selected: input.Default == option,
125
+			})
126
+		}
127
+		views = append(views, view)
128
+	}
129
+	return views
130
+}
internal/web/handlers/repo/actions_log_stream.goadded
@@ -0,0 +1,292 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package repo
4
+
5
+import (
6
+	"context"
7
+	"encoding/base64"
8
+	"encoding/json"
9
+	"errors"
10
+	"fmt"
11
+	"net/http"
12
+	"strconv"
13
+	"time"
14
+
15
+	"github.com/jackc/pgx/v5"
16
+
17
+	"github.com/tenseleyFlow/shithub/internal/actions/logstream"
18
+	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
19
+	"github.com/tenseleyFlow/shithub/internal/auth/policy"
20
+	"github.com/tenseleyFlow/shithub/internal/ratelimit"
21
+	"github.com/tenseleyFlow/shithub/internal/web/middleware"
22
+)
23
+
24
const (
	// actionsLogStreamBatchSize caps how many log chunks one query
	// returns; the flush loop pages until a short batch signals the
	// table is drained.
	actionsLogStreamBatchSize = int32(100)
	// actionsLogStreamHeartbeatEvery bounds how long we block waiting
	// for a NOTIFY before emitting an SSE keep-alive comment.
	actionsLogStreamHeartbeatEvery = 20 * time.Second
	// actionsLogStreamReleaseTimeout bounds the background lease
	// release once the request context is gone.
	actionsLogStreamReleaseTimeout = 3 * time.Second
)

// actionsLogStreamLimit allows at most 5 live log-stream leases per key
// (user or IP) within a 2-minute window.
var actionsLogStreamLimit = ratelimit.Policy{
	Scope:  "actions:logtail",
	Max:    5,
	Window: 2 * time.Minute,
}

// actionsLogStreamChunk is the JSON payload of one SSE "chunk" event.
// Raw log bytes are base64-encoded so arbitrary output stays valid JSON.
type actionsLogStreamChunk struct {
	Seq      int32  `json:"seq"`
	ChunkB64 string `json:"chunk_b64"`
}
40
+
41
// repoActionStepLogStream serves a Server-Sent Events tail of a single
// workflow step's log. It replays stored chunks after the client's
// resume cursor (Last-Event-ID header or ?after query), then LISTENs on
// the step's Postgres channel and forwards new chunks as they arrive,
// emitting a "done" event once the step reaches a terminal status.
// Concurrent streams per viewer are bounded by actionsLogStreamLimit.
func (h *Handlers) repoActionStepLogStream(w http.ResponseWriter, r *http.Request) {
	row, owner, ok := h.loadRepoAndAuthorize(w, r, policy.ActionRepoRead)
	if !ok {
		return
	}
	runIndex, ok := parsePositiveInt64Param(r, "runIndex")
	if !ok {
		h.d.Render.HTTPError(w, r, http.StatusNotFound, "")
		return
	}
	jobIndex, ok := parseNonNegativeInt32Param(r, "jobIndex")
	if !ok {
		h.d.Render.HTTPError(w, r, http.StatusNotFound, "")
		return
	}
	stepIndex, ok := parseNonNegativeInt32Param(r, "stepIndex")
	if !ok {
		h.d.Render.HTTPError(w, r, http.StatusNotFound, "")
		return
	}
	lastSeq, ok := parseLogStreamAfter(r)
	if !ok {
		h.d.Render.HTTPError(w, r, http.StatusBadRequest, "invalid Last-Event-ID")
		return
	}

	run, err := h.loadActionsRunDetail(r.Context(), row.ID, owner.Username, row.Name, runIndex)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			h.d.Render.HTTPError(w, r, http.StatusNotFound, "")
		} else {
			h.d.Logger.WarnContext(r.Context(), "repo actions: get run for step log stream", "repo_id", row.ID, "run_index", runIndex, "error", err)
			h.d.Render.HTTPError(w, r, http.StatusInternalServerError, "")
		}
		return
	}
	_, step, ok := findActionStep(run, jobIndex, stepIndex)
	if !ok {
		h.d.Render.HTTPError(w, r, http.StatusNotFound, "")
		return
	}

	// SSE requires per-event flushing; bail before committing headers if
	// the ResponseWriter cannot flush.
	flusher, ok := w.(http.Flusher)
	if !ok {
		h.d.Render.HTTPError(w, r, http.StatusInternalServerError, "streaming is not supported")
		return
	}

	lease, decision, leaseErr := h.acquireLogStreamLease(r.Context(), w, r)
	if leaseErr != nil && h.d.Logger != nil {
		h.d.Logger.WarnContext(r.Context(), "repo actions: log stream rate-limit failed", "step_id", step.ID, "error", leaseErr)
	}
	if !decision.Allowed {
		w.Header().Set("Retry-After", strconv.Itoa(int(decision.RetryAfter/time.Second)))
		h.d.Render.HTTPError(w, r, http.StatusTooManyRequests, "too many live log streams")
		return
	}
	if lease != nil {
		defer func() {
			// Release on a fresh context: r.Context() is typically already
			// canceled by the time the client disconnects.
			ctx, cancel := context.WithTimeout(context.Background(), actionsLogStreamReleaseTimeout)
			defer cancel()
			if err := lease.Release(ctx); err != nil && h.d.Logger != nil {
				h.d.Logger.WarnContext(r.Context(), "repo actions: release log stream lease", "step_id", step.ID, "error", err)
			}
		}()
	}

	// A dedicated connection is required: LISTEN/NOTIFY state is
	// per-connection and must not go back into the pool mid-stream.
	conn, err := h.d.Pool.Acquire(r.Context())
	if err != nil {
		h.d.Logger.WarnContext(r.Context(), "repo actions: acquire log stream conn", "step_id", step.ID, "error", err)
		h.d.Render.HTTPError(w, r, http.StatusInternalServerError, "")
		return
	}
	defer conn.Release()
	// Subscribe BEFORE the initial replay below so a chunk inserted
	// between the SELECT and the LISTEN cannot be missed.
	if _, err := conn.Exec(r.Context(), logstream.ListenSQL(step.ID)); err != nil {
		h.d.Logger.WarnContext(r.Context(), "repo actions: listen log stream", "step_id", step.ID, "error", err)
		h.d.Render.HTTPError(w, r, http.StatusInternalServerError, "")
		return
	}

	w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
	w.Header().Set("Cache-Control", "no-cache, no-transform")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	w.WriteHeader(http.StatusOK)

	// Initial replay of everything past the client's resume cursor.
	nextSeq, err := h.flushStepLogChunks(r.Context(), w, conn, step.ID, lastSeq)
	if err != nil {
		h.d.Logger.WarnContext(r.Context(), "repo actions: write initial log chunks", "step_id", step.ID, "error", err)
		return
	}
	done, err := h.stepLogStreamDone(r.Context(), conn, step.ID)
	if err != nil {
		h.d.Logger.WarnContext(r.Context(), "repo actions: check step terminal", "step_id", step.ID, "error", err)
		return
	}
	if done {
		_ = writeSSEEvent(w, "done", -1, []byte(`{}`))
		flusher.Flush()
		return
	}
	flusher.Flush()

	for {
		// Bound the wait so we can heartbeat (and re-poll) periodically
		// even when no NOTIFY arrives.
		waitCtx, cancel := context.WithTimeout(r.Context(), actionsLogStreamHeartbeatEvery)
		notification, err := conn.Conn().WaitForNotification(waitCtx)
		cancel()
		if r.Context().Err() != nil {
			return
		}
		if errors.Is(err, context.DeadlineExceeded) {
			// Heartbeat path: re-poll for chunks we may have raced past,
			// re-check terminal state, then send an SSE comment keep-alive.
			nextSeq, err = h.flushStepLogChunks(r.Context(), w, conn, step.ID, nextSeq)
			if err != nil {
				h.d.Logger.WarnContext(r.Context(), "repo actions: write heartbeat log chunks", "step_id", step.ID, "error", err)
				return
			}
			done, err := h.stepLogStreamDone(r.Context(), conn, step.ID)
			if err != nil {
				h.d.Logger.WarnContext(r.Context(), "repo actions: heartbeat terminal check", "step_id", step.ID, "error", err)
				return
			}
			if done {
				_ = writeSSEEvent(w, "done", -1, []byte(`{}`))
				flusher.Flush()
				return
			}
			if _, err := fmt.Fprint(w, ": keep-alive\n\n"); err != nil {
				return
			}
			flusher.Flush()
			continue
		}
		if err != nil {
			h.d.Logger.WarnContext(r.Context(), "repo actions: wait log notification", "step_id", step.ID, "error", err)
			return
		}
		if notification.Channel != logstream.Channel(step.ID) {
			continue
		}
		// The payload's own seq is ignored; we re-query from our cursor
		// instead, so notifications only act as wake-ups.
		_, done, ok := logstream.ParsePayload(notification.Payload)
		if !ok {
			h.d.Logger.WarnContext(r.Context(), "repo actions: invalid log notification", "step_id", step.ID, "payload", notification.Payload)
			continue
		}
		nextSeq, err = h.flushStepLogChunks(r.Context(), w, conn, step.ID, nextSeq)
		if err != nil {
			h.d.Logger.WarnContext(r.Context(), "repo actions: write log chunks", "step_id", step.ID, "error", err)
			return
		}
		if done {
			_ = writeSSEEvent(w, "done", -1, []byte(`{}`))
			flusher.Flush()
			return
		}
		flusher.Flush()
	}
}
198
+
199
+func (h *Handlers) acquireLogStreamLease(ctx context.Context, w http.ResponseWriter, r *http.Request) (*ratelimit.Lease, ratelimit.Decision, error) {
200
+	if h.d.RateLimiter == nil {
201
+		return nil, ratelimit.Decision{Allowed: true}, nil
202
+	}
203
+	key := logStreamRateLimitKey(r)
204
+	if key == "" {
205
+		return nil, ratelimit.Decision{Allowed: true}, nil
206
+	}
207
+	lease, decision, err := h.d.RateLimiter.AcquireLease(ctx, actionsLogStreamLimit, key)
208
+	ratelimit.StampHeaders(w, decision)
209
+	return lease, decision, err
210
+}
211
+
212
+func logStreamRateLimitKey(r *http.Request) string {
213
+	viewer := middleware.CurrentUserFromContext(r.Context())
214
+	if !viewer.IsAnonymous() {
215
+		return "u:" + strconv.FormatInt(viewer.ID, 10)
216
+	}
217
+	if ip, ok := ratelimit.ClientIP(r, true); ok {
218
+		return "ip:" + ip.String()
219
+	}
220
+	return ""
221
+}
222
+
223
+func parseLogStreamAfter(r *http.Request) (int32, bool) {
224
+	raw := r.Header.Get("Last-Event-ID")
225
+	if raw == "" {
226
+		raw = r.URL.Query().Get("after")
227
+	}
228
+	if raw == "" {
229
+		return -1, true
230
+	}
231
+	n, err := strconv.ParseInt(raw, 10, 32)
232
+	if err != nil || n < -1 {
233
+		return 0, false
234
+	}
235
+	return int32(n), true
236
+}
237
+
238
// flushStepLogChunks writes every stored log chunk with seq > afterSeq
// to the SSE stream as "chunk" events, paging through the table in
// batches of actionsLogStreamBatchSize. It returns the highest seq
// written (afterSeq unchanged when nothing new existed) so the caller
// can carry it forward as the next cursor.
func (h *Handlers) flushStepLogChunks(ctx context.Context, w http.ResponseWriter, db actionsdb.DBTX, stepID int64, afterSeq int32) (int32, error) {
	q := actionsdb.New()
	nextSeq := afterSeq
	for {
		chunks, err := q.ListStepLogChunks(ctx, db, actionsdb.ListStepLogChunksParams{
			StepID: stepID,
			Seq:    nextSeq,
			Limit:  actionsLogStreamBatchSize,
		})
		if err != nil {
			return nextSeq, err
		}
		for _, chunk := range chunks {
			// Base64 keeps arbitrary log bytes valid inside the JSON payload.
			payload, err := json.Marshal(actionsLogStreamChunk{
				Seq:      chunk.Seq,
				ChunkB64: base64.StdEncoding.EncodeToString(chunk.Chunk),
			})
			if err != nil {
				return nextSeq, err
			}
			// The chunk seq doubles as the SSE event id so clients resume
			// via Last-Event-ID after a dropped connection.
			if err := writeSSEEvent(w, "chunk", chunk.Seq, payload); err != nil {
				return nextSeq, err
			}
			nextSeq = chunk.Seq
		}
		// A short batch means everything up to nextSeq has been drained.
		if int32(len(chunks)) < actionsLogStreamBatchSize {
			return nextSeq, nil
		}
	}
}
268
+
269
+func (h *Handlers) stepLogStreamDone(ctx context.Context, db actionsdb.DBTX, stepID int64) (bool, error) {
270
+	step, err := actionsdb.New().GetWorkflowStepByID(ctx, db, stepID)
271
+	if err != nil {
272
+		return false, err
273
+	}
274
+	return workflowStepTerminal(step.Status), nil
275
+}
276
+
277
+func writeSSEEvent(w http.ResponseWriter, event string, id int32, payload []byte) error {
278
+	if id >= 0 {
279
+		if _, err := fmt.Fprintf(w, "id: %d\n", id); err != nil {
280
+			return err
281
+		}
282
+	}
283
+	if event != "" {
284
+		if _, err := fmt.Fprintf(w, "event: %s\n", event); err != nil {
285
+			return err
286
+		}
287
+	}
288
+	if _, err := fmt.Fprintf(w, "data: %s\n\n", payload); err != nil {
289
+		return err
290
+	}
291
+	return nil
292
+}
internal/web/handlers/repo/actions_test.gomodified
@@ -5,8 +5,10 @@ package repo
55
 import (
66
 	"bytes"
77
 	"context"
8
+	"encoding/json"
89
 	"net/http"
910
 	"net/http/httptest"
11
+	"net/url"
1012
 	"strconv"
1113
 	"strings"
1214
 	"testing"
@@ -16,7 +18,9 @@ import (
1618
 	"github.com/jackc/pgx/v5/pgtype"
1719
 
1820
 	actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
21
+	"github.com/tenseleyFlow/shithub/internal/actions/workflow"
1922
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
23
+	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
2024
 	"github.com/tenseleyFlow/shithub/internal/web/middleware"
2125
 )
2226
 
@@ -132,6 +136,94 @@ func TestRepoTabActionsPaginatesTwentyRuns(t *testing.T) {
132136
 	}
133137
 }
134138
 
139
// TestRepoTabActionsRendersDispatchWorkflowsForWriters verifies that
// the Actions tab exposes the workflow_dispatch menu (workflow name,
// form target, and input metadata) to the repo owner, and that none of
// it is rendered for a viewer without write access.
func TestRepoTabActionsRendersDispatchWorkflowsForWriters(t *testing.T) {
	t.Parallel()
	f := newRepoFixture(t)
	f.seedWorkflowFile(t, "manual.yml", dispatchWorkflowFixture)

	resp := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/alice/public-repo/actions", nil)
	f.actionsMux(viewerFor(f.owner)).ServeHTTP(resp, req)
	if resp.Code != http.StatusOK {
		t.Fatalf("owner status=%d body=%s", resp.Code, resp.Body.String())
	}
	body := resp.Body.String()
	// The stub actions template renders "DISPATCH=name:href:inputs" markers,
	// so substring checks pin the exact view-model values.
	for _, want := range []string{
		"DISPATCH=Manual:/alice/public-repo/actions/workflows/manual.yml/dispatches:",
		"env/choice/true//staging|prod|,",
		"dry_run/boolean/false/true/,",
	} {
		if !strings.Contains(body, want) {
			t.Fatalf("owner body missing %q in %s", want, body)
		}
	}

	// A non-writer must not see any dispatch controls at all.
	resp = httptest.NewRecorder()
	req = httptest.NewRequest(http.MethodGet, "/alice/public-repo/actions", nil)
	f.actionsMux(viewerFor(f.stranger)).ServeHTTP(resp, req)
	if resp.Code != http.StatusOK {
		t.Fatalf("stranger status=%d body=%s", resp.Code, resp.Body.String())
	}
	if strings.Contains(resp.Body.String(), "DISPATCH=") {
		t.Fatalf("dispatch controls leaked to non-writer: %s", resp.Body.String())
	}
}
171
+
172
// TestRepoActionsDispatchAcceptsFormInputs posts a dispatch form with
// one explicit input and asserts: the handler redirects to the filtered
// Actions tab, the explicit input lands in the stored run's
// event_payload, and the undeclared boolean input picks up its default.
func TestRepoActionsDispatchAcceptsFormInputs(t *testing.T) {
	t.Parallel()
	f := newRepoFixture(t)
	f.seedWorkflowFile(t, "manual.yml", dispatchWorkflowFixture)

	// "inputs.<name>" form keys carry the workflow_dispatch inputs.
	form := url.Values{}
	form.Set("ref", "trunk")
	form.Set("inputs.env", "prod")
	req := httptest.NewRequest(http.MethodPost, "/alice/public-repo/actions/workflows/manual.yml/dispatches", strings.NewReader(form.Encode()))
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	resp := httptest.NewRecorder()
	f.actionsMux(viewerFor(f.owner)).ServeHTTP(resp, req)
	if resp.Code != http.StatusSeeOther {
		t.Fatalf("status=%d body=%s", resp.Code, resp.Body.String())
	}
	if loc := resp.Header().Get("Location"); loc != "/alice/public-repo/actions?workflow=.shithub%2Fworkflows%2Fmanual.yml&event=workflow_dispatch" {
		t.Fatalf("Location=%q", loc)
	}

	// Confirm the dispatch actually created a run with the normalized inputs.
	var raw []byte
	err := f.pool.QueryRow(context.Background(), `
		SELECT event_payload
		FROM workflow_runs
		WHERE repo_id = $1 AND workflow_file = '.shithub/workflows/manual.yml'`,
		f.publicRepo.ID,
	).Scan(&raw)
	if err != nil {
		t.Fatalf("select workflow dispatch run: %v", err)
	}
	var payload map[string]map[string]string
	if err := json.Unmarshal(raw, &payload); err != nil {
		t.Fatalf("payload json: %v", err)
	}
	if got := payload["inputs"]["env"]; got != "prod" {
		t.Fatalf("env input=%q", got)
	}
	// dry_run was not posted; its declared default must be applied.
	if got := payload["inputs"]["dry_run"]; got != "true" {
		t.Fatalf("dry_run default=%q", got)
	}
}
212
+
213
// TestNormalizeDispatchInputsRejectsUnknownAndInvalidChoice unit-tests
// the three rejection paths of normalizeDispatchInputs: an undeclared
// input name, a choice value outside the declared options, and a
// missing required input.
func TestNormalizeDispatchInputsRejectsUnknownAndInvalidChoice(t *testing.T) {
	t.Parallel()
	specs := dispatchWorkflowInputSpecs()
	if _, err := normalizeDispatchInputs(map[string]string{"bogus": "x"}, specs); err == nil {
		t.Fatal("unknown input accepted")
	}
	if _, err := normalizeDispatchInputs(map[string]string{"env": "qa"}, specs); err == nil {
		t.Fatal("invalid choice accepted")
	}
	// nil raw inputs: the required "env" choice has no default, so this
	// must fail validation.
	if _, err := normalizeDispatchInputs(nil, specs); err == nil {
		t.Fatal("missing required input accepted")
	}
}
226
+
135227
 func TestRepoActionRunRendersWorkflowRunJobsAndSteps(t *testing.T) {
136228
 	t.Parallel()
137229
 	f := newRepoFixture(t)
@@ -283,6 +375,7 @@ func TestRepoActionStepLogRendersSQLChunks(t *testing.T) {
283375
 	body := resp.Body.String()
284376
 	for _, want := range []string{
285377
 		"STEPLOG=Build:Run tests:SQL chunks::false;",
378
+		"STREAM=/alice/public-repo/actions/runs/9/jobs/0/steps/0/log/stream?after=1;",
286379
 		"LOG=hello\nworld\n;",
287380
 	} {
288381
 		if !strings.Contains(body, want) {
@@ -291,6 +384,72 @@ func TestRepoActionStepLogRendersSQLChunks(t *testing.T) {
291384
 	}
292385
 }
293386
 
387
// TestRepoActionStepLogStreamResumesAndClosesForTerminalStep exercises
// the SSE endpoint against an already-completed step with two stored
// chunks: with ?after=0 the stream must replay only chunk 1 (not chunk
// 0, which precedes the cursor), tag it with the SSE id, and finish
// with a "done" event since the step is terminal.
func TestRepoActionStepLogStreamResumesAndClosesForTerminalStep(t *testing.T) {
	t.Parallel()
	f := newRepoFixture(t)
	now := time.Date(2026, 5, 11, 12, 0, 0, 0, time.UTC)
	// Seed a completed run -> job -> step chain so the handler sees a
	// terminal step and closes the stream instead of tailing.
	runID := f.insertWorkflowRun(t, workflowRunFixture{
		RunIndex:      11,
		WorkflowFile:  ".shithub/workflows/ci.yml",
		WorkflowName:  "CI",
		HeadRef:       "trunk",
		Event:         actionsdb.WorkflowRunEventPush,
		Status:        actionsdb.WorkflowRunStatusCompleted,
		Conclusion:    actionsdb.CheckConclusionSuccess,
		ActorUserID:   f.owner.ID,
		CreatedOffset: -5 * time.Minute,
		StartedOffset: -4 * time.Minute,
		DoneOffset:    -1 * time.Minute,
	}, now)
	jobID := f.insertWorkflowJob(t, workflowJobFixture{
		RunID:       runID,
		JobIndex:    0,
		JobKey:      "build",
		JobName:     "Build",
		RunsOn:      "ubuntu-latest",
		Status:      actionsdb.WorkflowJobStatusCompleted,
		Conclusion:  actionsdb.CheckConclusionSuccess,
		StartedAt:   now.Add(-4 * time.Minute),
		CompletedAt: now.Add(-1 * time.Minute),
	})
	stepID := f.insertWorkflowStep(t, workflowStepFixture{
		JobID:       jobID,
		StepIndex:   0,
		StepName:    "Run",
		RunCommand:  "printf done",
		Status:      actionsdb.WorkflowStepStatusCompleted,
		Conclusion:  actionsdb.CheckConclusionSuccess,
		StartedAt:   now.Add(-3 * time.Minute),
		CompletedAt: now.Add(-1 * time.Minute),
	})
	// Chunk 0 is before the resume cursor, chunk 1 after it.
	f.insertStepLogChunk(t, stepID, 0, "hello\n")
	f.insertStepLogChunk(t, stepID, 1, "world\n")

	resp := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/alice/public-repo/actions/runs/11/jobs/0/steps/0/log/stream?after=0", nil)
	f.actionsMux(viewerFor(f.owner)).ServeHTTP(resp, req)
	if resp.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", resp.Code, resp.Body.String())
	}
	if ct := resp.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/event-stream") {
		t.Fatalf("content-type=%q", ct)
	}
	body := resp.Body.String()
	// "d29ybGQK" is base64 for "world\n" — the only chunk past the cursor.
	for _, want := range []string{
		"id: 1\n",
		"event: chunk\n",
		`"chunk_b64":"d29ybGQK"`,
		"event: done\n",
	} {
		if !strings.Contains(body, want) {
			t.Fatalf("stream body missing %q in %s", want, body)
		}
	}
	// "aGVsbG8K" is base64 for "hello\n"; replaying it would violate resume.
	if strings.Contains(body, "aGVsbG8K") {
		t.Fatalf("stream replayed chunk before Last-Event-ID: %s", body)
	}
}
452
+
294453
 func TestRepoActionStepLogRendersArchivedObject(t *testing.T) {
295454
 	t.Parallel()
296455
 	f := newRepoFixture(t)
@@ -365,13 +524,80 @@ func (f *repoFixture) actionsMux(viewer middleware.CurrentUser) http.Handler {
365524
 			next.ServeHTTP(w, r.WithContext(middleware.WithCurrentUserForTest(r.Context(), viewer)))
366525
 		})
367526
 	})
527
+	mux.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}/log/stream", f.handlers.repoActionStepLogStream)
368528
 	mux.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}", f.handlers.repoActionStepLog)
369529
 	mux.Get("/{owner}/{repo}/actions/runs/{runIndex}/status", f.handlers.repoActionRunStatus)
370530
 	mux.Get("/{owner}/{repo}/actions/runs/{runIndex}", f.handlers.repoActionRun)
531
+	mux.Post("/{owner}/{repo}/actions/workflows/{file}/dispatches", f.handlers.repoActionsDispatch)
371532
 	mux.Get("/{owner}/{repo}/actions", f.handlers.repoTabActions)
372533
 	return mux
373534
 }
374535
 
536
+const dispatchWorkflowFixture = `name: Manual
537
+on:
538
+  workflow_dispatch:
539
+    inputs:
540
+      env:
541
+        description: Environment
542
+        required: true
543
+        type: choice
544
+        options:
545
+          - staging
546
+          - prod
547
+      dry_run:
548
+        description: Dry run
549
+        type: boolean
550
+        default: "true"
551
+jobs:
552
+  build:
553
+    runs-on: ubuntu-latest
554
+    steps:
555
+      - run: echo hello
556
+`
557
+
558
+func dispatchWorkflowInputSpecs() []workflow.DispatchInput {
559
+	return []workflow.DispatchInput{
560
+		{
561
+			Name:     "env",
562
+			Type:     "choice",
563
+			Required: true,
564
+			Options:  []string{"staging", "prod"},
565
+		},
566
+		{
567
+			Name:    "dry_run",
568
+			Type:    "boolean",
569
+			Default: "true",
570
+		},
571
+	}
572
+}
573
+
574
// seedWorkflowFile initializes the fixture's public repo as a bare git
// repository (if needed) and commits the given workflow body at
// .shithub/workflows/<name> on the "trunk" branch. It returns the
// resulting commit SHA and fails the test on any error.
func (f *repoFixture) seedWorkflowFile(t *testing.T, name, body string) string {
	t.Helper()
	ctx := context.Background()
	gitDir, err := f.handlers.d.RepoFS.RepoPath(f.owner.Username, f.publicRepo.Name)
	if err != nil {
		t.Fatalf("RepoPath: %v", err)
	}
	if err := f.handlers.d.RepoFS.InitBare(ctx, gitDir); err != nil {
		t.Fatalf("InitBare: %v", err)
	}
	// Fixed timestamp keeps the commit (and thus the test) deterministic.
	commit, err := (repogit.InitialCommit{
		GitDir:      gitDir,
		AuthorName:  "Alice",
		AuthorEmail: "alice@example.test",
		Branch:      "trunk",
		Message:     "Add workflow",
		When:        time.Date(2026, 5, 11, 12, 0, 0, 0, time.UTC),
		Files: []repogit.FileEntry{
			{Path: ".shithub/workflows/" + name, Body: []byte(body)},
		},
	}).Build(ctx)
	if err != nil {
		t.Fatalf("InitialCommit.Build: %v", err)
	}
	return commit
}
600
+
375601
 type workflowRunFixture struct {
376602
 	RunIndex      int64
377603
 	WorkflowFile  string
internal/web/handlers/repo/repo.gomodified
@@ -29,6 +29,7 @@ import (
2929
 	"github.com/tenseleyFlow/shithub/internal/orgs"
3030
 	orgsdb "github.com/tenseleyFlow/shithub/internal/orgs/sqlc"
3131
 	pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
32
+	"github.com/tenseleyFlow/shithub/internal/ratelimit"
3233
 	"github.com/tenseleyFlow/shithub/internal/repos"
3334
 	repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
3435
 	reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
@@ -71,6 +72,7 @@ type Deps struct {
7172
 	ObjectStore storage.ObjectStore
7273
 	Audit       *audit.Recorder
7374
 	Limiter     *throttle.Limiter
75
+	RateLimiter *ratelimit.Limiter
7476
 	CloneURLs   CloneURLs
7577
 	// SecretBox AEAD-wraps webhook secrets at rest (S33). nil disables
7678
 	// the webhook surface (the handler renders a placeholder page).
@@ -109,6 +111,9 @@ func New(d Deps) (*Handlers, error) {
109111
 	if d.Limiter == nil {
110112
 		d.Limiter = throttle.NewLimiter()
111113
 	}
114
+	if d.RateLimiter == nil {
115
+		d.RateLimiter = ratelimit.New(d.Pool)
116
+	}
112117
 	return &Handlers{d: d, rq: reposdb.New(), uq: usersdb.New(), iq: issuesdb.New(), pq: pullsdb.New(), cq: checksdb.New()}, nil
113118
 }
114119
 
@@ -132,6 +137,7 @@ func (h *Handlers) MountRepoActionsAPI(r chi.Router) {
132137
 // two-segment route doesn't collide with the /{username} catch-all from S09;
133138
 // caller is responsible for ordering this BEFORE /{username}.
134139
 func (h *Handlers) MountRepoHome(r chi.Router) {
140
+	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}/log/stream", h.repoActionStepLogStream)
135141
 	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}", h.repoActionStepLog)
136142
 	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/status", h.repoActionRunStatus)
137143
 	r.Get("/{owner}/{repo}/actions/runs/{runIndex}", h.repoActionRun)
internal/web/handlers/repo/repo_test.gomodified
@@ -148,11 +148,11 @@ func minimalTemplatesFS() fstest.MapFS {
148148
 		"errors/429.html":              {Data: body},
149149
 		"errors/500.html":              {Data: body},
150150
 		"repo/new.html":                {Data: []byte(`{{ define "page" }}OWNERS={{ range .Owners }}{{ .Token }}:{{ if eq .Token $.Form.Owner }}selected{{ end }}:{{ .Slug }};{{ end }}{{ end }}`)},
151
-		"repo/actions.html":            {Data: []byte(`{{ define "page" }}COUNT={{ .RunCount }};FILTERED={{ .FilteredRunCount }};PAGE={{ .Pagination.ResultText }};{{ range .Workflows }}WF={{ .Name }}:{{ .Count }}:{{ .Active }};{{ end }}{{ range .Runs }}RUN={{ .Title }}:#{{ .RunIndex }}:{{ .Event }}:{{ .HeadRef }}:{{ .ActorUsername }}:{{ .StateClass }};{{ end }}{{ end }}`)},
151
+		"repo/actions.html":            {Data: []byte(`{{ define "page" }}COUNT={{ .RunCount }};FILTERED={{ .FilteredRunCount }};PAGE={{ .Pagination.ResultText }};{{ range .DispatchWorkflows }}DISPATCH={{ .Name }}:{{ .DispatchHref }}:{{ range .Inputs }}{{ .Name }}/{{ .Type }}/{{ .Required }}/{{ .Default }}/{{ range .Options }}{{ .Value }}|{{ end }},{{ end }};{{ end }}{{ range .Workflows }}WF={{ .Name }}:{{ .Count }}:{{ .Active }};{{ end }}{{ range .Runs }}RUN={{ .Title }}:#{{ .RunIndex }}:{{ .Event }}:{{ .HeadRef }}:{{ .ActorUsername }}:{{ .StateClass }};{{ end }}{{ end }}`)},
152152
 		"repo/_action_run_status.html": {Data: []byte(`{{ define "action-run-status" }}STATUS={{ .Run.StateClass }}:{{ .Run.IsTerminal }}:{{ .Run.StatusHref }};{{ end }}`)},
153153
 		"repo/action_run.html":         {Data: []byte(`{{ define "page" }}RUN={{ .Run.Title }}:#{{ .Run.RunIndex }}:{{ .Run.Event }}:{{ .Run.ActorUsername }}:{{ .Run.StateClass }};SUMMARY={{ .Run.JobCount }}:{{ .Run.CompletedCount }}:{{ .Run.FailureCount }}:{{ .Run.ArtifactCount }};{{ range .Run.Jobs }}JOB={{ .Name }}:{{ .StateClass }}:{{ .NeedsText }}:{{ .RunsOn }};{{ range .Steps }}STEP={{ .Name }}:{{ .StateClass }}:{{ .LogHref }};{{ end }}{{ end }}{{ end }}`)},
154154
 		"repo/action_run_status.html":  {Data: []byte(`{{ define "page" }}{{ template "action-run-status" . }}{{ end }}`)},
155
-		"repo/action_step_log.html":    {Data: []byte(`{{ define "page" }}STEPLOG={{ .Log.Job.Name }}:{{ .Log.Step.Name }}:{{ .Log.LogSource }}:{{ .Log.DownloadURL }}:{{ .Log.LogTruncated }};{{ with .Log.LogError }}ERROR={{ . }};{{ end }}LOG={{ .Log.LogText }};{{ end }}`)},
155
+		"repo/action_step_log.html":    {Data: []byte(`{{ define "page" }}STEPLOG={{ .Log.Job.Name }}:{{ .Log.Step.Name }}:{{ .Log.LogSource }}:{{ .Log.DownloadURL }}:{{ .Log.LogTruncated }};{{ with .Log.StreamHref }}STREAM={{ . }};{{ end }}{{ with .Log.LogError }}ERROR={{ . }};{{ end }}LOG={{ .Log.LogText }};{{ end }}`)},
156156
 		"repo/settings_secrets.html":   {Data: []byte(`{{ define "page" }}{{ with .Error }}ERROR={{ . }}{{ end }}{{ range .Secrets }}SECRET={{ .Name }};{{ end }}{{ range .Variables }}VAR={{ .Name }}:{{ .Value }};{{ end }}{{ end }}`)},
157157
 	}
158158
 }
internal/web/repo_wiring.gomodified
@@ -17,6 +17,7 @@ import (
1717
 	"github.com/tenseleyFlow/shithub/internal/auth/throttle"
1818
 	"github.com/tenseleyFlow/shithub/internal/infra/config"
1919
 	"github.com/tenseleyFlow/shithub/internal/infra/storage"
20
+	"github.com/tenseleyFlow/shithub/internal/ratelimit"
2021
 	repoh "github.com/tenseleyFlow/shithub/internal/web/handlers/repo"
2122
 	"github.com/tenseleyFlow/shithub/internal/web/render"
2223
 )
@@ -79,6 +80,7 @@ func buildRepoHandlers(
7980
 		ObjectStore:  objectStore,
8081
 		Audit:        audit.NewRecorder(),
8182
 		Limiter:      throttle.NewLimiter(),
83
+		RateLimiter:  ratelimit.New(pool),
8284
 		SecretBox:    hookBox,
8385
 		ShithubdPath: shithubdPath,
8486
 		CloneURLs: repoh.CloneURLs{
internal/web/static/css/shithub.cssmodified
@@ -4854,6 +4854,83 @@ button.shithub-repo-action {
48544854
   margin: 0.2rem 0 0;
48554855
   color: var(--fg-muted);
48564856
 }
4857
+.shithub-actions-head-actions {
4858
+  position: relative;
4859
+  display: flex;
4860
+  align-items: center;
4861
+  gap: 0.5rem;
4862
+}
4863
+.shithub-actions-dispatch {
4864
+  position: relative;
4865
+}
4866
+.shithub-actions-dispatch summary {
4867
+  list-style: none;
4868
+  cursor: pointer;
4869
+}
4870
+.shithub-actions-dispatch summary::-webkit-details-marker {
4871
+  display: none;
4872
+}
4873
+.shithub-actions-dispatch-menu {
4874
+  position: absolute;
4875
+  z-index: 20;
4876
+  top: calc(100% + 0.4rem);
4877
+  right: 0;
4878
+  width: min(24rem, calc(100vw - 2rem));
4879
+  max-height: min(38rem, calc(100vh - 8rem));
4880
+  overflow: auto;
4881
+  padding: 0.75rem;
4882
+  border: 1px solid var(--border-default);
4883
+  border-radius: 6px;
4884
+  background: var(--canvas-overlay, var(--canvas-default));
4885
+  box-shadow: var(--shadow-large, 0 8px 24px rgba(140, 149, 159, 0.2));
4886
+}
4887
+.shithub-actions-dispatch-form {
4888
+  display: grid;
4889
+  gap: 0.7rem;
4890
+  padding-bottom: 0.75rem;
4891
+  border-bottom: 1px solid var(--border-muted);
4892
+}
4893
+.shithub-actions-dispatch-form + .shithub-actions-dispatch-form {
4894
+  padding-top: 0.75rem;
4895
+}
4896
+.shithub-actions-dispatch-form:last-child {
4897
+  padding-bottom: 0;
4898
+  border-bottom: 0;
4899
+}
4900
+.shithub-actions-dispatch-form header h2 {
4901
+  margin: 0;
4902
+  font-size: 0.95rem;
4903
+}
4904
+.shithub-actions-dispatch-form header code,
4905
+.shithub-actions-dispatch-form small {
4906
+  color: var(--fg-muted);
4907
+  font-size: 0.78rem;
4908
+}
4909
+.shithub-actions-dispatch-form label {
4910
+  display: grid;
4911
+  gap: 0.25rem;
4912
+  color: var(--fg-default);
4913
+  font-size: 0.84rem;
4914
+}
4915
+.shithub-actions-dispatch-form input[type="text"],
4916
+.shithub-actions-dispatch-form select {
4917
+  width: 100%;
4918
+  min-width: 0;
4919
+  padding: 0.38rem 0.5rem;
4920
+  border: 1px solid var(--border-default);
4921
+  border-radius: 6px;
4922
+  background: var(--canvas-default);
4923
+  color: var(--fg-default);
4924
+}
4925
+.shithub-actions-dispatch-check {
4926
+  display: inline-flex;
4927
+  align-items: center;
4928
+  width: fit-content;
4929
+}
4930
+.shithub-actions-dispatch-check input[type="checkbox"] {
4931
+  width: 1rem;
4932
+  height: 1rem;
4933
+}
48574934
 .shithub-actions-filters {
48584935
   display: grid;
48594936
   grid-template-columns: minmax(9rem, 1fr) repeat(3, minmax(8rem, 0.7fr)) minmax(8rem, 0.8fr) auto auto;
@@ -5292,6 +5369,13 @@ button.shithub-repo-action {
52925369
 .shithub-actions-log-output code {
52935370
  /* Inherit the surrounding <pre>'s font rather than UA code styling. */
   font: inherit;
52945371
 }
5372
/* Status line beneath the live log stream ("Live" / "Reconnecting" /
   "Log complete"), updated by the SSE script in action_step_log.html. */
+.shithub-actions-log-live {
5373
+  padding: 0.5rem 1rem;
5374
+  border-top: 1px solid var(--border-default);
5375
+  background: var(--canvas-subtle);
5376
+  color: var(--fg-muted);
5377
+  font-size: 0.82rem;
5378
+}
52955379
 .shithub-actions-log-empty {
52965380
   padding: 2rem 1rem;
52975381
   color: var(--fg-muted);
@@ -5330,6 +5414,18 @@ button.shithub-repo-action {
53305414
   .shithub-actions-run-head {
53315415
     flex-direction: column;
53325416
   }
5417
/* Narrow-viewport overrides (these rules sit inside a media query whose
   header is above this hunk): stack the Actions page head vertically. */
+  .shithub-actions-head {
5418
+    align-items: flex-start;
5419
+    flex-direction: column;
5420
+  }
5421
+  .shithub-actions-head-actions {
5422
+    width: 100%;
5423
+    flex-wrap: wrap;
5424
+  }
5425
  /* Re-anchor the dispatch dropdown to the left edge instead of the right. */
+  .shithub-actions-dispatch-menu {
5426
+    right: auto;
5427
+    left: 0;
5428
+  }
53335429
 }
53345430
 .shithub-tree-panel {
53355431
   border: 1px solid var(--border-default);
internal/web/templates/repo/action_step_log.html modified
@@ -43,6 +43,9 @@
4343
         </header>
4444
         {{ if .Log.LogError }}
4545
           <p class="shithub-actions-log-empty">{{ .Log.LogError }}</p>
46
+        {{ else if .Log.StreamHref }}
47
+          <pre class="shithub-actions-log-output"><code data-actions-log-stream="{{ .Log.StreamHref }}">{{ .Log.LogText }}</code></pre>
48
+          <p class="shithub-actions-log-live" data-actions-log-status>Live</p>
4649
         {{ else if .Log.LogText }}
4750
           <pre class="shithub-actions-log-output"><code>{{ .Log.LogText }}</code></pre>
4851
         {{ else }}
@@ -52,4 +55,39 @@
5255
     </div>
5356
   </div>
5457
 </section>
58
+{{ if .Log.StreamHref }}
59
+<script>
60
// Live log tail: appends SSE "chunk" events from the step-log stream
// endpoint to the already-rendered <code> element until "done" arrives.
+(() => {
61
+  const code = document.querySelector("[data-actions-log-stream]");
62
  // Bail out when no stream target was rendered or the browser lacks
  // EventSource; the static log text above remains visible either way.
+  if (!code || !window.EventSource) return;
63
+  const status = document.querySelector("[data-actions-log-status]");
64
+  const scroller = code.closest(".shithub-actions-log-output");
65
  // Stateful decoder: { stream: true } below lets a multi-byte UTF-8
  // sequence be split across two SSE chunks without corruption.
+  const decoder = new TextDecoder();
66
+  const stream = new EventSource(code.dataset.actionsLogStream);
67
+  const scrollToBottom = () => {
68
+    if (scroller) scroller.scrollTop = scroller.scrollHeight;
69
+  };
70
  // Each "chunk" event carries base64-encoded raw log bytes in
  // chunk_b64: base64 -> byte array -> UTF-8 text, appended in order.
+  stream.addEventListener("chunk", (event) => {
71
+    try {
72
+      const message = JSON.parse(event.data);
73
+      const raw = atob(message.chunk_b64 || "");
74
+      const bytes = Uint8Array.from(raw, (char) => char.charCodeAt(0));
75
+      code.textContent += decoder.decode(bytes, { stream: true });
76
+      scrollToBottom();
77
+    } catch {
78
      // Malformed JSON/base64 in one event: surface it, keep streaming.
+      if (status) status.textContent = "Live log interrupted";
79
+    }
80
+  });
81
  // "done": the step finished; reload shortly after so the page swaps to
  // the fully-rendered, non-streaming log view.
  // NOTE(review): the decoder is never flushed with a final
  // decoder.decode() here, so a trailing partial UTF-8 sequence would be
  // silently dropped — currently masked by the reload. Confirm the server
  // always ends the stream on a character boundary, or flush before close.
+  stream.addEventListener("done", () => {
82
+    stream.close();
83
+    if (status) status.textContent = "Log complete";
84
+    window.setTimeout(() => window.location.reload(), 600);
85
+  });
86
  // EventSource retries transient failures on its own; just reflect the
  // retry state in the status line.
+  stream.onerror = () => {
87
+    if (status) status.textContent = "Reconnecting";
88
+  };
89
+  scrollToBottom();
90
+})();
91
+</script>
92
+{{ end }}
5593
 {{- end }}
internal/web/templates/repo/actions.html modified
@@ -27,7 +27,49 @@
2727
         <h1>{{ if .ActiveWorkflowName }}{{ .ActiveWorkflowName }}{{ else }}All workflows{{ end }}</h1>
2828
         <p>{{ .Pagination.ResultText }}</p>
2929
       </div>
30
-      <a class="shithub-button" href="/{{ .Owner }}/{{ .Repo.Name }}/settings/secrets/actions">{{ octicon "gear" }} Actions settings</a>
30
+      <div class="shithub-actions-head-actions">
31
+        {{ if .DispatchWorkflows }}
32
          {{/* "Run workflow" dropdown: one POST form per manually
               dispatchable workflow. */}}
+          <details class="shithub-actions-dispatch">
33
+            <summary class="shithub-button">{{ octicon "play" }} Run workflow</summary>
34
+            <div class="shithub-actions-dispatch-menu">
35
+              {{ range .DispatchWorkflows }}
36
+                <form method="POST" action="{{ .DispatchHref }}" class="shithub-actions-dispatch-form">
37
+                  <input type="hidden" name="csrf_token" value="{{ $.CSRFToken }}">
38
+                  <header>
39
+                    <h2>{{ .Name }}</h2>
40
+                    <code>{{ .File }}</code>
41
+                  </header>
42
                  {{/* Ref to dispatch on; pre-filled with the repo's
                       default branch. */}}
+                  <label>
43
+                    <span>Branch</span>
44
+                    <input type="text" name="ref" value="{{ $.Repo.DefaultBranch }}" required>
45
+                  </label>
46
                  {{/* Workflow inputs are posted as "inputs.<name>"; the
                       control rendered depends on the input's type. */}}
+                  {{ range .Inputs }}
47
+                    <label>
48
+                      <span>{{ .Name }}{{ if .Required }} <strong>*</strong>{{ end }}</span>
49
+                      {{ if .Description }}<small>{{ .Description }}</small>{{ end }}
50
+                      {{ if .IsChoice }}
51
+                        <select name="inputs.{{ .Name }}"{{ if .Required }} required{{ end }}>
52
+                          {{ if not .Required }}<option value=""></option>{{ end }}
53
+                          {{ range .Options }}<option value="{{ .Value }}"{{ if .Selected }} selected{{ end }}>{{ .Value }}</option>{{ end }}
54
+                        </select>
55
+                      {{ else if .IsBoolean }}
56
                        {{/* Hidden "false" guarantees the key is posted when
                             the box is unchecked. NOTE(review): when it IS
                             checked the browser submits BOTH "false" and
                             "true" for the same key, and Go's r.FormValue
                             returns the FIRST value — the dispatch handler
                             must read the LAST value of the slice or this
                             always parses as false; confirm against
                             actions_dispatch.go. */}}
+                        <span class="shithub-actions-dispatch-check">
57
+                          <input type="hidden" name="inputs.{{ .Name }}" value="false">
58
+                          <input type="checkbox" name="inputs.{{ .Name }}" value="true"{{ if eq .Default "true" }} checked{{ end }}>
59
+                        </span>
60
+                      {{ else }}
61
+                        <input type="text" name="inputs.{{ .Name }}" value="{{ .Default }}"{{ if .Required }} required{{ end }}>
62
+                      {{ end }}
63
+                    </label>
64
+                  {{ end }}
65
+                  <button type="submit" class="shithub-button shithub-button-primary">{{ octicon "play" }} Run</button>
66
+                </form>
67
+              {{ end }}
68
+            </div>
69
+          </details>
70
+        {{ end }}
71
+        <a class="shithub-button" href="/{{ .Owner }}/{{ .Repo.Name }}/settings/secrets/actions">{{ octicon "gear" }} Actions settings</a>
72
+      </div>
3173
     </header>
3274
 
3375
     <form class="shithub-actions-filters" method="GET" action="/{{ .Owner }}/{{ .Repo.Name }}/actions" aria-label="Workflow run filters">