tenseleyflow/shithub / ff0e41e

Browse files

S33: worker job handlers — fanout, deliver, purge_old

Authored by espadonne
SHA
ff0e41e4da677b355ef1c43a1cfd4482210ff0f6
Parents
7bfdb60
Tree
55ee195

1 changed file

StatusFile+-
A internal/worker/jobs/webhook.go 112 0
internal/worker/jobs/webhook.goadded
@@ -0,0 +1,112 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package jobs
4
+
5
+import (
6
+	"context"
7
+	"encoding/json"
8
+	"log/slog"
9
+	"time"
10
+
11
+	"github.com/jackc/pgx/v5/pgtype"
12
+	"github.com/jackc/pgx/v5/pgxpool"
13
+
14
+	"github.com/tenseleyFlow/shithub/internal/auth/secretbox"
15
+	"github.com/tenseleyFlow/shithub/internal/webhook"
16
+	webhookdb "github.com/tenseleyFlow/shithub/internal/webhook/sqlc"
17
+	"github.com/tenseleyFlow/shithub/internal/worker"
18
+)
19
+
20
+// WebhookFanoutDeps wires the fan-out handler.
21
+type WebhookFanoutDeps struct {
22
+	Pool   *pgxpool.Pool
23
+	Logger *slog.Logger
24
+}
25
+
26
+// WebhookFanout drains domain_events past the webhook cursor and
27
+// creates per-subscriber delivery rows + deliver jobs. Self-throttles:
28
+// when a tick drains a full batch, the handler re-enqueues itself so
29
+// the next tick keeps draining without waiting for the cron beat.
30
+func WebhookFanout(deps WebhookFanoutDeps) worker.Handler {
31
+	return func(ctx context.Context, _ json.RawMessage) error {
32
+		processed, err := webhook.FanoutOnce(ctx, webhook.FanoutDeps{
33
+			Pool: deps.Pool, Logger: deps.Logger,
34
+		})
35
+		if err != nil {
36
+			return err
37
+		}
38
+		if deps.Logger != nil && processed > 0 {
39
+			deps.Logger.InfoContext(ctx, "webhook:fanout drained events",
40
+				"count", processed)
41
+		}
42
+		if processed >= webhook.FanoutBatch {
43
+			if _, err := worker.Enqueue(ctx, deps.Pool, webhook.KindWebhookFanout,
44
+				map[string]any{}, worker.EnqueueOptions{}); err != nil && deps.Logger != nil {
45
+				deps.Logger.WarnContext(ctx, "webhook:fanout self-enqueue failed",
46
+					"error", err)
47
+			}
48
+		}
49
+		return nil
50
+	}
51
+}
52
+
53
+// WebhookDeliverDeps wires the deliverer.
54
+type WebhookDeliverDeps struct {
55
+	Pool      *pgxpool.Pool
56
+	Logger    *slog.Logger
57
+	SecretBox *secretbox.Box
58
+	SSRF      webhook.SSRFConfig
59
+}
60
+
61
+// WebhookDeliver dispatches one delivery row. The job kind is fired by
62
+// fanout (1 row → 1 job); a row for which the deliverer schedules a
63
+// retry will get re-enqueued through next-retry-at scheduling — see
64
+// the cron-driven webhook:retry sweep documented in webhook docs.
65
+func WebhookDeliver(deps WebhookDeliverDeps) worker.Handler {
66
+	return func(ctx context.Context, raw json.RawMessage) error {
67
+		id, err := webhook.UnmarshalDeliverPayload(raw)
68
+		if err != nil {
69
+			return worker.PoisonError(err)
70
+		}
71
+		if id <= 0 {
72
+			return worker.PoisonError(jsonError("delivery_id must be positive"))
73
+		}
74
+		return webhook.Deliver(ctx, webhook.DeliverDeps{
75
+			Pool:      deps.Pool,
76
+			Logger:    deps.Logger,
77
+			SecretBox: deps.SecretBox,
78
+			SSRF:      deps.SSRF,
79
+		}, id)
80
+	}
81
+}
82
+
83
+// WebhookPurgeOldDeps wires the retention sweep.
84
+type WebhookPurgeOldDeps struct {
85
+	Pool      *pgxpool.Pool
86
+	Logger    *slog.Logger
87
+	Retention time.Duration
88
+}
89
+
90
+// WebhookPurgeOld drops terminal delivery rows older than the
91
+// retention window. Cron-driven; pending/retry rows are left alone.
92
+func WebhookPurgeOld(deps WebhookPurgeOldDeps) worker.Handler {
93
+	return func(ctx context.Context, _ json.RawMessage) error {
94
+		retention := deps.Retention
95
+		if retention <= 0 {
96
+			retention = 30 * 24 * time.Hour
97
+		}
98
+		ivl := pgtype.Interval{
99
+			Microseconds: int64(retention / time.Microsecond),
100
+			Valid:        true,
101
+		}
102
+		_, err := webhookdb.New().PurgeOldDeliveries(ctx, deps.Pool, ivl)
103
+		return err
104
+	}
105
+}
106
+
107
+// jsonError is a tiny error wrapper so the poison wrap above keeps
108
+// reading naturally. Its message is preserved in last_error.
109
+type jsonErr string
110
+
111
+func (e jsonErr) Error() string { return string(e) }
112
+func jsonError(s string) error  { return jsonErr(s) }