mirror of
https://github.com/webinstall/webi-installers.git
synced 2026-05-31 13:02:46 +00:00
feat(webicached,webid): add pgstore backend and --pg flag
Adds an optional PostgreSQL storage backend for webicached and webid. When --pg is given a DSN, the daemon writes/reads classified assets to/from Postgres instead of the legacy fsstore. Brings in pgx/v5 as a direct dependency.
This commit is contained in:
@@ -55,6 +55,7 @@ import (
|
||||
"github.com/webinstall/webi-installers/internal/releases/zigdist"
|
||||
"github.com/webinstall/webi-installers/internal/storage"
|
||||
"github.com/webinstall/webi-installers/internal/storage/fsstore"
|
||||
"github.com/webinstall/webi-installers/internal/storage/pgstore"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -78,6 +79,7 @@ type MainConfig struct {
|
||||
envFile string
|
||||
confDir string
|
||||
cacheDir string
|
||||
pgDSN string
|
||||
rawDir string
|
||||
token string
|
||||
once bool
|
||||
@@ -156,11 +158,20 @@ func main() {
|
||||
cfg.token = os.Getenv("GITHUB_TOKEN")
|
||||
}
|
||||
|
||||
fss, err := fsstore.New(cfg.cacheDir)
|
||||
if err != nil {
|
||||
log.Fatalf("fsstore: %v", err)
|
||||
var store storage.Store
|
||||
if cfg.pgDSN != "" {
|
||||
pg, err := pgstore.New(context.Background(), cfg.pgDSN)
|
||||
if err != nil {
|
||||
log.Fatalf("pgstore: %v", err)
|
||||
}
|
||||
store = pg
|
||||
} else {
|
||||
fss, err := fsstore.New(cfg.cacheDir)
|
||||
if err != nil {
|
||||
log.Fatalf("fsstore: %v", err)
|
||||
}
|
||||
store = fss
|
||||
}
|
||||
var store storage.Store = fss
|
||||
|
||||
var auth *githubish.Auth
|
||||
if cfg.token != "" {
|
||||
@@ -298,6 +309,7 @@ func registerFlags(fs *flag.FlagSet, cfg *MainConfig) {
|
||||
fs.StringVar(&cfg.envFile, "envfile", "", "path to .env file to load before running")
|
||||
fs.StringVar(&cfg.confDir, "conf", ".", "root directory containing {pkg}/releases.conf files")
|
||||
fs.StringVar(&cfg.cacheDir, "legacy", "~/.cache/webi/legacy", "legacy cache directory (fsstore root)")
|
||||
fs.StringVar(&cfg.pgDSN, "pg", "", "PostgreSQL DSN (enables pgstore; mutually exclusive with -legacy)")
|
||||
fs.StringVar(&cfg.rawDir, "raw", "~/.cache/webi/raw", "raw cache directory for upstream responses")
|
||||
fs.StringVar(&cfg.token, "token", "", "GitHub API token (or set $GITHUB_TOKEN)")
|
||||
fs.BoolVar(&cfg.once, "once", false, "run once then exit (no periodic refresh)")
|
||||
|
||||
@@ -37,6 +37,7 @@ import (
|
||||
|
||||
"github.com/webinstall/webi-installers/internal/storage"
|
||||
"github.com/webinstall/webi-installers/internal/storage/fsstore"
|
||||
"github.com/webinstall/webi-installers/internal/storage/pgstore"
|
||||
"github.com/webinstall/webi-installers/internal/uadetect"
|
||||
)
|
||||
|
||||
@@ -60,6 +61,7 @@ func printVersion(w io.Writer) {
|
||||
func main() {
|
||||
addr := flag.String("addr", ":3001", "listen address")
|
||||
cacheDir := flag.String("legacy", "~/.cache/webi/legacy", "legacy cache directory")
|
||||
pgDSN := flag.String("pg", "", "PostgreSQL DSN (enables pgstore; mutually exclusive with -legacy)")
|
||||
installersDir := flag.String("installers", ".", "installers repo root (for install.sh/ps1)")
|
||||
|
||||
if len(os.Args) > 1 {
|
||||
@@ -80,11 +82,20 @@ func main() {
|
||||
|
||||
cachePath := expandHome(*cacheDir)
|
||||
|
||||
fss, err := fsstore.New(cachePath)
|
||||
if err != nil {
|
||||
log.Fatalf("fsstore: %v", err)
|
||||
var store storage.Store
|
||||
if *pgDSN != "" {
|
||||
pg, err := pgstore.New(context.Background(), *pgDSN)
|
||||
if err != nil {
|
||||
log.Fatalf("pgstore: %v", err)
|
||||
}
|
||||
store = pg
|
||||
} else {
|
||||
fss, err := fsstore.New(cachePath)
|
||||
if err != nil {
|
||||
log.Fatalf("fsstore: %v", err)
|
||||
}
|
||||
store = fss
|
||||
}
|
||||
var store storage.Store = fss
|
||||
|
||||
srv := &server{
|
||||
store: store,
|
||||
|
||||
9
go.mod
9
go.mod
@@ -3,7 +3,16 @@ module github.com/webinstall/webi-installers
|
||||
go 1.26.1
|
||||
|
||||
require (
|
||||
github.com/jackc/pgx/v5 v5.9.2
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/jszwec/csvutil v1.10.0
|
||||
github.com/therootcompany/golib/http/middleware/v2 v2.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
golang.org/x/sync v0.17.0 // indirect
|
||||
golang.org/x/text v0.29.0 // indirect
|
||||
)
|
||||
|
||||
26
go.sum
26
go.sum
@@ -1,6 +1,32 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||
github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw=
|
||||
github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/jszwec/csvutil v1.10.0 h1:upMDUxhQKqZ5ZDCs/wy+8Kib8rZR8I8lOR34yJkdqhI=
|
||||
github.com/jszwec/csvutil v1.10.0/go.mod h1:/E4ONrmGkwmWsk9ae9jpXnv9QT8pLHEPcCirMFhxG9I=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/therootcompany/golib/http/middleware/v2 v2.0.1 h1:VNKpHcwyEW7cMct7/eO4fyrxwIQk2ycb6juVXSPs2Sk=
|
||||
github.com/therootcompany/golib/http/middleware/v2 v2.0.1/go.mod h1:g5gb9qBidw74nW6/mwIauTKMpOKchiN2l0gt5qzJ2aQ=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
|
||||
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
295
internal/storage/pgstore/pgstore.go
Normal file
295
internal/storage/pgstore/pgstore.go
Normal file
@@ -0,0 +1,295 @@
|
||||
// Package pgstore implements [storage.Store] on PostgreSQL.
|
||||
//
|
||||
// Schema uses double-buffering: two asset generations per package (0 and 1).
|
||||
// The active generation pointer in webi_packages is updated atomically on
|
||||
// Commit, so readers always see a complete consistent snapshot.
|
||||
//
|
||||
// Write path:
|
||||
//
|
||||
// BeginRefresh → clears inactive generation, returns tx
|
||||
// Put → stages assets in-memory
|
||||
// Commit → bulk-inserts assets (COPY), swaps generation pointer
|
||||
//
|
||||
// Read path:
|
||||
//
|
||||
// Load → reads active generation from webi_packages, fetches assets
|
||||
//
|
||||
// Connection string format: standard libpq / pgx DSN, e.g.:
|
||||
//
|
||||
// postgres://user:pass@host/dbname?sslmode=require
|
||||
// host=localhost user=webi dbname=webi sslmode=disable
|
||||
package pgstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"github.com/webinstall/webi-installers/internal/storage"
|
||||
)
|
||||
|
||||
// Schema holds the DDL for creating the required tables.
|
||||
// Run once on startup or deploy to ensure the schema exists.
|
||||
const Schema = `
|
||||
CREATE TABLE IF NOT EXISTS webi_packages (
|
||||
name TEXT NOT NULL PRIMARY KEY,
|
||||
active_gen SMALLINT NOT NULL DEFAULT 0,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS webi_assets (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
pkg TEXT NOT NULL,
|
||||
gen SMALLINT NOT NULL,
|
||||
filename TEXT NOT NULL DEFAULT '',
|
||||
version TEXT NOT NULL DEFAULT '',
|
||||
lts BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
channel TEXT NOT NULL DEFAULT '',
|
||||
date TEXT NOT NULL DEFAULT '',
|
||||
os TEXT NOT NULL DEFAULT '',
|
||||
arch TEXT NOT NULL DEFAULT '',
|
||||
libc TEXT NOT NULL DEFAULT '',
|
||||
format TEXT NOT NULL DEFAULT '',
|
||||
download TEXT NOT NULL DEFAULT '',
|
||||
extra TEXT NOT NULL DEFAULT '',
|
||||
variants TEXT[] NOT NULL DEFAULT '{}'
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS webi_assets_pkg_gen ON webi_assets (pkg, gen);
|
||||
`
|
||||
|
||||
// Store is a PostgreSQL-backed asset store.
|
||||
type Store struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
// New opens a connection pool to the given DSN and applies the schema.
|
||||
// Returns an error if the connection or schema creation fails.
|
||||
func New(ctx context.Context, dsn string) (*Store, error) {
|
||||
cfg, err := pgxpool.ParseConfig(dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgstore: parse dsn: %w", err)
|
||||
}
|
||||
|
||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgstore: connect: %w", err)
|
||||
}
|
||||
|
||||
if err := applySchema(ctx, pool); err != nil {
|
||||
pool.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Store{pool: pool}, nil
|
||||
}
|
||||
|
||||
// Close releases the connection pool.
|
||||
func (s *Store) Close() {
|
||||
s.pool.Close()
|
||||
}
|
||||
|
||||
// ListPackages returns the names of all packages in the store.
|
||||
func (s *Store) ListPackages(ctx context.Context) ([]string, error) {
|
||||
rows, err := s.pool.Query(ctx,
|
||||
`SELECT name FROM webi_packages ORDER BY name`,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgstore: list packages: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var pkgs []string
|
||||
for rows.Next() {
|
||||
var name string
|
||||
if err := rows.Scan(&name); err != nil {
|
||||
return nil, fmt.Errorf("pgstore: scan package name: %w", err)
|
||||
}
|
||||
pkgs = append(pkgs, name)
|
||||
}
|
||||
return pkgs, rows.Err()
|
||||
}
|
||||
|
||||
// Load returns all assets for a package using the active generation.
|
||||
// Returns nil (not an error) if the package is not cached.
|
||||
func (s *Store) Load(ctx context.Context, pkg string) (*storage.PackageData, error) {
|
||||
// Fetch active generation and updated_at.
|
||||
var gen int16
|
||||
var updatedAt time.Time
|
||||
err := s.pool.QueryRow(ctx,
|
||||
`SELECT active_gen, updated_at FROM webi_packages WHERE name = $1`,
|
||||
pkg,
|
||||
).Scan(&gen, &updatedAt)
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgstore: load %s: %w", pkg, err)
|
||||
}
|
||||
|
||||
// Fetch all assets for this generation.
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT filename, version, lts, channel, date,
|
||||
os, arch, libc, format, download, extra, variants
|
||||
FROM webi_assets
|
||||
WHERE pkg = $1 AND gen = $2
|
||||
ORDER BY id
|
||||
`, pkg, gen)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgstore: load assets %s: %w", pkg, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var assets []storage.Asset
|
||||
for rows.Next() {
|
||||
var a storage.Asset
|
||||
if err := rows.Scan(
|
||||
&a.Filename, &a.Version, &a.LTS, &a.Channel, &a.Date,
|
||||
&a.OS, &a.Arch, &a.Libc, &a.Format, &a.Download,
|
||||
&a.Extra, &a.Variants,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("pgstore: scan asset %s: %w", pkg, err)
|
||||
}
|
||||
assets = append(assets, a)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("pgstore: rows %s: %w", pkg, err)
|
||||
}
|
||||
|
||||
return &storage.PackageData{
|
||||
Assets: assets,
|
||||
UpdatedAt: updatedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// BeginRefresh starts a write transaction for a package.
|
||||
// It determines the inactive generation and clears it, ready for new data.
|
||||
func (s *Store) BeginRefresh(ctx context.Context, pkg string) (storage.RefreshTx, error) {
|
||||
// Determine which generation to write into (the inactive one).
|
||||
var activeGen int16
|
||||
err := s.pool.QueryRow(ctx,
|
||||
`SELECT active_gen FROM webi_packages WHERE name = $1`,
|
||||
pkg,
|
||||
).Scan(&activeGen)
|
||||
if err != nil && err != pgx.ErrNoRows {
|
||||
return nil, fmt.Errorf("pgstore: begin refresh %s: %w", pkg, err)
|
||||
}
|
||||
// If package doesn't exist yet, activeGen defaults to 0 and we write to gen 1.
|
||||
// If package exists, we write to the inactive generation (1 - activeGen).
|
||||
var writeGen int16
|
||||
if err == pgx.ErrNoRows {
|
||||
writeGen = 1
|
||||
} else {
|
||||
writeGen = 1 - activeGen
|
||||
}
|
||||
|
||||
// Clear the write generation so we start fresh.
|
||||
if _, err := s.pool.Exec(ctx,
|
||||
`DELETE FROM webi_assets WHERE pkg = $1 AND gen = $2`,
|
||||
pkg, writeGen,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("pgstore: clear gen %d for %s: %w", writeGen, pkg, err)
|
||||
}
|
||||
|
||||
return &refreshTx{
|
||||
pool: s.pool,
|
||||
pkg: pkg,
|
||||
gen: writeGen,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// refreshTx is an in-progress write for one package.
|
||||
type refreshTx struct {
|
||||
pool *pgxpool.Pool
|
||||
pkg string
|
||||
gen int16
|
||||
assets []storage.Asset
|
||||
}
|
||||
|
||||
// Put stages assets for writing. May be called multiple times.
|
||||
func (tx *refreshTx) Put(assets []storage.Asset) error {
|
||||
tx.assets = append(tx.assets, assets...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Commit bulk-inserts all staged assets, then atomically swaps the
|
||||
// active generation pointer in webi_packages.
|
||||
func (tx *refreshTx) Commit(ctx context.Context) error {
|
||||
if len(tx.assets) == 0 {
|
||||
return tx.swapGeneration(ctx)
|
||||
}
|
||||
|
||||
// Build rows for pgx.CopyFromRows.
|
||||
rows := make([][]any, len(tx.assets))
|
||||
for i, a := range tx.assets {
|
||||
variants := a.Variants
|
||||
if variants == nil {
|
||||
variants = []string{}
|
||||
}
|
||||
rows[i] = []any{
|
||||
tx.pkg,
|
||||
tx.gen,
|
||||
a.Filename,
|
||||
a.Version,
|
||||
a.LTS,
|
||||
a.Channel,
|
||||
a.Date,
|
||||
a.OS,
|
||||
a.Arch,
|
||||
a.Libc,
|
||||
a.Format,
|
||||
a.Download,
|
||||
a.Extra,
|
||||
variants,
|
||||
}
|
||||
}
|
||||
|
||||
cols := []string{
|
||||
"pkg", "gen",
|
||||
"filename", "version", "lts", "channel", "date",
|
||||
"os", "arch", "libc", "format", "download", "extra", "variants",
|
||||
}
|
||||
|
||||
_, err := tx.pool.CopyFrom(ctx,
|
||||
pgx.Identifier{"webi_assets"},
|
||||
cols,
|
||||
pgx.CopyFromRows(rows),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pgstore: copy assets %s: %w", tx.pkg, err)
|
||||
}
|
||||
|
||||
return tx.swapGeneration(ctx)
|
||||
}
|
||||
|
||||
// swapGeneration atomically updates the active generation pointer.
|
||||
func (tx *refreshTx) swapGeneration(ctx context.Context) error {
|
||||
_, err := tx.pool.Exec(ctx, `
|
||||
INSERT INTO webi_packages (name, active_gen, updated_at)
|
||||
VALUES ($1, $2, now())
|
||||
ON CONFLICT (name)
|
||||
DO UPDATE SET active_gen = $2, updated_at = now()
|
||||
`, tx.pkg, tx.gen)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pgstore: swap gen %s: %w", tx.pkg, err)
|
||||
}
|
||||
tx.assets = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rollback discards all staged assets without writing anything.
|
||||
func (tx *refreshTx) Rollback() error {
|
||||
tx.assets = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// applySchema runs the schema DDL idempotently.
|
||||
func applySchema(ctx context.Context, pool *pgxpool.Pool) error {
|
||||
if _, err := pool.Exec(ctx, Schema); err != nil {
|
||||
return fmt.Errorf("pgstore: apply schema: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user