mirror of
https://github.com/clastix/kamaji.git
synced 2026-02-14 18:10:03 +00:00
refactor: abstracting datastore configuration retrieval
This commit is contained in:
@@ -1,104 +0,0 @@
|
||||
// Copyright 2022 Clastix Labs
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
|
||||
"github.com/clastix/kamaji/internal/datastore"
|
||||
)
|
||||
|
||||
func (r *TenantControlPlaneReconciler) getStorageConnection(ctx context.Context, ds kamajiv1alpha1.DataStore) (datastore.Connection, error) {
|
||||
ca, err := ds.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.Client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
crt, err := ds.Spec.TLSConfig.ClientCertificate.Certificate.GetContent(ctx, r.Client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := ds.Spec.TLSConfig.ClientCertificate.PrivateKey.GetContent(ctx, r.Client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rootCAs := x509.NewCertPool()
|
||||
if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
|
||||
return nil, fmt.Errorf("error create root CA for the DB connector")
|
||||
}
|
||||
|
||||
certificate, err := tls.X509KeyPair(crt, key)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot retrieve x.509 key pair from the Kine Secret")
|
||||
}
|
||||
|
||||
var user, password string
|
||||
if auth := ds.Spec.BasicAuth; auth != nil {
|
||||
u, err := auth.Username.GetContent(ctx, r.Client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
user = string(u)
|
||||
|
||||
p, err := auth.Password.GetContent(ctx, r.Client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
password = string(p)
|
||||
}
|
||||
|
||||
eps := make([]datastore.ConnectionEndpoint, 0, len(ds.Spec.Endpoints))
|
||||
|
||||
for _, ep := range ds.Spec.Endpoints {
|
||||
host, stringPort, err := net.SplitHostPort(ep)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot retrieve host-port pair from DataStore endpoints")
|
||||
}
|
||||
|
||||
port, err := strconv.Atoi(stringPort)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot convert port from string for the given DataStore")
|
||||
}
|
||||
|
||||
eps = append(eps, datastore.ConnectionEndpoint{
|
||||
Host: host,
|
||||
Port: port,
|
||||
})
|
||||
}
|
||||
|
||||
cc := datastore.ConnectionConfig{
|
||||
User: user,
|
||||
Password: password,
|
||||
Endpoints: eps,
|
||||
TLSConfig: &tls.Config{
|
||||
RootCAs: rootCAs,
|
||||
Certificates: []tls.Certificate{certificate},
|
||||
},
|
||||
}
|
||||
|
||||
switch ds.Spec.Driver {
|
||||
case kamajiv1alpha1.KineMySQLDriver:
|
||||
cc.TLSConfig.ServerName = cc.Endpoints[0].Host
|
||||
|
||||
return datastore.NewMySQLConnection(cc)
|
||||
case kamajiv1alpha1.KinePostgreSQLDriver:
|
||||
cc.TLSConfig.ServerName = cc.Endpoints[0].Host
|
||||
//nolint:contextcheck
|
||||
return datastore.NewPostgreSQLConnection(cc)
|
||||
case kamajiv1alpha1.EtcdDriver:
|
||||
return datastore.NewETCDConnection(cc)
|
||||
default:
|
||||
return nil, fmt.Errorf("%s is not a valid driver", ds.Spec.Driver)
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
|
||||
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
|
||||
"github.com/clastix/kamaji/controllers/finalizers"
|
||||
"github.com/clastix/kamaji/internal/datastore"
|
||||
kamajierrors "github.com/clastix/kamaji/internal/errors"
|
||||
"github.com/clastix/kamaji/internal/resources"
|
||||
)
|
||||
@@ -81,7 +82,7 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
dsConnection, err := r.getStorageConnection(ctx, *ds)
|
||||
dsConnection, err := datastore.NewStorageConnection(ctx, r.Client, *ds)
|
||||
if err != nil {
|
||||
log.Error(err, "cannot generate the DataStore connection for the given instance")
|
||||
|
||||
|
||||
55
internal/datastore/connection.go
Normal file
55
internal/datastore/connection.go
Normal file
@@ -0,0 +1,55 @@
|
||||
// Copyright 2022 Clastix Labs
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
|
||||
)
|
||||
|
||||
func NewStorageConnection(ctx context.Context, client client.Client, ds kamajiv1alpha1.DataStore) (Connection, error) {
|
||||
cc, err := NewConnectionConfig(ctx, client, ds)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to create connection config object")
|
||||
}
|
||||
|
||||
switch ds.Spec.Driver {
|
||||
case kamajiv1alpha1.KineMySQLDriver:
|
||||
cc.TLSConfig.ServerName = cc.Endpoints[0].Host
|
||||
cc.Parameters = map[string][]string{
|
||||
"multiStatements": {"true"},
|
||||
}
|
||||
|
||||
return NewMySQLConnection(*cc)
|
||||
case kamajiv1alpha1.KinePostgreSQLDriver:
|
||||
cc.TLSConfig.ServerName = cc.Endpoints[0].Host
|
||||
//nolint:contextcheck
|
||||
return NewPostgreSQLConnection(*cc)
|
||||
case kamajiv1alpha1.EtcdDriver:
|
||||
return NewETCDConnection(*cc)
|
||||
default:
|
||||
return nil, fmt.Errorf("%s is not a valid driver", ds.Spec.Driver)
|
||||
}
|
||||
}
|
||||
|
||||
type Connection interface {
|
||||
CreateUser(ctx context.Context, user, password string) error
|
||||
CreateDB(ctx context.Context, dbName string) error
|
||||
GrantPrivileges(ctx context.Context, user, dbName string) error
|
||||
UserExists(ctx context.Context, user string) (bool, error)
|
||||
DBExists(ctx context.Context, dbName string) (bool, error)
|
||||
GrantPrivilegesExists(ctx context.Context, user, dbName string) (bool, error)
|
||||
DeleteUser(ctx context.Context, user string) error
|
||||
DeleteDB(ctx context.Context, dbName string) error
|
||||
RevokePrivileges(ctx context.Context, user, dbName string) error
|
||||
GetConnectionString() string
|
||||
Close() error
|
||||
Check(ctx context.Context) error
|
||||
Driver() string
|
||||
}
|
||||
@@ -6,7 +6,15 @@ package datastore
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
|
||||
)
|
||||
|
||||
type ConnectionEndpoint struct {
|
||||
@@ -27,6 +35,77 @@ type ConnectionConfig struct {
|
||||
Parameters map[string][]string
|
||||
}
|
||||
|
||||
func NewConnectionConfig(ctx context.Context, client client.Client, ds kamajiv1alpha1.DataStore) (*ConnectionConfig, error) {
|
||||
ca, err := ds.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
crt, err := ds.Spec.TLSConfig.ClientCertificate.Certificate.GetContent(ctx, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := ds.Spec.TLSConfig.ClientCertificate.PrivateKey.GetContent(ctx, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rootCAs := x509.NewCertPool()
|
||||
if ok := rootCAs.AppendCertsFromPEM(ca); !ok {
|
||||
return nil, fmt.Errorf("error create root CA for the DB connector")
|
||||
}
|
||||
|
||||
certificate, err := tls.X509KeyPair(crt, key)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot retrieve x.509 key pair from the Kine Secret")
|
||||
}
|
||||
|
||||
var user, password string
|
||||
if auth := ds.Spec.BasicAuth; auth != nil {
|
||||
u, err := auth.Username.GetContent(ctx, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
user = string(u)
|
||||
|
||||
p, err := auth.Password.GetContent(ctx, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
password = string(p)
|
||||
}
|
||||
|
||||
eps := make([]ConnectionEndpoint, 0, len(ds.Spec.Endpoints))
|
||||
|
||||
for _, ep := range ds.Spec.Endpoints {
|
||||
host, stringPort, err := net.SplitHostPort(ep)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot retrieve host-port pair from DataStore endpoints")
|
||||
}
|
||||
|
||||
port, err := strconv.Atoi(stringPort)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot convert port from string for the given DataStore")
|
||||
}
|
||||
|
||||
eps = append(eps, ConnectionEndpoint{
|
||||
Host: host,
|
||||
Port: port,
|
||||
})
|
||||
}
|
||||
|
||||
return &ConnectionConfig{
|
||||
User: user,
|
||||
Password: password,
|
||||
Endpoints: eps,
|
||||
TLSConfig: &tls.Config{
|
||||
RootCAs: rootCAs,
|
||||
Certificates: []tls.Certificate{certificate},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (config ConnectionConfig) getDataSourceNameUserPassword() string {
|
||||
if config.User == "" {
|
||||
return ""
|
||||
@@ -38,19 +117,3 @@ func (config ConnectionConfig) getDataSourceNameUserPassword() string {
|
||||
|
||||
return fmt.Sprintf("%s:%s@", config.User, config.Password)
|
||||
}
|
||||
|
||||
type Connection interface {
|
||||
CreateUser(ctx context.Context, user, password string) error
|
||||
CreateDB(ctx context.Context, dbName string) error
|
||||
GrantPrivileges(ctx context.Context, user, dbName string) error
|
||||
UserExists(ctx context.Context, user string) (bool, error)
|
||||
DBExists(ctx context.Context, dbName string) (bool, error)
|
||||
GrantPrivilegesExists(ctx context.Context, user, dbName string) (bool, error)
|
||||
DeleteUser(ctx context.Context, user string) error
|
||||
DeleteDB(ctx context.Context, dbName string) error
|
||||
RevokePrivileges(ctx context.Context, user, dbName string) error
|
||||
GetConnectionString() string
|
||||
Close() error
|
||||
Check(ctx context.Context) error
|
||||
Driver() string
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user