From d67ac8d9f2478f6f2bb15a3a2cb701d958e54273 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Thu, 14 May 2026 18:13:45 -0600 Subject: [PATCH] feat(webicached,webid): add pgstore backend and --pg flag Adds an optional PostgreSQL storage backend for webicached and webid. When --pg is given a DSN, the daemon writes/reads classified assets to/from Postgres instead of the legacy fsstore. Brings in pgx/v5 as a direct dependency. --- cmd/webicached/main.go | 20 +- cmd/webid/main.go | 19 +- go.mod | 9 + go.sum | 26 +++ internal/storage/pgstore/pgstore.go | 295 ++++++++++++++++++++++++++++ 5 files changed, 361 insertions(+), 8 deletions(-) create mode 100644 internal/storage/pgstore/pgstore.go diff --git a/cmd/webicached/main.go b/cmd/webicached/main.go index d88bcdc..3afe6d4 100644 --- a/cmd/webicached/main.go +++ b/cmd/webicached/main.go @@ -55,6 +55,7 @@ import ( "github.com/webinstall/webi-installers/internal/releases/zigdist" "github.com/webinstall/webi-installers/internal/storage" "github.com/webinstall/webi-installers/internal/storage/fsstore" + "github.com/webinstall/webi-installers/internal/storage/pgstore" ) var ( @@ -78,6 +79,7 @@ type MainConfig struct { envFile string confDir string cacheDir string + pgDSN string rawDir string token string once bool @@ -156,11 +158,20 @@ func main() { cfg.token = os.Getenv("GITHUB_TOKEN") } - fss, err := fsstore.New(cfg.cacheDir) - if err != nil { - log.Fatalf("fsstore: %v", err) + var store storage.Store + if cfg.pgDSN != "" { + pg, err := pgstore.New(context.Background(), cfg.pgDSN) + if err != nil { + log.Fatalf("pgstore: %v", err) + } + store = pg + } else { + fss, err := fsstore.New(cfg.cacheDir) + if err != nil { + log.Fatalf("fsstore: %v", err) + } + store = fss } - var store storage.Store = fss var auth *githubish.Auth if cfg.token != "" { @@ -298,6 +309,7 @@ func registerFlags(fs *flag.FlagSet, cfg *MainConfig) { fs.StringVar(&cfg.envFile, "envfile", "", "path to .env file to load before running") 
fs.StringVar(&cfg.confDir, "conf", ".", "root directory containing {pkg}/releases.conf files") fs.StringVar(&cfg.cacheDir, "legacy", "~/.cache/webi/legacy", "legacy cache directory (fsstore root)") + fs.StringVar(&cfg.pgDSN, "pg", "", "PostgreSQL DSN (enables pgstore; mutually exclusive with -legacy)") fs.StringVar(&cfg.rawDir, "raw", "~/.cache/webi/raw", "raw cache directory for upstream responses") fs.StringVar(&cfg.token, "token", "", "GitHub API token (or set $GITHUB_TOKEN)") fs.BoolVar(&cfg.once, "once", false, "run once then exit (no periodic refresh)") diff --git a/cmd/webid/main.go b/cmd/webid/main.go index 4c5de38..50970f7 100644 --- a/cmd/webid/main.go +++ b/cmd/webid/main.go @@ -37,6 +37,7 @@ import ( "github.com/webinstall/webi-installers/internal/storage" "github.com/webinstall/webi-installers/internal/storage/fsstore" + "github.com/webinstall/webi-installers/internal/storage/pgstore" "github.com/webinstall/webi-installers/internal/uadetect" ) @@ -60,6 +61,7 @@ func printVersion(w io.Writer) { func main() { addr := flag.String("addr", ":3001", "listen address") cacheDir := flag.String("legacy", "~/.cache/webi/legacy", "legacy cache directory") + pgDSN := flag.String("pg", "", "PostgreSQL DSN (enables pgstore; mutually exclusive with -legacy)") installersDir := flag.String("installers", ".", "installers repo root (for install.sh/ps1)") if len(os.Args) > 1 { @@ -80,11 +82,20 @@ func main() { cachePath := expandHome(*cacheDir) - fss, err := fsstore.New(cachePath) - if err != nil { - log.Fatalf("fsstore: %v", err) + var store storage.Store + if *pgDSN != "" { + pg, err := pgstore.New(context.Background(), *pgDSN) + if err != nil { + log.Fatalf("pgstore: %v", err) + } + store = pg + } else { + fss, err := fsstore.New(cachePath) + if err != nil { + log.Fatalf("fsstore: %v", err) + } + store = fss } - var store storage.Store = fss srv := &server{ store: store, diff --git a/go.mod b/go.mod index 4e98a44..984189d 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,16 
@@ module github.com/webinstall/webi-installers go 1.26.1 require ( + github.com/jackc/pgx/v5 v5.9.2 github.com/joho/godotenv v1.5.1 github.com/jszwec/csvutil v1.10.0 github.com/therootcompany/golib/http/middleware/v2 v2.0.1 ) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum index cce88cd..369b746 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,32 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= +github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jszwec/csvutil v1.10.0 h1:upMDUxhQKqZ5ZDCs/wy+8Kib8rZR8I8lOR34yJkdqhI= github.com/jszwec/csvutil v1.10.0/go.mod h1:/E4ONrmGkwmWsk9ae9jpXnv9QT8pLHEPcCirMFhxG9I= +github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
 github.com/therootcompany/golib/http/middleware/v2 v2.0.1 h1:VNKpHcwyEW7cMct7/eO4fyrxwIQk2ycb6juVXSPs2Sk=
 github.com/therootcompany/golib/http/middleware/v2 v2.0.1/go.mod h1:g5gb9qBidw74nW6/mwIauTKMpOKchiN2l0gt5qzJ2aQ=
+golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
+golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
+golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
+golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/storage/pgstore/pgstore.go b/internal/storage/pgstore/pgstore.go
new file mode 100644
index 0000000..4449ac3
--- /dev/null
+++ b/internal/storage/pgstore/pgstore.go
@@ -0,0 +1,338 @@
+// Package pgstore implements [storage.Store] on PostgreSQL.
+//
+// Schema uses double-buffering: two asset generations per package (0 and 1).
+// The active generation pointer in webi_packages is updated atomically on
+// Commit, so readers always see a complete consistent snapshot.
+//
+// Write path:
+//
+//	BeginRefresh → picks the inactive generation, returns tx
+//	Put          → stages assets in-memory
+//	Commit       → one transaction: swaps generation pointer, clears the
+//	               target generation, bulk-inserts assets (COPY)
+//
+// Read path:
+//
+//	Load → reads active generation from webi_packages, fetches assets
+//
+// Connection string format: standard libpq / pgx DSN, e.g.:
+//
+//	postgres://user:pass@host/dbname?sslmode=require
+//	host=localhost user=webi dbname=webi sslmode=disable
+package pgstore
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/webinstall/webi-installers/internal/storage"
+)
+
+// Schema holds the DDL for creating the required tables.
+// Run once on startup or deploy to ensure the schema exists.
+const Schema = `
+CREATE TABLE IF NOT EXISTS webi_packages (
+    name       TEXT        NOT NULL PRIMARY KEY,
+    active_gen SMALLINT    NOT NULL DEFAULT 0,
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
+);
+
+CREATE TABLE IF NOT EXISTS webi_assets (
+    id       BIGSERIAL PRIMARY KEY,
+    pkg      TEXT      NOT NULL,
+    gen      SMALLINT  NOT NULL,
+    filename TEXT      NOT NULL DEFAULT '',
+    version  TEXT      NOT NULL DEFAULT '',
+    lts      BOOLEAN   NOT NULL DEFAULT FALSE,
+    channel  TEXT      NOT NULL DEFAULT '',
+    date     TEXT      NOT NULL DEFAULT '',
+    os       TEXT      NOT NULL DEFAULT '',
+    arch     TEXT      NOT NULL DEFAULT '',
+    libc     TEXT      NOT NULL DEFAULT '',
+    format   TEXT      NOT NULL DEFAULT '',
+    download TEXT      NOT NULL DEFAULT '',
+    extra    TEXT      NOT NULL DEFAULT '',
+    variants TEXT[]    NOT NULL DEFAULT '{}'
+);
+
+CREATE INDEX IF NOT EXISTS webi_assets_pkg_gen ON webi_assets (pkg, gen);
+`
+
+// errTxDone is returned by Put once the refresh has been committed.
+var errTxDone = errors.New("pgstore: refresh already committed")
+
+// assetColumns lists the webi_assets columns written by Commit, in the
+// same order copyRows emits values.
+var assetColumns = []string{
+	"pkg", "gen",
+	"filename", "version", "lts", "channel", "date",
+	"os", "arch", "libc", "format", "download", "extra", "variants",
+}
+
+// Compile-time checks that the types satisfy the storage interfaces.
+var (
+	_ storage.Store     = (*Store)(nil)
+	_ storage.RefreshTx = (*refreshTx)(nil)
+)
+
+// Store is a PostgreSQL-backed asset store.
+type Store struct {
+	pool *pgxpool.Pool
+}
+
+// New opens a connection pool to the given DSN and applies the schema.
+// Returns an error if the connection or schema creation fails.
+// The pool is closed again if applying the schema fails; on success the
+// caller should Close the Store when done with it.
+func New(ctx context.Context, dsn string) (*Store, error) {
+	cfg, err := pgxpool.ParseConfig(dsn)
+	if err != nil {
+		return nil, fmt.Errorf("pgstore: parse dsn: %w", err)
+	}
+
+	pool, err := pgxpool.NewWithConfig(ctx, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("pgstore: connect: %w", err)
+	}
+
+	if err := applySchema(ctx, pool); err != nil {
+		pool.Close()
+		return nil, err
+	}
+
+	return &Store{pool: pool}, nil
+}
+
+// Close releases the connection pool.
+func (s *Store) Close() {
+	s.pool.Close()
+}
+
+// ListPackages returns the names of all packages in the store.
+func (s *Store) ListPackages(ctx context.Context) ([]string, error) {
+	rows, err := s.pool.Query(ctx,
+		`SELECT name FROM webi_packages ORDER BY name`,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("pgstore: list packages: %w", err)
+	}
+	defer rows.Close()
+
+	var pkgs []string
+	for rows.Next() {
+		var name string
+		if err := rows.Scan(&name); err != nil {
+			return nil, fmt.Errorf("pgstore: scan package name: %w", err)
+		}
+		pkgs = append(pkgs, name)
+	}
+	// A mid-stream failure must not hand back a truncated list.
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("pgstore: list packages: %w", err)
+	}
+	return pkgs, nil
+}
+
+// Load returns all assets for a package using the active generation.
+// Returns nil (not an error) if the package is not cached.
+//
+// The pointer and the assets are read by two separate statements. Commit
+// only changes a generation's rows in the same transaction that makes that
+// generation active, so the second statement still finds a complete set.
+func (s *Store) Load(ctx context.Context, pkg string) (*storage.PackageData, error) {
+	// Fetch active generation and updated_at.
+	var gen int16
+	var updatedAt time.Time
+	err := s.pool.QueryRow(ctx,
+		`SELECT active_gen, updated_at FROM webi_packages WHERE name = $1`,
+		pkg,
+	).Scan(&gen, &updatedAt)
+	// errors.Is rather than ==, so a wrapped ErrNoRows is still a cache miss.
+	if errors.Is(err, pgx.ErrNoRows) {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("pgstore: load %s: %w", pkg, err)
+	}
+
+	// Fetch all assets for this generation.
+	rows, err := s.pool.Query(ctx, `
+		SELECT filename, version, lts, channel, date,
+		       os, arch, libc, format, download, extra, variants
+		FROM webi_assets
+		WHERE pkg = $1 AND gen = $2
+		ORDER BY id
+	`, pkg, gen)
+	if err != nil {
+		return nil, fmt.Errorf("pgstore: load assets %s: %w", pkg, err)
+	}
+	defer rows.Close()
+
+	var assets []storage.Asset
+	for rows.Next() {
+		var a storage.Asset
+		if err := rows.Scan(
+			&a.Filename, &a.Version, &a.LTS, &a.Channel, &a.Date,
+			&a.OS, &a.Arch, &a.Libc, &a.Format, &a.Download,
+			&a.Extra, &a.Variants,
+		); err != nil {
+			return nil, fmt.Errorf("pgstore: scan asset %s: %w", pkg, err)
+		}
+		assets = append(assets, a)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("pgstore: rows %s: %w", pkg, err)
+	}
+
+	return &storage.PackageData{
+		Assets:    assets,
+		UpdatedAt: updatedAt,
+	}, nil
+}
+
+// BeginRefresh starts a write for a package, targeting the generation that
+// is inactive at the time of the call (generation 1 for a new package).
+//
+// Nothing is written here. Clearing the target generation is left to
+// Commit: deleting up front, outside a transaction, could wipe rows that a
+// concurrent refresh had just made active.
+func (s *Store) BeginRefresh(ctx context.Context, pkg string) (storage.RefreshTx, error) {
+	var activeGen int16
+	err := s.pool.QueryRow(ctx,
+		`SELECT active_gen FROM webi_packages WHERE name = $1`,
+		pkg,
+	).Scan(&activeGen)
+
+	// A package that does not exist yet is written to generation 1.
+	writeGen := int16(1)
+	switch {
+	case err == nil:
+		writeGen = 1 - activeGen
+	case !errors.Is(err, pgx.ErrNoRows):
+		return nil, fmt.Errorf("pgstore: begin refresh %s: %w", pkg, err)
+	}
+
+	return &refreshTx{
+		pool: s.pool,
+		pkg:  pkg,
+		gen:  writeGen,
+	}, nil
+}
+
+// refreshTx is an in-progress write for one package.
+type refreshTx struct {
+	pool   *pgxpool.Pool
+	pkg    string
+	gen    int16
+	assets []storage.Asset
+	done   bool // set once Commit has succeeded
+}
+
+// Put stages assets for writing. May be called multiple times.
+// Returns an error once the refresh has been committed, because assets
+// staged after that point would never be written.
+func (tx *refreshTx) Put(assets []storage.Asset) error {
+	if tx.done {
+		return errTxDone
+	}
+	tx.assets = append(tx.assets, assets...)
+	return nil
+}
+
+// Commit publishes the staged assets as the package's active generation.
+//
+// The pointer swap, the clearing of the target generation and the bulk
+// insert (COPY) run in one database transaction, so readers see either the
+// previous snapshot or the new one. If any step fails the transaction is
+// rolled back and the staged assets are kept, so Commit can be retried
+// without duplicating rows. Calling Commit again after success is a no-op.
+func (tx *refreshTx) Commit(ctx context.Context) error {
+	if tx.done {
+		return nil
+	}
+
+	dbtx, err := tx.pool.Begin(ctx)
+	if err != nil {
+		return fmt.Errorf("pgstore: begin tx %s: %w", tx.pkg, err)
+	}
+	// After a successful Commit this only reports that the transaction is
+	// already closed, so the result is deliberately ignored.
+	defer func() { _ = dbtx.Rollback(ctx) }()
+
+	// Upsert the pointer first: it locks this package's webi_packages row,
+	// which makes concurrent commits for the same package run one at a time.
+	if _, err := dbtx.Exec(ctx, `
+		INSERT INTO webi_packages (name, active_gen, updated_at)
+		VALUES ($1, $2, now())
+		ON CONFLICT (name)
+		DO UPDATE SET active_gen = $2, updated_at = now()
+	`, tx.pkg, tx.gen); err != nil {
+		return fmt.Errorf("pgstore: swap gen %s: %w", tx.pkg, err)
+	}
+
+	// Start the target generation from a clean slate.
+	if _, err := dbtx.Exec(ctx,
+		`DELETE FROM webi_assets WHERE pkg = $1 AND gen = $2`,
+		tx.pkg, tx.gen,
+	); err != nil {
+		return fmt.Errorf("pgstore: clear gen %d for %s: %w", tx.gen, tx.pkg, err)
+	}
+
+	// With nothing staged an empty generation is published.
+	if rows := tx.copyRows(); len(rows) > 0 {
+		if _, err := dbtx.CopyFrom(ctx,
+			pgx.Identifier{"webi_assets"},
+			assetColumns,
+			pgx.CopyFromRows(rows),
+		); err != nil {
+			return fmt.Errorf("pgstore: copy assets %s: %w", tx.pkg, err)
+		}
+	}
+
+	if err := dbtx.Commit(ctx); err != nil {
+		return fmt.Errorf("pgstore: commit %s: %w", tx.pkg, err)
+	}
+
+	tx.assets = nil
+	tx.done = true
+	return nil
+}
+
+// copyRows converts the staged assets into rows for pgx.CopyFromRows,
+// with values in assetColumns order.
+func (tx *refreshTx) copyRows() [][]any {
+	rows := make([][]any, len(tx.assets))
+	for i, a := range tx.assets {
+		// The variants column is NOT NULL, so never pass a nil slice.
+		variants := a.Variants
+		if variants == nil {
+			variants = []string{}
+		}
+		rows[i] = []any{
+			tx.pkg, tx.gen,
+			a.Filename, a.Version, a.LTS, a.Channel, a.Date,
+			a.OS, a.Arch, a.Libc, a.Format, a.Download, a.Extra,
+			variants,
+		}
+	}
+	return rows
+}
+
+// Rollback discards all staged assets without writing anything.
+func (tx *refreshTx) Rollback() error {
+	tx.assets = nil
+	return nil
+}
+
+// applySchema runs the schema DDL idempotently.
+func applySchema(ctx context.Context, pool *pgxpool.Pool) error {
+	if _, err := pool.Exec(ctx, Schema); err != nil {
+		return fmt.Errorf("pgstore: apply schema: %w", err)
+	}
+	return nil
+}