Go · 2510 bytes Raw Blame History
1 // Code generated by sqlc. DO NOT EDIT.
2 // versions:
3 // sqlc v1.31.1
4 // source: cursor.sql
5
6 package notifdb
7
8 import (
9 "context"
10 )
11
// getEventCursor is the SQL text executed by GetEventCursor. It selects
// the consumer, last_event_id and updated_at columns from
// domain_events_processed for the consumer bound to $1.
const getEventCursor = `-- name: GetEventCursor :one

SELECT consumer, last_event_id, updated_at
FROM domain_events_processed
WHERE consumer = $1
`
18
19 // ─── domain_events_processed ──────────────────────────────────────
20 func (q *Queries) GetEventCursor(ctx context.Context, db DBTX, consumer string) (DomainEventsProcessed, error) {
21 row := db.QueryRow(ctx, getEventCursor, consumer)
22 var i DomainEventsProcessed
23 err := row.Scan(&i.Consumer, &i.LastEventID, &i.UpdatedAt)
24 return i, err
25 }
26
// listUnprocessedDomainEvents is the SQL text executed by
// ListUnprocessedDomainEvents. It selects rows from domain_events whose
// id is greater than $1, ordered by id, returning at most $2 rows.
const listUnprocessedDomainEvents = `-- name: ListUnprocessedDomainEvents :many
SELECT id, actor_user_id, kind, repo_id, source_kind, source_id,
public, payload, created_at
FROM domain_events
WHERE id > $1
ORDER BY id
LIMIT $2
`
35
// ListUnprocessedDomainEventsParams carries the bind arguments for
// ListUnprocessedDomainEvents.
type ListUnprocessedDomainEventsParams struct {
	// ID is bound to $1; only events with an id greater than this value
	// are selected.
	ID int64
	// Limit is bound to $2 and caps the number of rows returned.
	Limit int32
}
40
41 // The fan-out worker's read cursor. Bounded so a single tick
42 // doesn't try to drain a million-row backlog.
43 func (q *Queries) ListUnprocessedDomainEvents(ctx context.Context, db DBTX, arg ListUnprocessedDomainEventsParams) ([]DomainEvent, error) {
44 rows, err := db.Query(ctx, listUnprocessedDomainEvents, arg.ID, arg.Limit)
45 if err != nil {
46 return nil, err
47 }
48 defer rows.Close()
49 items := []DomainEvent{}
50 for rows.Next() {
51 var i DomainEvent
52 if err := rows.Scan(
53 &i.ID,
54 &i.ActorUserID,
55 &i.Kind,
56 &i.RepoID,
57 &i.SourceKind,
58 &i.SourceID,
59 &i.Public,
60 &i.Payload,
61 &i.CreatedAt,
62 ); err != nil {
63 return nil, err
64 }
65 items = append(items, i)
66 }
67 if err := rows.Err(); err != nil {
68 return nil, err
69 }
70 return items, nil
71 }
72
// setEventCursor is the SQL text executed by SetEventCursor. It inserts
// a (consumer, last_event_id) row into domain_events_processed; when a
// row for that consumer already exists it instead overwrites
// last_event_id with the new value and sets updated_at to now().
const setEventCursor = `-- name: SetEventCursor :exec
INSERT INTO domain_events_processed (consumer, last_event_id)
VALUES ($1, $2)
ON CONFLICT (consumer)
DO UPDATE SET last_event_id = EXCLUDED.last_event_id,
updated_at = now()
`
80
// SetEventCursorParams carries the bind arguments for SetEventCursor.
type SetEventCursorParams struct {
	// Consumer is bound to $1 and identifies the cursor row to upsert.
	Consumer string
	// LastEventID is bound to $2 and becomes the row's last_event_id.
	LastEventID int64
}
85
86 // Always-write upsert so the worker doesn't have to special-case
87 // the missing-row branch (the migration seeds 'notify_fanout' at
88 // 0; future consumers like 'webhook_deliver' do the same on first
89 // run via this same call).
90 func (q *Queries) SetEventCursor(ctx context.Context, db DBTX, arg SetEventCursorParams) error {
91 _, err := db.Exec(ctx, setEventCursor, arg.Consumer, arg.LastEventID)
92 return err
93 }
94