tenseleyflow/shithub / c5696d0

Browse files

Add internal/infra/db: pgxpool Open, WithTx, Healthcheck, goose Migrate

Authored by mfwolffe <wolffemf@dukes.jmu.edu>
SHA
c5696d0f7d43cc165dfa0f1a856431d923970692
Parents
bedb6d0
Tree
f92740d

2 changed files

StatusFile+-
A internal/infra/db/db.go 143 0
A internal/infra/db/migrate.go 90 0
internal/infra/db/db.goadded
@@ -0,0 +1,143 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+// Package db owns the Postgres connection lifecycle. S01 ships the
4
+// open/healthcheck/transaction helpers; later sprints add domain-specific
5
+// wrappers but always go through this package for the pool.
6
+package db
7
+
8
+import (
9
+	"context"
10
+	"errors"
11
+	"fmt"
12
+	"os"
13
+	"time"
14
+
15
+	"github.com/jackc/pgx/v5"
16
+	"github.com/jackc/pgx/v5/pgxpool"
17
+)
18
+
19
+// Config carries the pool's connection settings. S03 will populate this from
20
+// the layered config loader; for S01 we accept an optional explicit URL or
21
+// fall back to SHITHUB_DATABASE_URL.
22
+type Config struct {
23
+	URL             string
24
+	MaxConns        int32
25
+	MinConns        int32
26
+	ConnectTimeout  time.Duration
27
+	StatementCancel time.Duration
28
+}
29
+
30
+// Defaults returns sensible defaults for a dev pool. Prod values land via
31
+// the config loader in S03.
32
+func Defaults() Config {
33
+	return Config{
34
+		MaxConns:        10,
35
+		MinConns:        1,
36
+		ConnectTimeout:  5 * time.Second,
37
+		StatementCancel: 30 * time.Second,
38
+	}
39
+}
40
+
41
+// Resolve fills in URL from env if missing and clamps numeric defaults.
42
+func (c Config) Resolve() Config {
43
+	if c.URL == "" {
44
+		c.URL = os.Getenv("SHITHUB_DATABASE_URL")
45
+	}
46
+	if c.MaxConns <= 0 {
47
+		c.MaxConns = 10
48
+	}
49
+	if c.MinConns < 0 {
50
+		c.MinConns = 0
51
+	}
52
+	if c.ConnectTimeout <= 0 {
53
+		c.ConnectTimeout = 5 * time.Second
54
+	}
55
+	if c.StatementCancel <= 0 {
56
+		c.StatementCancel = 30 * time.Second
57
+	}
58
+	return c
59
+}
60
+
61
+// ErrNoURL is returned by Open when neither cfg.URL nor SHITHUB_DATABASE_URL
62
+// is set.
63
+var ErrNoURL = errors.New("db: no DATABASE_URL configured (set SHITHUB_DATABASE_URL)")
64
+
65
+// Open creates a new pgx pool from cfg. The caller owns the pool's lifecycle
66
+// and must call pool.Close() on shutdown.
67
+func Open(ctx context.Context, cfg Config) (*pgxpool.Pool, error) {
68
+	cfg = cfg.Resolve()
69
+	if cfg.URL == "" {
70
+		return nil, ErrNoURL
71
+	}
72
+
73
+	pcfg, err := pgxpool.ParseConfig(cfg.URL)
74
+	if err != nil {
75
+		return nil, fmt.Errorf("db: parse config: %w", err)
76
+	}
77
+	pcfg.MaxConns = cfg.MaxConns
78
+	pcfg.MinConns = cfg.MinConns
79
+	pcfg.ConnConfig.ConnectTimeout = cfg.ConnectTimeout
80
+
81
+	openCtx, cancel := context.WithTimeout(ctx, cfg.ConnectTimeout)
82
+	defer cancel()
83
+
84
+	pool, err := pgxpool.NewWithConfig(openCtx, pcfg)
85
+	if err != nil {
86
+		return nil, fmt.Errorf("db: open pool: %w", err)
87
+	}
88
+
89
+	// Verify connectivity before returning. A pool that can't talk to
90
+	// Postgres at startup is a hard failure, not a retry-on-first-query
91
+	// situation.
92
+	if err := pool.Ping(openCtx); err != nil {
93
+		pool.Close()
94
+		return nil, fmt.Errorf("db: ping: %w", err)
95
+	}
96
+
97
+	return pool, nil
98
+}
99
+
100
+// Healthcheck performs a fast SELECT 1 against the pool with a short
101
+// timeout. Used by /readyz.
102
+func Healthcheck(ctx context.Context, pool *pgxpool.Pool) error {
103
+	if pool == nil {
104
+		return errors.New("db: nil pool")
105
+	}
106
+	hc, cancel := context.WithTimeout(ctx, 2*time.Second)
107
+	defer cancel()
108
+	var v int
109
+	if err := pool.QueryRow(hc, "SELECT 1").Scan(&v); err != nil {
110
+		return fmt.Errorf("db: healthcheck: %w", err)
111
+	}
112
+	if v != 1 {
113
+		return fmt.Errorf("db: healthcheck: unexpected scalar %d", v)
114
+	}
115
+	return nil
116
+}
117
+
118
+// WithTx runs fn inside a Postgres transaction, committing on nil error and
119
+// rolling back otherwise. Panics inside fn are recovered and re-raised after
120
+// rollback so callers see them as panics rather than silent commits.
121
+func WithTx(ctx context.Context, pool *pgxpool.Pool, fn func(pgx.Tx) error) (err error) {
122
+	tx, err := pool.Begin(ctx)
123
+	if err != nil {
124
+		return fmt.Errorf("db: begin: %w", err)
125
+	}
126
+	defer func() {
127
+		if p := recover(); p != nil {
128
+			_ = tx.Rollback(ctx)
129
+			panic(p)
130
+		}
131
+		if err != nil {
132
+			if rbErr := tx.Rollback(ctx); rbErr != nil && !errors.Is(rbErr, pgx.ErrTxClosed) {
133
+				err = fmt.Errorf("%w (rollback: %v)", err, rbErr)
134
+			}
135
+			return
136
+		}
137
+		if cmErr := tx.Commit(ctx); cmErr != nil {
138
+			err = fmt.Errorf("db: commit: %w", cmErr)
139
+		}
140
+	}()
141
+	err = fn(tx)
142
+	return err
143
+}
internal/infra/db/migrate.goadded
@@ -0,0 +1,90 @@
1
+// SPDX-License-Identifier: AGPL-3.0-or-later
2
+
3
+package db
4
+
5
+import (
6
+	"context"
7
+	"errors"
8
+	"fmt"
9
+	"io/fs"
10
+
11
+	"github.com/jackc/pgx/v5"
12
+	"github.com/jackc/pgx/v5/stdlib"
13
+	"github.com/pressly/goose/v3"
14
+)
15
+
16
+// MigrationsFS holds the embedded migrations. The web/cmd packages set this
17
+// at init time via SetMigrationsFS(); we don't embed here because embed
18
+// directives can't traverse upward to the migrations/ directory at the repo
19
+// root.
20
+var migrationsFS fs.FS
21
+
22
+// SetMigrationsFS registers the migrations filesystem. The repo's
23
+// `internal/migrationsfs` package embeds and registers it; tests can swap
24
+// it for a fixture FS.
25
+func SetMigrationsFS(fsys fs.FS) {
26
+	migrationsFS = fsys
27
+}
28
+
29
+// MigrateAction is one of the migrate operations the CLI exposes.
30
+type MigrateAction string
31
+
32
+const (
33
+	MigrateUp      MigrateAction = "up"
34
+	MigrateDown    MigrateAction = "down"
35
+	MigrateStatus  MigrateAction = "status"
36
+	MigrateVersion MigrateAction = "version"
37
+	MigrateRedo    MigrateAction = "redo"
38
+	MigrateReset   MigrateAction = "reset"
39
+)
40
+
41
+// Migrate runs the requested goose action against the pool's underlying
42
+// database. We reuse the pool's connection config rather than opening a
43
+// fresh sql.DB from scratch, so the same DSN env-var-driven configuration
44
+// applies to migrations.
45
+//
46
+// goose requires `database/sql`. We bridge from pgx's connection config
47
+// using `pgx/v5/stdlib`.
48
+func Migrate(ctx context.Context, cfg Config, action MigrateAction) error {
49
+	if migrationsFS == nil {
50
+		return errors.New("db: migrationsFS not registered (call SetMigrationsFS)")
51
+	}
52
+	cfg = cfg.Resolve()
53
+	if cfg.URL == "" {
54
+		return ErrNoURL
55
+	}
56
+
57
+	connCfg, err := pgx.ParseConfig(cfg.URL)
58
+	if err != nil {
59
+		return fmt.Errorf("db: parse migrate URL: %w", err)
60
+	}
61
+	sqldb := stdlib.OpenDB(*connCfg)
62
+	defer func() { _ = sqldb.Close() }()
63
+
64
+	if err := goose.SetDialect("postgres"); err != nil {
65
+		return fmt.Errorf("db: goose dialect: %w", err)
66
+	}
67
+	goose.SetBaseFS(migrationsFS)
68
+
69
+	switch action {
70
+	case MigrateUp:
71
+		return goose.UpContext(ctx, sqldb, ".")
72
+	case MigrateDown:
73
+		return goose.DownContext(ctx, sqldb, ".")
74
+	case MigrateStatus:
75
+		return goose.StatusContext(ctx, sqldb, ".")
76
+	case MigrateVersion:
77
+		v, err := goose.GetDBVersionContext(ctx, sqldb)
78
+		if err != nil {
79
+			return fmt.Errorf("db: version: %w", err)
80
+		}
81
+		fmt.Printf("schema version: %d\n", v)
82
+		return nil
83
+	case MigrateRedo:
84
+		return goose.RedoContext(ctx, sqldb, ".")
85
+	case MigrateReset:
86
+		return goose.ResetContext(ctx, sqldb, ".")
87
+	default:
88
+		return fmt.Errorf("db: unknown migrate action %q", action)
89
+	}
90
+}