tenseleyflow/shithub / b4c53a7

Browse files

actions/logstream: isolate SSE transport cleanup

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
b4c53a7c75afc2e31b1bb19d4cb39e36448db112
Parents
50f04e8
Tree
fad643c

8 changed files

StatusFile+-
M docs/internal/runbooks/actions.md 5 0
M internal/actions/logstream/logstream.go 5 0
M internal/actions/logstream/logstream_test.go 3 0
M internal/web/handlers/handlers.go 16 1
M internal/web/handlers/handlers_test.go 42 0
M internal/web/handlers/repo/actions_log_stream.go 7 0
M internal/web/handlers/repo/repo.go 6 1
M internal/web/server.go 1 0
docs/internal/runbooks/actions.mdmodified
@@ -13,6 +13,11 @@ The stream sends `event: chunk` records with the chunk sequence as the SSE
1313
 `?after=<seq>` for the first connection from a rendered log page. A terminal
1414
 step sends `event: done` and closes the stream.
1515
 
16
+In `shithubd`, this route is mounted outside the normal app compression and
17
+30-second timeout middleware. If a future route move puts live logs back under
18
+either middleware, EventSource clients will churn and logs can buffer despite
19
+the Caddy flush setting.
20
+
1621
 Log chunks are never sent through Postgres `NOTIFY`. Runner log writes append
1722
 to `workflow_step_log_chunks`, then `NOTIFY step_log_<step_id>` with only the
1823
 sequence number. Step completion notifies `done`.
internal/actions/logstream/logstream.gomodified
@@ -33,6 +33,11 @@ func ListenSQL(stepID int64) string {
3333
 	return "LISTEN " + pgx.Identifier{Channel(stepID)}.Sanitize()
3434
 }
3535
 
36
+// UnlistenSQL returns a quoted UNLISTEN statement for the per-step channel.
37
+func UnlistenSQL(stepID int64) string {
38
+	return "UNLISTEN " + pgx.Identifier{Channel(stepID)}.Sanitize()
39
+}
40
+
3641
 // NotifyChunk wakes log tailers for a newly-persisted chunk.
3742
 func NotifyChunk(ctx context.Context, db DBTX, stepID int64, seq int32) error {
3843
 	return notify(ctx, db, stepID, strconv.FormatInt(int64(seq), 10))
internal/actions/logstream/logstream_test.gomodified
@@ -12,6 +12,9 @@ func TestChannelAndListenSQL(t *testing.T) {
1212
 	if got := ListenSQL(42); got != `LISTEN "step_log_42"` {
1313
 		t.Fatalf("ListenSQL=%q", got)
1414
 	}
15
+	if got := UnlistenSQL(42); got != `UNLISTEN "step_log_42"` {
16
+		t.Fatalf("UnlistenSQL=%q", got)
17
+	}
1518
 }
1619
 
1720
 func TestParsePayload(t *testing.T) {
internal/web/handlers/handlers.gomodified
@@ -88,6 +88,10 @@ type Deps struct {
8888
 	// workflow_dispatch endpoint (S41b). Auth-required + per-handler
8989
 	// repo-write check.
9090
 	RepoActionsAPIMounter func(chi.Router)
91
+	// RepoActionsStreamMounter registers long-lived Actions log-stream
92
+	// routes. It MUST bypass response compression and request timeout;
93
+	// the handler still runs the normal repo-read policy gate.
94
+	RepoActionsStreamMounter func(chi.Router)
9195
 	// RepoSettingsGeneralMounter registers the General/Access tabs and
9296
 	// the deferred-tab placeholders (webhooks, keys, notifications,
9397
 	// tags) under /{owner}/{repo}/settings/* (S32). Auth-required.
@@ -255,9 +259,20 @@ func RegisterChi(r *chi.Mux, deps Deps) (*chi.Mux, middleware.PanicHandler, http
255259
 		})
256260
 	}
257261
 
262
+	// Actions step-log SSE also streams for minutes. Keep it out of the
263
+	// app group's timeout/compression stack so EventSource receives each
264
+	// event as the handler flushes it. Browser CSRF protection is not
265
+	// needed for this GET-only route; repo visibility is enforced inside
266
+	// the handler through policy.ActionRepoRead.
267
+	if deps.RepoActionsStreamMounter != nil {
268
+		r.Group(func(r chi.Router) {
269
+			deps.RepoActionsStreamMounter(r)
270
+		})
271
+	}
272
+
258273
 	// Application routes — CSRF protected. Compress + Timeout live in
259274
 	// this group (and the static one above) rather than globally so the
260
-	// git-HTTP group can opt out.
275
+	// streaming groups can opt out.
261276
 	r.Group(func(r chi.Router) {
262277
 		r.Use(middleware.Compress)
263278
 		r.Use(middleware.Timeout(30 * time.Second))
internal/web/handlers/handlers_test.gomodified
@@ -9,6 +9,8 @@ import (
99
 	"net/http/httptest"
1010
 	"strings"
1111
 	"testing"
12
+
13
+	"github.com/go-chi/chi/v5"
1214
 )
1315
 
1416
 func TestHandlers(t *testing.T) {
@@ -135,3 +137,43 @@ func TestHealthzHEAD(t *testing.T) {
135137
 		t.Fatalf("HEAD /healthz: status %d, want 200", rec.Code)
136138
 	}
137139
 }
140
+
141
+func TestActionsLogStreamRouteBypassesCompressAndTimeout(t *testing.T) {
142
+	t.Parallel()
143
+
144
+	mux := http.NewServeMux()
145
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
146
+	if err := Register(mux, Deps{
147
+		Logger:      logger,
148
+		TemplatesFS: testTemplatesFS(t),
149
+		StaticFS:    testStaticFS(t),
150
+		LogoSVG:     `<svg xmlns="http://www.w3.org/2000/svg"><title>shithub</title></svg>`,
151
+		RepoActionsStreamMounter: func(r chi.Router) {
152
+			r.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}/log/stream", func(w http.ResponseWriter, r *http.Request) {
153
+				w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
154
+				if _, ok := r.Context().Deadline(); ok {
155
+					_, _ = io.WriteString(w, "deadline")
156
+					return
157
+				}
158
+				_, _ = io.WriteString(w, "no-deadline")
159
+			})
160
+		},
161
+	}); err != nil {
162
+		t.Fatalf("Register: %v", err)
163
+	}
164
+
165
+	req := httptest.NewRequest(http.MethodGet, "/octo/demo/actions/runs/1/jobs/0/steps/0/log/stream", nil)
166
+	req.Header.Set("Accept-Encoding", "gzip")
167
+	rec := httptest.NewRecorder()
168
+	mux.ServeHTTP(rec, req)
169
+
170
+	if rec.Code != http.StatusOK {
171
+		t.Fatalf("status: got %d, want %d body=%q", rec.Code, http.StatusOK, rec.Body.String())
172
+	}
173
+	if got := rec.Header().Get("Content-Encoding"); got != "" {
174
+		t.Fatalf("Content-Encoding: got %q, want empty", got)
175
+	}
176
+	if got := rec.Body.String(); got != "no-deadline" {
177
+		t.Fatalf("body: got %q, want no-deadline", got)
178
+	}
179
+}
internal/web/handlers/repo/actions_log_stream.gomodified
@@ -117,6 +117,13 @@ func (h *Handlers) repoActionStepLogStream(w http.ResponseWriter, r *http.Reques
117117
 		h.d.Render.HTTPError(w, r, http.StatusInternalServerError, "")
118118
 		return
119119
 	}
120
+	defer func() {
121
+		ctx, cancel := context.WithTimeout(context.Background(), actionsLogStreamReleaseTimeout)
122
+		defer cancel()
123
+		if _, err := conn.Exec(ctx, logstream.UnlistenSQL(step.ID)); err != nil && h.d.Logger != nil {
124
+			h.d.Logger.WarnContext(ctx, "repo actions: unlisten log stream", "step_id", step.ID, "error", err)
125
+		}
126
+	}()
120127
 
121128
 	w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
122129
 	w.Header().Set("Cache-Control", "no-cache, no-transform")
internal/web/handlers/repo/repo.gomodified
@@ -133,12 +133,17 @@ func (h *Handlers) MountRepoActionsAPI(r chi.Router) {
133133
 	r.Post("/{owner}/{repo}/actions/workflows/{file}/dispatches", h.repoActionsDispatch)
134134
 }
135135
 
136
+// MountRepoActionsStreams registers long-lived Actions streaming routes.
137
+// Caller must mount this outside compression and request-timeout middleware.
138
+func (h *Handlers) MountRepoActionsStreams(r chi.Router) {
139
+	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}/log/stream", h.repoActionStepLogStream)
140
+}
141
+
136142
 // MountRepoHome registers the root repository route plus product-tab shells
137143
 // that are intentionally public and read-gated like the Code tab. The
138144
 // two-segment route doesn't collide with the /{username} catch-all from S09;
139145
 // caller is responsible for ordering this BEFORE /{username}.
140146
 func (h *Handlers) MountRepoHome(r chi.Router) {
141
-	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}/log/stream", h.repoActionStepLogStream)
142147
 	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/jobs/{jobIndex}/steps/{stepIndex}", h.repoActionStepLog)
143148
 	r.Get("/{owner}/{repo}/actions/runs/{runIndex}/status", h.repoActionRunStatus)
144149
 	r.Get("/{owner}/{repo}/actions/runs/{runIndex}", h.repoActionRun)
internal/web/server.gomodified
@@ -211,6 +211,7 @@ func Run(ctx context.Context, opts Options) error {
211211
 			})
212212
 		}
213213
 		deps.RepoHomeMounter = repoH.MountRepoHome
214
+		deps.RepoActionsStreamMounter = repoH.MountRepoActionsStreams
214215
 		deps.RepoCodeMounter = repoH.MountCode
215216
 		deps.RepoHistoryMounter = repoH.MountHistory
216217
 		deps.RepoRefsMounter = repoH.MountRefs