mirror of
https://github.com/clastix/kamaji.git
synced 2026-02-14 18:10:03 +00:00
* refactor: migrate error packages from pkg/errors to stdlib Replace github.com/pkg/errors with Go standard library error handling in foundation error packages: - internal/datastore/errors: errors.Wrap -> fmt.Errorf with %w - internal/errors: errors.As -> stdlib errors.As - controllers/soot/controllers/errors: errors.New -> stdlib errors.New Part 1 of 4 in the pkg/errors migration. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor: migrate datastore package from pkg/errors to stdlib Replace github.com/pkg/errors with Go standard library error handling in the datastore layer: - connection.go: errors.Wrap -> fmt.Errorf with %w - datastore.go: errors.Wrap -> fmt.Errorf with %w - etcd.go: goerrors alias removed, use stdlib errors.As - nats.go: errors.Wrap/Is/New -> stdlib equivalents - postgresql.go: goerrors.Wrap -> fmt.Errorf with %w Part 2 of 4 in the pkg/errors migration. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor: migrate internal packages from pkg/errors to stdlib (partial) Replace github.com/pkg/errors with Go standard library error handling in internal packages: - internal/builders/controlplane: errors.Wrap -> fmt.Errorf - internal/crypto: errors.Wrap -> fmt.Errorf - internal/kubeadm: errors.Wrap/Wrapf -> fmt.Errorf - internal/upgrade: errors.Wrap -> fmt.Errorf - internal/webhook: errors.Wrap -> fmt.Errorf Part 3 of 4 in the pkg/errors migration. 
Remaining files: internal/resources/*.go (8 files, 42 occurrences) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor(resources): migrate from pkg/errors to stdlib Replace github.com/pkg/errors with Go standard library: - errors.Wrap(err, msg) → fmt.Errorf("msg: %w", err) - errors.New(msg) → errors.New(msg) Files migrated: - internal/resources/kubeadm_phases.go - internal/resources/kubeadm_upgrade.go - internal/resources/kubeadm_utils.go - internal/resources/datastore/datastore_multitenancy.go - internal/resources/datastore/datastore_setup.go - internal/resources/datastore/datastore_storage_config.go - internal/resources/addons/coredns.go - internal/resources/addons/kube_proxy.go Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor(controllers): migrate from pkg/errors to stdlib Replace github.com/pkg/errors with Go standard library: - errors.Wrap(err, msg) → fmt.Errorf("msg: %w", err) - errors.New(msg) → errors.New(msg) (stdlib) - errors.Is/As → errors.Is/As (stdlib) Files migrated: - controllers/datastore_controller.go - controllers/kubeconfiggenerator_controller.go - controllers/tenantcontrolplane_controller.go - controllers/telemetry_controller.go - controllers/certificate_lifecycle_controller.go Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor(soot): migrate from pkg/errors to stdlib Replace github.com/pkg/errors with Go standard library: - errors.Is() now uses stdlib errors.Is() Files migrated: - controllers/soot/controllers/kubeproxy.go - controllers/soot/controllers/migrate.go - controllers/soot/controllers/coredns.go - controllers/soot/controllers/konnectivity.go - controllers/soot/controllers/kubeadm_phase.go Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * refactor(api,cmd): migrate from pkg/errors to stdlib Replace github.com/pkg/errors with Go standard library: - errors.Wrap(err, msg) → fmt.Errorf("msg: %w", err) Files migrated: - api/v1alpha1/tenantcontrolplane_funcs.go - 
cmd/utils/k8s_version.go Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * chore: run go mod tidy after pkg/errors migration The github.com/pkg/errors package moved from direct to indirect dependency. It remains as an indirect dependency because other packages in the dependency tree still use it. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix(datastore): use errors.Is for sentinel error comparison The stdlib errors.As expects a pointer to a concrete error type, not a pointer to an error value. For comparing against sentinel errors like rpctypes.ErrGRPCUserNotFound, errors.Is should be used instead. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: resolve golangci-lint errors - Fix GCI import formatting (remove extra blank lines between groups) - Use errors.Is instead of errors.As for mutex sentinel errors Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix(errors): use proper variable declarations for errors.As The errors.As function requires a pointer to an assignable variable, not a pointer to a composite literal. The previous pattern `errors.As(err, &SomeError{})` creates a pointer to a temporary value which errors.As cannot reliably use for assignment. This fix declares proper variables for each error type and passes their addresses to errors.As, ensuring correct error chain matching. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix(datastore/etcd): use rpctypes.Error() for gRPC error comparison The etcd gRPC status errors (ErrGRPCUserNotFound, ErrGRPCRoleNotFound) cannot be compared directly using errors.Is() because they are wrapped in gRPC status errors during transmission. 
The etcd rpctypes package provides: - ErrGRPC* constants: server-side gRPC status errors - Err* constants (without GRPC prefix): client-side comparable errors - Error() function: converts gRPC errors to comparable EtcdError values The correct pattern is to use rpctypes.Error(err) to normalize the received error, then compare against client-side error constants like rpctypes.ErrUserNotFound. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
285 lines
8.9 KiB
Go
285 lines
8.9 KiB
Go
// Copyright 2022 Clastix Labs
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package datastore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/go-pg/pg/v10"
|
|
|
|
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
|
|
"github.com/clastix/kamaji/internal/datastore/errors"
|
|
)
|
|
|
|
// SQL statements issued by PostgreSQLConnection.
//
// Statements containing a %s verb are completed with fmt.Sprintf by their
// callers before execution; ? placeholders are supplied as query parameters.
const (
	// postgresqlFetchDBStatement selects the pg_database rows matching a database name.
	postgresqlFetchDBStatement = "SELECT FROM pg_database WHERE datname = ?"
	// postgresqlCreateDBStatement creates the database with the given name.
	postgresqlCreateDBStatement = `CREATE DATABASE "%s"`
	// postgresqlUserExists selects the pg_roles rows matching a role name.
	postgresqlUserExists = "SELECT 1 FROM pg_roles WHERE rolname = ?"
	// postgresqlCreateUserStatement creates a login role; the password is a query parameter.
	postgresqlCreateUserStatement = `CREATE ROLE "%s" LOGIN PASSWORD ?`
	// postgresqlShowGrantsStatement reports whether a login role has the create privilege on a database.
	postgresqlShowGrantsStatement = "SELECT has_database_privilege(rolname, ?, 'create') from pg_roles where rolcanlogin and rolname = ?"
	// postgresqlShowOwnershipStatement yields 't' when the given role owns the given database.
	postgresqlShowOwnershipStatement = "SELECT 't' FROM pg_catalog.pg_database AS d WHERE d.datname = ? AND pg_catalog.pg_get_userbyid(d.datdba) = ?"
	// postgresqlShowTableOwnershipStatement yields 't' when the given role owns the given table.
	postgresqlShowTableOwnershipStatement = "SELECT 't' from pg_tables where tableowner = ? AND tablename = ?"
	// postgresqlKineTableExistsStatement yields 't' when the given table exists in the given schema.
	postgresqlKineTableExistsStatement = "SELECT 't' FROM pg_tables WHERE schemaname = ? AND tablename = ?"
	// postgresqlGrantPrivilegesStatement grants CONNECT and CREATE on a database to a role.
	postgresqlGrantPrivilegesStatement = `GRANT CONNECT, CREATE ON DATABASE "%s" TO "%s"`
	// postgresqlChangeOwnerStatement transfers the ownership of a database to a role.
	postgresqlChangeOwnerStatement = `ALTER DATABASE "%s" OWNER TO "%s"`
	// postgresqlRevokePrivilegesStatement revokes every privilege a role has on a database.
	postgresqlRevokePrivilegesStatement = `REVOKE ALL PRIVILEGES ON DATABASE "%s" FROM "%s"`
	// postgresqlDropRoleStatement drops a role.
	postgresqlDropRoleStatement = `DROP ROLE "%s"`
	// postgresqlDropDBStatement drops a database using the FORCE option.
	postgresqlDropDBStatement = `DROP DATABASE "%s" WITH (FORCE)`
)
|
|
|
|
type PostgreSQLConnection struct {
|
|
db *pg.DB
|
|
connection ConnectionEndpoint
|
|
rootUser string
|
|
switchDatabaseFn func(dbName string) *pg.DB
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) Migrate(ctx context.Context, tcp kamajiv1alpha1.TenantControlPlane, target Connection) error {
|
|
// Ensuring the connection is working as expected
|
|
if err := target.Check(ctx); err != nil {
|
|
return fmt.Errorf("unable to check target datastore: %w", err)
|
|
}
|
|
// Creating the target schema if it doesn't exist
|
|
if ok, _ := target.DBExists(ctx, tcp.Status.Storage.Setup.Schema); !ok {
|
|
if err := target.CreateDB(ctx, tcp.Status.Storage.Setup.Schema); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
targetConn := target.(*PostgreSQLConnection).switchDatabaseFn(tcp.Status.Storage.Setup.Schema) //nolint:forcetypeassert
|
|
|
|
err := targetConn.RunInTransaction(ctx, func(tx *pg.Tx) error {
|
|
for _, stm := range []string{
|
|
`CREATE TABLE IF NOT EXISTS kine (
|
|
id SERIAL PRIMARY KEY,
|
|
name VARCHAR(630),
|
|
created INTEGER,
|
|
deleted INTEGER,
|
|
create_revision INTEGER,
|
|
prev_revision INTEGER,
|
|
lease INTEGER,
|
|
value bytea,
|
|
old_value bytea
|
|
)`,
|
|
`TRUNCATE TABLE kine`,
|
|
`CREATE INDEX IF NOT EXISTS kine_name_index ON kine (name)`,
|
|
`CREATE INDEX IF NOT EXISTS kine_name_id_index ON kine (name,id)`,
|
|
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
|
|
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
|
|
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
|
|
} {
|
|
if _, err := tx.ExecContext(ctx, stm); err != nil {
|
|
return fmt.Errorf("unable to perform schema creation: %w", err)
|
|
}
|
|
}
|
|
// Dumping the old datastore in a local buffer
|
|
var buf bytes.Buffer
|
|
|
|
if _, err := r.switchDatabaseFn(tcp.Status.Storage.Setup.Schema).WithContext(ctx).CopyTo(&buf, "COPY kine TO STDOUT"); err != nil {
|
|
return fmt.Errorf("unable to copy from the origin datastore: %w", err)
|
|
}
|
|
|
|
if _, err := tx.CopyFrom(&buf, "COPY kine FROM STDIN"); err != nil {
|
|
return fmt.Errorf("unable to copy to the target datastore: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("unable to perform migration transaction: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewPostgreSQLConnection(config ConnectionConfig) (Connection, error) {
|
|
opt := &pg.Options{
|
|
Addr: config.Endpoints[0].String(),
|
|
Database: config.DBName,
|
|
User: config.User,
|
|
Password: config.Password,
|
|
TLSConfig: config.TLSConfig,
|
|
}
|
|
|
|
fn := func(dbName string) *pg.DB {
|
|
o := opt
|
|
o.Database = dbName
|
|
|
|
return pg.Connect(o)
|
|
}
|
|
|
|
return &PostgreSQLConnection{
|
|
db: pg.Connect(opt),
|
|
switchDatabaseFn: fn,
|
|
rootUser: config.User,
|
|
connection: config.Endpoints[0],
|
|
}, nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) Driver() string {
|
|
return string(kamajiv1alpha1.KinePostgreSQLDriver)
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) UserExists(ctx context.Context, user string) (bool, error) {
|
|
res, err := r.db.ExecContext(ctx, postgresqlUserExists, user)
|
|
if err != nil {
|
|
return false, errors.NewCheckUserExistsError(err)
|
|
}
|
|
|
|
return res.RowsReturned() > 0, nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) CreateUser(ctx context.Context, user, password string) error {
|
|
_, err := r.db.ExecContext(ctx, fmt.Sprintf(postgresqlCreateUserStatement, user), password)
|
|
if err != nil {
|
|
return errors.NewCreateUserError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) DBExists(ctx context.Context, dbName string) (bool, error) {
|
|
rows, err := r.db.ExecContext(ctx, postgresqlFetchDBStatement, dbName)
|
|
if err != nil {
|
|
return false, errors.NewCheckDatabaseExistError(err)
|
|
}
|
|
|
|
return rows.RowsReturned() > 0, nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) CreateDB(ctx context.Context, dbName string) error {
|
|
_, err := r.db.ExecContext(ctx, fmt.Sprintf(postgresqlCreateDBStatement, dbName))
|
|
if err != nil {
|
|
return errors.NewCreateDBError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) GrantPrivilegesExists(ctx context.Context, user, dbName string) (bool, error) {
|
|
var hasDatabasePrivilege string
|
|
|
|
_, err := r.db.QueryContext(ctx, pg.Scan(&hasDatabasePrivilege), postgresqlShowGrantsStatement, dbName, user)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "does not exist") {
|
|
return false, nil
|
|
}
|
|
|
|
return false, errors.NewCheckGrantExistsError(err)
|
|
}
|
|
|
|
var isOwner string
|
|
|
|
if _, err = r.db.QueryContext(ctx, pg.Scan(&isOwner), postgresqlShowOwnershipStatement, dbName, user); err != nil {
|
|
return false, errors.NewCheckGrantExistsError(err)
|
|
}
|
|
|
|
var isTableOwner string
|
|
|
|
dbConn := r.switchDatabaseFn(dbName)
|
|
defer dbConn.Close()
|
|
|
|
tableExists, err := r.kineTableExists(ctx, dbConn)
|
|
if err != nil {
|
|
return false, errors.NewGrantPrivilegesError(err)
|
|
}
|
|
|
|
if tableExists {
|
|
if _, err = dbConn.QueryContext(ctx, pg.Scan(&isTableOwner), postgresqlShowTableOwnershipStatement, user, "kine"); err != nil {
|
|
return false, errors.NewCheckGrantExistsError(err)
|
|
}
|
|
|
|
return hasDatabasePrivilege == "t" && isOwner == "t" && isTableOwner == "t", nil
|
|
}
|
|
|
|
return hasDatabasePrivilege == "t" && isOwner == "t", nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) GrantPrivileges(ctx context.Context, user, dbName string) error {
|
|
if _, err := r.db.ExecContext(ctx, fmt.Sprintf(postgresqlGrantPrivilegesStatement, dbName, user)); err != nil {
|
|
return errors.NewGrantPrivilegesError(err)
|
|
}
|
|
|
|
dbConn := r.switchDatabaseFn(dbName)
|
|
defer dbConn.Close()
|
|
|
|
if _, err := dbConn.ExecContext(ctx, fmt.Sprintf(postgresqlChangeOwnerStatement, dbName, user)); err != nil {
|
|
return errors.NewGrantPrivilegesError(err)
|
|
}
|
|
|
|
tableExists, err := r.kineTableExists(ctx, dbConn)
|
|
if err != nil {
|
|
return errors.NewGrantPrivilegesError(err)
|
|
}
|
|
|
|
if tableExists {
|
|
if _, err = dbConn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE kine OWNER TO %s", user)); err != nil {
|
|
return errors.NewGrantPrivilegesError(err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) DeleteUser(ctx context.Context, user string) error {
|
|
if _, err := r.db.ExecContext(ctx, fmt.Sprintf(postgresqlDropRoleStatement, user)); err != nil {
|
|
return errors.NewDeleteUserError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) DeleteDB(ctx context.Context, dbName string) error {
|
|
if err := r.GrantPrivileges(ctx, r.rootUser, dbName); err != nil {
|
|
return errors.NewCannotDeleteDatabaseError(fmt.Errorf("cannot grant privileges to root user: %w", err))
|
|
}
|
|
|
|
if _, err := r.db.ExecContext(ctx, fmt.Sprintf(postgresqlDropDBStatement, dbName)); err != nil {
|
|
return errors.NewCannotDeleteDatabaseError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) RevokePrivileges(ctx context.Context, user, dbName string) error {
|
|
if _, err := r.db.ExecContext(ctx, fmt.Sprintf(postgresqlRevokePrivilegesStatement, dbName, user)); err != nil {
|
|
return errors.NewRevokePrivilegesError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) GetConnectionString() string {
|
|
return r.connection.String()
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) Close() error {
|
|
if err := r.db.Close(); err != nil {
|
|
return errors.NewCloseConnectionError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) Check(ctx context.Context) error {
|
|
if err := r.db.Ping(ctx); err != nil {
|
|
return errors.NewCheckConnectionError(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *PostgreSQLConnection) kineTableExists(ctx context.Context, db *pg.DB) (bool, error) {
|
|
var tableExists string
|
|
|
|
if _, err := db.QueryContext(ctx, pg.Scan(&tableExists), postgresqlKineTableExistsStatement, "public", "kine"); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return tableExists == "t", nil
|
|
}
|