Go · 5254 bytes Raw Blame History
1 // SPDX-License-Identifier: AGPL-3.0-or-later
2
3 package finalize
4
5 import (
6 "context"
7 "encoding/json"
8 "errors"
9 "io"
10 "strings"
11 "testing"
12
13 "github.com/jackc/pgx/v5/pgtype"
14 "github.com/jackc/pgx/v5/pgxpool"
15
16 actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
17 "github.com/tenseleyFlow/shithub/internal/infra/storage"
18 reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
19 "github.com/tenseleyFlow/shithub/internal/testing/dbtest"
20 usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
21 "github.com/tenseleyFlow/shithub/internal/worker"
22 )
23
24 const finalizeFixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
25 "AAAAAAAAAAAAAAAA$" +
26 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
27
28 func TestHandlerUploadsConcatenatedLogAndDeletesChunks(t *testing.T) {
29 ctx := context.Background()
30 pool := dbtest.NewTestDB(t)
31 store := storage.NewMemoryStore()
32 _, job, step := insertFinalizeFixture(t, pool)
33
34 q := actionsdb.New()
35 if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
36 StepID: step.ID,
37 Seq: 0,
38 Chunk: []byte("hello "),
39 }); err != nil {
40 t.Fatalf("AppendStepLogChunk 0: %v", err)
41 }
42 if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
43 StepID: step.ID,
44 Seq: 1,
45 Chunk: []byte("world\n"),
46 }); err != nil {
47 t.Fatalf("AppendStepLogChunk 1: %v", err)
48 }
49
50 payload, err := json.Marshal(Payload{StepID: step.ID})
51 if err != nil {
52 t.Fatalf("marshal payload: %v", err)
53 }
54 if err := Handler(Deps{Pool: pool, ObjectStore: store})(ctx, payload); err != nil {
55 t.Fatalf("Handler: %v", err)
56 }
57
58 updated, err := q.GetWorkflowStepByID(ctx, pool, step.ID)
59 if err != nil {
60 t.Fatalf("GetWorkflowStepByID: %v", err)
61 }
62 wantKey := StepLogObjectKey(job.RunID, job.ID, step.ID)
63 if !updated.LogObjectKey.Valid || updated.LogObjectKey.String != wantKey || updated.LogByteCount != int64(len("hello world\n")) {
64 t.Fatalf("updated step log metadata: %+v", updated)
65 }
66 rc, meta, err := store.Get(ctx, wantKey)
67 if err != nil {
68 t.Fatalf("store.Get: %v", err)
69 }
70 defer rc.Close()
71 body, err := io.ReadAll(rc)
72 if err != nil {
73 t.Fatalf("read object: %v", err)
74 }
75 if string(body) != "hello world\n" || meta.ContentType != logContentType {
76 t.Fatalf("object: body=%q meta=%+v", body, meta)
77 }
78 chunks, err := q.ListStepLogChunks(ctx, pool, actionsdb.ListStepLogChunksParams{
79 StepID: step.ID,
80 Seq: -1,
81 Limit: 10,
82 })
83 if err != nil {
84 t.Fatalf("ListStepLogChunks: %v", err)
85 }
86 if len(chunks) != 0 {
87 t.Fatalf("chunks were not deleted: %+v", chunks)
88 }
89 }
90
91 func TestHandlerPoisonsOversizedLogs(t *testing.T) {
92 ctx := context.Background()
93 pool := dbtest.NewTestDB(t)
94 _, _, step := insertFinalizeFixture(t, pool)
95 q := actionsdb.New()
96 if _, err := q.AppendStepLogChunk(ctx, pool, actionsdb.AppendStepLogChunkParams{
97 StepID: step.ID,
98 Seq: 0,
99 Chunk: []byte("too large"),
100 }); err != nil {
101 t.Fatalf("AppendStepLogChunk: %v", err)
102 }
103 payload, err := json.Marshal(Payload{StepID: step.ID})
104 if err != nil {
105 t.Fatalf("marshal payload: %v", err)
106 }
107 err = Handler(Deps{Pool: pool, ObjectStore: storage.NewMemoryStore(), MaxLogBytes: 4})(ctx, payload)
108 if !errors.Is(err, worker.ErrPoison) || !strings.Contains(err.Error(), "exceeds 4 bytes") {
109 t.Fatalf("error: got %v, want poison oversized error", err)
110 }
111 }
112
113 func insertFinalizeFixture(t *testing.T, pool *pgxpool.Pool) (actionsdb.WorkflowRun, actionsdb.WorkflowJob, actionsdb.WorkflowStep) {
114 t.Helper()
115 ctx := context.Background()
116 user, err := usersdb.New().CreateUser(ctx, pool, usersdb.CreateUserParams{
117 Username: "alice",
118 DisplayName: "Alice",
119 PasswordHash: finalizeFixtureHash,
120 })
121 if err != nil {
122 t.Fatalf("CreateUser: %v", err)
123 }
124 repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
125 OwnerUserID: pgtype.Int8{Int64: user.ID, Valid: true},
126 Name: "demo",
127 DefaultBranch: "trunk",
128 Visibility: reposdb.RepoVisibilityPublic,
129 })
130 if err != nil {
131 t.Fatalf("CreateRepo: %v", err)
132 }
133 q := actionsdb.New()
134 run, err := q.InsertWorkflowRun(ctx, pool, actionsdb.InsertWorkflowRunParams{
135 RepoID: repo.ID,
136 RunIndex: 1,
137 WorkflowFile: ".shithub/workflows/ci.yml",
138 WorkflowName: "ci",
139 HeadSha: strings.Repeat("a", 40),
140 HeadRef: "refs/heads/trunk",
141 Event: actionsdb.WorkflowRunEventPush,
142 EventPayload: []byte(`{}`),
143 ActorUserID: pgtype.Int8{Int64: user.ID, Valid: true},
144 })
145 if err != nil {
146 t.Fatalf("InsertWorkflowRun: %v", err)
147 }
148 job, err := q.InsertWorkflowJob(ctx, pool, actionsdb.InsertWorkflowJobParams{
149 RunID: run.ID,
150 JobIndex: 0,
151 JobKey: "build",
152 JobName: "build",
153 RunsOn: "ubuntu-latest",
154 TimeoutMinutes: 360,
155 Permissions: []byte(`{}`),
156 JobEnv: []byte(`{}`),
157 })
158 if err != nil {
159 t.Fatalf("InsertWorkflowJob: %v", err)
160 }
161 step, err := q.InsertWorkflowStep(ctx, pool, actionsdb.InsertWorkflowStepParams{
162 JobID: job.ID,
163 StepIndex: 0,
164 StepName: "test",
165 RunCommand: "go test ./...",
166 StepEnv: []byte(`{}`),
167 WorkingDirectory: "",
168 StepWith: []byte(`{}`),
169 })
170 if err != nil {
171 t.Fatalf("InsertWorkflowStep: %v", err)
172 }
173 return run, job, step
174 }
175