feat: storage homogeneity

This commit is contained in:
Dario Tranchitella
2022-08-26 21:59:54 +02:00
parent b23cbe3976
commit 1ddaeccc94
29 changed files with 975 additions and 1790 deletions

View File

@@ -8,8 +8,6 @@ import (
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/clastix/kamaji/internal/etcd"
)
// APIServerCertificatesStatus defines the observed state of ETCD Certificate for API server.
@@ -57,41 +55,30 @@ type CertificatesStatus struct {
ETCD *ETCDCertificatesStatus `json:"etcd,omitempty"`
}
// ETCDStatus defines the observed state of ETCDStatus.
type ETCDStatus struct {
Role etcd.Role `json:"role,omitempty"`
User etcd.User `json:"user,omitempty"`
}
type SQLCertificateStatus struct {
type DataStoreCertificateStatus struct {
SecretName string `json:"secretName,omitempty"`
Checksum string `json:"checksum,omitempty"`
LastUpdate metav1.Time `json:"lastUpdate,omitempty"`
}
type SQLConfigStatus struct {
type DataStoreConfigStatus struct {
SecretName string `json:"secretName,omitempty"`
Checksum string `json:"checksum,omitempty"`
}
type SQLSetupStatus struct {
type DataStoreSetupStatus struct {
Schema string `json:"schema,omitempty"`
User string `json:"user,omitempty"`
LastUpdate metav1.Time `json:"lastUpdate,omitempty"`
Checksum string `json:"checksum,omitempty"`
}
type KineStatus struct {
Driver string `json:"driver,omitempty"`
Config SQLConfigStatus `json:"config,omitempty"`
Setup SQLSetupStatus `json:"setup,omitempty"`
Certificate SQLCertificateStatus `json:"certificate,omitempty"`
}
// StorageStatus defines the observed state of StorageStatus.
type StorageStatus struct {
ETCD *ETCDStatus `json:"etcd,omitempty"`
Kine *KineStatus `json:"kine,omitempty"`
Driver string `json:"driver,omitempty"`
Config DataStoreConfigStatus `json:"config,omitempty"`
Setup DataStoreSetupStatus `json:"setup,omitempty"`
Certificate DataStoreCertificateStatus `json:"certificate,omitempty"`
}
// KubeconfigStatus contains information about the generated kubeconfig.
@@ -163,9 +150,8 @@ type AddonStatus struct {
// AddonsStatus defines the observed state of the different Addons.
type AddonsStatus struct {
CoreDNS AddonStatus `json:"coreDNS,omitempty"`
KubeProxy AddonStatus `json:"kubeProxy,omitempty"`
CoreDNS AddonStatus `json:"coreDNS,omitempty"`
KubeProxy AddonStatus `json:"kubeProxy,omitempty"`
Konnectivity KonnectivityStatus `json:"konnectivity,omitempty"`
}

View File

@@ -392,6 +392,37 @@ func (in *DataStore) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataStoreCertificateStatus) DeepCopyInto(out *DataStoreCertificateStatus) {
*out = *in
in.LastUpdate.DeepCopyInto(&out.LastUpdate)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataStoreCertificateStatus.
func (in *DataStoreCertificateStatus) DeepCopy() *DataStoreCertificateStatus {
if in == nil {
return nil
}
out := new(DataStoreCertificateStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataStoreConfigStatus) DeepCopyInto(out *DataStoreConfigStatus) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataStoreConfigStatus.
func (in *DataStoreConfigStatus) DeepCopy() *DataStoreConfigStatus {
if in == nil {
return nil
}
out := new(DataStoreConfigStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataStoreList) DeepCopyInto(out *DataStoreList) {
*out = *in
@@ -424,6 +455,22 @@ func (in *DataStoreList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataStoreSetupStatus) DeepCopyInto(out *DataStoreSetupStatus) {
*out = *in
in.LastUpdate.DeepCopyInto(&out.LastUpdate)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DataStoreSetupStatus.
func (in *DataStoreSetupStatus) DeepCopy() *DataStoreSetupStatus {
if in == nil {
return nil
}
out := new(DataStoreSetupStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DataStoreSpec) DeepCopyInto(out *DataStoreSpec) {
*out = *in
@@ -529,23 +576,6 @@ func (in *ETCDCertificatesStatus) DeepCopy() *ETCDCertificatesStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ETCDStatus) DeepCopyInto(out *ETCDStatus) {
*out = *in
in.Role.DeepCopyInto(&out.Role)
in.User.DeepCopyInto(&out.User)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ETCDStatus.
func (in *ETCDStatus) DeepCopy() *ETCDStatus {
if in == nil {
return nil
}
out := new(ETCDStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ExternalKubernetesObjectStatus) DeepCopyInto(out *ExternalKubernetesObjectStatus) {
*out = *in
@@ -578,24 +608,6 @@ func (in *IngressSpec) DeepCopy() *IngressSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KineStatus) DeepCopyInto(out *KineStatus) {
*out = *in
out.Config = in.Config
in.Setup.DeepCopyInto(&out.Setup)
in.Certificate.DeepCopyInto(&out.Certificate)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KineStatus.
func (in *KineStatus) DeepCopy() *KineStatus {
if in == nil {
return nil
}
out := new(KineStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KonnectivityConfigMap) DeepCopyInto(out *KonnectivityConfigMap) {
*out = *in
@@ -906,53 +918,6 @@ func (in *PublicKeyPrivateKeyPairStatus) DeepCopy() *PublicKeyPrivateKeyPairStat
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SQLCertificateStatus) DeepCopyInto(out *SQLCertificateStatus) {
*out = *in
in.LastUpdate.DeepCopyInto(&out.LastUpdate)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLCertificateStatus.
func (in *SQLCertificateStatus) DeepCopy() *SQLCertificateStatus {
if in == nil {
return nil
}
out := new(SQLCertificateStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SQLConfigStatus) DeepCopyInto(out *SQLConfigStatus) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLConfigStatus.
func (in *SQLConfigStatus) DeepCopy() *SQLConfigStatus {
if in == nil {
return nil
}
out := new(SQLConfigStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SQLSetupStatus) DeepCopyInto(out *SQLSetupStatus) {
*out = *in
in.LastUpdate.DeepCopyInto(&out.LastUpdate)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQLSetupStatus.
func (in *SQLSetupStatus) DeepCopy() *SQLSetupStatus {
if in == nil {
return nil
}
out := new(SQLSetupStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SecretReference) DeepCopyInto(out *SecretReference) {
*out = *in
@@ -988,16 +953,9 @@ func (in *ServiceSpec) DeepCopy() *ServiceSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StorageStatus) DeepCopyInto(out *StorageStatus) {
*out = *in
if in.ETCD != nil {
in, out := &in.ETCD, &out.ETCD
*out = new(ETCDStatus)
(*in).DeepCopyInto(*out)
}
if in.Kine != nil {
in, out := &in.Kine, &out.Kine
*out = new(KineStatus)
(*in).DeepCopyInto(*out)
}
out.Config = in.Config
in.Setup.DeepCopyInto(&out.Setup)
in.Certificate.DeepCopyInto(&out.Certificate)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageStatus.

View File

@@ -12,9 +12,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/datastore"
"github.com/clastix/kamaji/internal/resources"
ds "github.com/clastix/kamaji/internal/resources/datastore"
"github.com/clastix/kamaji/internal/resources/konnectivity"
"github.com/clastix/kamaji/internal/sql"
)
type GroupResourceBuilderConfiguration struct {
@@ -22,7 +23,7 @@ type GroupResourceBuilderConfiguration struct {
log logr.Logger
tcpReconcilerConfig TenantControlPlaneReconcilerConfig
tenantControlPlane kamajiv1alpha1.TenantControlPlane
DBConnection sql.DBConnection
Connection datastore.Connection
DataStore kamajiv1alpha1.DataStore
}
@@ -31,14 +32,14 @@ type GroupDeleteableResourceBuilderConfiguration struct {
log logr.Logger
tcpReconcilerConfig TenantControlPlaneReconcilerConfig
tenantControlPlane kamajiv1alpha1.TenantControlPlane
DBConnection sql.DBConnection
connection datastore.Connection
}
// GetResources returns a list of resources that will be used to provide tenant control planes
// Currently there is only a default approach
// TODO: the idea of this function is to become a factory to return the group of resources according to the given configuration.
func GetResources(config GroupResourceBuilderConfiguration, dataStore kamajiv1alpha1.DataStore) []resources.Resource {
return getDefaultResources(config, dataStore)
func GetResources(config GroupResourceBuilderConfiguration) []resources.Resource {
return getDefaultResources(config)
}
// GetDeletableResources returns a list of resources that have to be deleted when tenant control planes are deleted
@@ -48,14 +49,14 @@ func GetDeletableResources(config GroupDeleteableResourceBuilderConfiguration, d
return getDefaultDeleteableResources(config, dataStore)
}
func getDefaultResources(config GroupResourceBuilderConfiguration, dataStore kamajiv1alpha1.DataStore) []resources.Resource {
func getDefaultResources(config GroupResourceBuilderConfiguration) []resources.Resource {
resources := append(getUpgradeResources(config.client, config.tenantControlPlane), getKubernetesServiceResources(config.client, config.tenantControlPlane)...)
resources = append(resources, getKubeadmConfigResources(config.client, getTmpDirectory(config.tcpReconcilerConfig.TmpBaseDirectory, config.tenantControlPlane), dataStore)...)
resources = append(resources, getKubeadmConfigResources(config.client, getTmpDirectory(config.tcpReconcilerConfig.TmpBaseDirectory, config.tenantControlPlane), config.DataStore)...)
resources = append(resources, getKubernetesCertificatesResources(config.client, config.log, config.tcpReconcilerConfig, config.tenantControlPlane)...)
resources = append(resources, getKubeconfigResources(config.client, config.log, config.tcpReconcilerConfig, config.tenantControlPlane)...)
resources = append(resources, getKubernetesStorageResources(config.client, config.log, config.tcpReconcilerConfig, config.DBConnection, config.tenantControlPlane, dataStore)...)
resources = append(resources, getKubernetesStorageResources(config.client, config.Connection, config.DataStore)...)
resources = append(resources, getInternalKonnectivityResources(config.client, config.log, config.tcpReconcilerConfig, config.tenantControlPlane)...)
resources = append(resources, getKubernetesDeploymentResources(config.client, config.tcpReconcilerConfig, dataStore)...)
resources = append(resources, getKubernetesDeploymentResources(config.client, config.tcpReconcilerConfig, config.DataStore)...)
resources = append(resources, getKubernetesIngressResources(config.client, config.tenantControlPlane)...)
resources = append(resources, getKubeadmPhaseResources(config.client, config.log, config.tenantControlPlane)...)
resources = append(resources, getKubeadmAddonResources(config.client, config.log, config.tenantControlPlane)...)
@@ -65,24 +66,11 @@ func getDefaultResources(config GroupResourceBuilderConfiguration, dataStore kam
}
func getDefaultDeleteableResources(config GroupDeleteableResourceBuilderConfiguration, dataStore kamajiv1alpha1.DataStore) []resources.DeleteableResource {
switch dataStore.Spec.Driver {
case kamajiv1alpha1.EtcdDriver:
return []resources.DeleteableResource{
&resources.ETCDSetupResource{
Client: config.client,
Log: config.log,
DataStore: dataStore,
},
}
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver:
return []resources.DeleteableResource{
&resources.SQLSetup{
Client: config.client,
DBConnection: config.DBConnection,
},
}
default:
return []resources.DeleteableResource{}
return []resources.DeleteableResource{
&ds.Setup{
Client: config.client,
Connection: config.connection,
},
}
}
@@ -182,66 +170,30 @@ func getKubeconfigResources(c client.Client, log logr.Logger, tcpReconcilerConfi
}
}
func getKubernetesStorageResources(c client.Client, log logr.Logger, tcpReconcilerConfig TenantControlPlaneReconcilerConfig, dbConnection sql.DBConnection, tenantControlPlane kamajiv1alpha1.TenantControlPlane, ds kamajiv1alpha1.DataStore) []resources.Resource {
switch ds.Spec.Driver {
case kamajiv1alpha1.EtcdDriver:
return []resources.Resource{
&resources.ETCDCACertificatesResource{
Name: "etcd-ca-certificates",
Client: c,
Log: log,
DataStore: ds,
},
&resources.ETCDCertificatesResource{
Name: "etcd-certificates",
Client: c,
Log: log,
},
&resources.ETCDSetupResource{
Client: c,
Log: log,
DataStore: ds,
},
}
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver:
return []resources.Resource{
&resources.SQLStorageConfig{
Client: c,
Name: "sql-config",
Host: dbConnection.GetHost(),
Port: dbConnection.GetPort(),
Driver: dbConnection.Driver(),
},
&resources.SQLSetup{
Client: c,
DBConnection: dbConnection,
Driver: dbConnection.Driver(),
},
&resources.SQLCertificate{
Client: c,
DataStore: ds,
},
}
default:
return []resources.Resource{}
func getKubernetesStorageResources(c client.Client, dbConnection datastore.Connection, datastore kamajiv1alpha1.DataStore) []resources.Resource {
return []resources.Resource{
&ds.Config{
Client: c,
ConnString: dbConnection.GetConnectionString(),
Driver: dbConnection.Driver(),
},
&ds.Setup{
Client: c,
Connection: dbConnection,
DataStore: datastore,
},
&ds.Certificate{
Client: c,
DataStore: datastore,
},
}
}
func getKubernetesDeploymentResources(c client.Client, tcpReconcilerConfig TenantControlPlaneReconcilerConfig, dataStore kamajiv1alpha1.DataStore) []resources.Resource {
var endpoints []string
switch dataStore.Spec.Driver {
case kamajiv1alpha1.EtcdDriver:
endpoints = dataStore.Spec.Endpoints
default:
endpoints = []string{"127.0.0.1:2379"}
}
return []resources.Resource{
&resources.KubernetesDeploymentResource{
Client: c,
ETCDEndpoints: endpoints,
DataStoreDriver: dataStore.Spec.Driver,
DataStore: dataStore,
KineContainerImage: tcpReconcilerConfig.KineContainerImage,
},
}

View File

@@ -14,26 +14,10 @@ import (
"github.com/pkg/errors"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/sql"
"github.com/clastix/kamaji/internal/datastore"
)
func (r *TenantControlPlaneReconciler) getStorageConnection(ctx context.Context, ds kamajiv1alpha1.DataStore) (sql.DBConnection, error) {
var driver sql.Driver
var dbName string
// TODO: https://github.com/clastix/kamaji/issues/67
switch ds.Spec.Driver {
case kamajiv1alpha1.EtcdDriver:
return nil, nil
case kamajiv1alpha1.KineMySQLDriver:
driver = sql.MySQL
dbName = "mysql"
case kamajiv1alpha1.KinePostgreSQLDriver:
driver = sql.PostgreSQL
default:
return nil, nil
}
func (r *TenantControlPlaneReconciler) getStorageConnection(ctx context.Context, ds kamajiv1alpha1.DataStore) (datastore.Connection, error) {
ca, err := ds.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.Client)
if err != nil {
return nil, err
@@ -74,29 +58,47 @@ func (r *TenantControlPlaneReconciler) getStorageConnection(ctx context.Context,
password = string(p)
}
host, stringPort, err := net.SplitHostPort(ds.Spec.Endpoints[0])
if err != nil {
return nil, errors.Wrap(err, "cannot retrieve host-port pair from DataStore endpoints")
eps := make([]datastore.ConnectionEndpoint, 0, len(ds.Spec.Endpoints))
for _, ep := range ds.Spec.Endpoints {
host, stringPort, err := net.SplitHostPort(ep)
if err != nil {
return nil, errors.Wrap(err, "cannot retrieve host-port pair from DataStore endpoints")
}
port, err := strconv.Atoi(stringPort)
if err != nil {
return nil, errors.Wrap(err, "cannot convert port from string for the given DataStore")
}
eps = append(eps, datastore.ConnectionEndpoint{
Host: host,
Port: port,
})
}
port, err := strconv.Atoi(stringPort)
if err != nil {
return nil, errors.Wrap(err, "cannot convert port from string for the given DataStore")
}
return sql.GetDBConnection(
sql.ConnectionConfig{
SQLDriver: driver,
User: user,
Password: password,
Host: host,
Port: port,
DBName: dbName,
TLSConfig: &tls.Config{
ServerName: host,
RootCAs: rootCAs,
Certificates: []tls.Certificate{certificate},
},
cc := datastore.ConnectionConfig{
User: user,
Password: password,
Endpoints: eps,
TLSConfig: &tls.Config{
RootCAs: rootCAs,
Certificates: []tls.Certificate{certificate},
},
)
}
switch ds.Spec.Driver {
case kamajiv1alpha1.KineMySQLDriver:
cc.TLSConfig.ServerName = cc.Endpoints[0].Host
return datastore.NewMySQLConnection(cc)
case kamajiv1alpha1.KinePostgreSQLDriver:
cc.TLSConfig.ServerName = cc.Endpoints[0].Host
return datastore.NewPostgreSQLConnection(cc)
case kamajiv1alpha1.EtcdDriver:
return datastore.NewETCDConnection(cc)
default:
return nil, fmt.Errorf("%s is not a valid driver", ds.Spec.Driver)
}
}

View File

@@ -50,11 +50,11 @@ type TenantControlPlaneReconcilerConfig struct {
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=tenantcontrolplanes/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete
func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
@@ -80,17 +80,11 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
return ctrl.Result{}, errors.Wrap(err, "cannot retrieve kamajiv1alpha.DataStore object")
}
dbConnection, err := r.getStorageConnection(ctx, ds)
dsConnection, err := r.getStorageConnection(ctx, ds)
if err != nil {
return ctrl.Result{}, err
}
defer func() {
// TODO: Currently, etcd is not accessed using this dbConnection. For that reason we need this check
// Check: https://github.com/clastix/kamaji/issues/67
if dbConnection != nil {
dbConnection.Close()
}
}()
defer dsConnection.Close()
if markedToBeDeleted {
log.Info("marked for deletion, performing clean-up")
@@ -100,12 +94,12 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
log: log,
tcpReconcilerConfig: r.Config,
tenantControlPlane: *tenantControlPlane,
DBConnection: dbConnection,
connection: dsConnection,
}
registeredDeletableResources := GetDeletableResources(groupDeleteableResourceBuilderConfiguration, ds)
for _, resource := range registeredDeletableResources {
if err := resources.HandleDeletion(ctx, resource, tenantControlPlane); err != nil {
if err = resources.HandleDeletion(ctx, resource, tenantControlPlane); err != nil {
return ctrl.Result{}, err
}
}
@@ -113,7 +107,7 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
if hasFinalizer {
log.Info("removing finalizer")
if err := r.RemoveFinalizer(ctx, tenantControlPlane); err != nil {
if err = r.RemoveFinalizer(ctx, tenantControlPlane); err != nil {
return ctrl.Result{}, err
}
}
@@ -133,9 +127,9 @@ func (r *TenantControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.R
tcpReconcilerConfig: r.Config,
tenantControlPlane: *tenantControlPlane,
DataStore: ds,
DBConnection: dbConnection,
Connection: dsConnection,
}
registeredResources := GetResources(groupResourceBuilderConfiguration, ds)
registeredResources := GetResources(groupResourceBuilderConfiguration)
for _, resource := range registeredResources {
result, err := resources.Handle(ctx, resource, tenantControlPlane)

View File

@@ -42,16 +42,14 @@ const (
const (
apiServerFlagsAnnotation = "kube-apiserver.kamaji.clastix.io/args"
kineContainerName = "kine"
kineVolumeChmod = "kine-config"
dataStoreCerts = "kine-config"
kineVolumeCertName = "kine-certs"
)
type Deployment struct {
Address string
ETCDEndpoints []string
ETCDCompactionInterval string
ETCDStorageType kamajiv1alpha1.Driver
KineContainerImage string
Address string
KineContainerImage string
DataStore kamajiv1alpha1.DataStore
}
func (d *Deployment) SetContainers(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1.TenantControlPlane, address string) {
@@ -116,19 +114,24 @@ func (d *Deployment) buildPKIVolume(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1
},
}
if d.ETCDStorageType == kamajiv1alpha1.EtcdDriver {
sources = append(sources, corev1.VolumeProjection{
Secret: d.secretProjection(tcp.Status.Certificates.ETCD.APIServer.SecretName, constants.APIServerEtcdClientCertName, constants.APIServerEtcdClientKeyName),
})
if d.DataStore.Spec.Driver == kamajiv1alpha1.EtcdDriver {
sources = append(sources, corev1.VolumeProjection{
Secret: &corev1.SecretProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: tcp.Status.Certificates.ETCD.CA.SecretName,
Name: tcp.Status.Storage.Certificate.SecretName,
},
Items: []corev1.KeyToPath{
{
Key: constants.CACertName,
Path: constants.EtcdCACertName,
Key: "ca.crt",
Path: "etcd/ca.crt",
},
{
Key: "server.crt",
Path: "etcd/server.crt",
},
{
Key: "server.key",
Path: "etcd/server.key",
},
},
},
@@ -530,7 +533,6 @@ func (d *Deployment) buildKubeAPIServerCommand(tenantControlPlane *kamajiv1alpha
"--client-ca-file": path.Join(v1beta3.DefaultCertificatesDir, constants.CACertName),
"--enable-admission-plugins": strings.Join(tenantControlPlane.Spec.Kubernetes.AdmissionControllers.ToSlice(), ","),
"--enable-bootstrap-token-auth": "true",
"--etcd-servers": strings.Join(d.ETCDEndpoints, ","),
"--service-cluster-ip-range": tenantControlPlane.Spec.NetworkProfile.ServiceCIDR,
"--kubelet-client-certificate": path.Join(v1beta3.DefaultCertificatesDir, constants.APIServerKubeletClientCertName),
"--kubelet-client-key": path.Join(v1beta3.DefaultCertificatesDir, constants.APIServerKubeletClientKeyName),
@@ -549,12 +551,23 @@ func (d *Deployment) buildKubeAPIServerCommand(tenantControlPlane *kamajiv1alpha
"--tls-private-key-file": path.Join(v1beta3.DefaultCertificatesDir, constants.APIServerKeyName),
}
if d.ETCDStorageType == kamajiv1alpha1.EtcdDriver {
desiredArgs["--etcd-cafile"] = path.Join(v1beta3.DefaultCertificatesDir, constants.EtcdCACertName)
desiredArgs["--etcd-certfile"] = path.Join(v1beta3.DefaultCertificatesDir, constants.APIServerEtcdClientCertName)
desiredArgs["--etcd-keyfile"] = path.Join(v1beta3.DefaultCertificatesDir, constants.APIServerEtcdClientKeyName)
switch d.DataStore.Spec.Driver {
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver:
desiredArgs["--etcd-servers"] = "http://127.0.0.1:2379"
case kamajiv1alpha1.EtcdDriver:
httpsEndpoints := make([]string, 0, len(d.DataStore.Spec.Endpoints))
for _, ep := range d.DataStore.Spec.Endpoints {
httpsEndpoints = append(httpsEndpoints, fmt.Sprintf("https://%s", ep))
}
desiredArgs["--etcd-prefix"] = fmt.Sprintf("/%s", tenantControlPlane.GetName())
desiredArgs["--etcd-servers"] = strings.Join(httpsEndpoints, ",")
desiredArgs["--etcd-cafile"] = "/etc/kubernetes/pki/etcd/ca.crt"
desiredArgs["--etcd-certfile"] = "/etc/kubernetes/pki/etcd/server.crt"
desiredArgs["--etcd-keyfile"] = "/etc/kubernetes/pki/etcd/server.key"
}
// Order matters, here: extraArgs could try to overwrite some arguments managed by Kamaji and that would be crucial.
// Adding as first element of the array of maps, we're sure that these overrides will be sanitized by our configuration.
return utilities.MergeMaps(extraArgs, current, desiredArgs)
@@ -579,15 +592,6 @@ func (d *Deployment) secretProjection(secretName, certKeyName, keyName string) *
}
func (d *Deployment) removeKineVolumes(podSpec *corev1.PodSpec) {
if found, index := utilities.HasNamedVolume(podSpec.Volumes, kineVolumeChmod); found {
var volumes []corev1.Volume
volumes = append(volumes, podSpec.Volumes[:index]...)
volumes = append(volumes, podSpec.Volumes[index+1:]...)
podSpec.Volumes = volumes
}
if found, index := utilities.HasNamedVolume(podSpec.Volumes, kineVolumeCertName); found {
var volumes []corev1.Volume
@@ -599,25 +603,25 @@ func (d *Deployment) removeKineVolumes(podSpec *corev1.PodSpec) {
}
func (d *Deployment) buildKineVolume(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1.TenantControlPlane) {
if d.ETCDStorageType == kamajiv1alpha1.EtcdDriver {
d.removeKineVolumes(podSpec)
return
}
// Adding the volume for chmod'ed Kine certificates.
found, index := utilities.HasNamedVolume(podSpec.Volumes, kineVolumeChmod)
found, index := utilities.HasNamedVolume(podSpec.Volumes, dataStoreCerts)
if !found {
podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{})
index = len(podSpec.Volumes) - 1
}
podSpec.Volumes[index].Name = kineVolumeChmod
podSpec.Volumes[index].Name = dataStoreCerts
podSpec.Volumes[index].VolumeSource = corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: tcp.Status.Storage.Kine.Certificate.SecretName,
SecretName: tcp.Status.Storage.Certificate.SecretName,
DefaultMode: pointer.Int32Ptr(420),
},
}
if d.DataStore.Spec.Driver == kamajiv1alpha1.EtcdDriver {
d.removeKineVolumes(podSpec)
return
}
// Adding the volume to read Kine certificates:
// these must be subsequently fixed with a chmod due to pg issues with private key.
if found, index = utilities.HasNamedVolume(podSpec.Volumes, kineVolumeCertName); !found {
@@ -646,7 +650,7 @@ func (d *Deployment) removeKineContainers(podSpec *corev1.PodSpec) {
}
func (d *Deployment) buildKine(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1.TenantControlPlane) {
if d.ETCDStorageType == kamajiv1alpha1.EtcdDriver {
if d.DataStore.Spec.Driver == kamajiv1alpha1.EtcdDriver {
d.removeKineContainers(podSpec)
return
@@ -665,11 +669,11 @@ func (d *Deployment) buildKine(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1.Tena
args = utilities.ArgsFromSliceToMap(tcp.Spec.ControlPlane.Deployment.ExtraArgs.Kine)
}
switch d.ETCDStorageType {
switch d.DataStore.Spec.Driver {
case kamajiv1alpha1.KineMySQLDriver:
args["--endpoint"] = "mysql://$(DB_USER):$(DB_PASSWORD)@tcp($(DB_HOST):$(DB_PORT))/$(DB_SCHEMA)"
args["--endpoint"] = "mysql://$(DB_USER):$(DB_PASSWORD)@tcp($(DB_CONNECTION_STRING))/$(DB_SCHEMA)"
case kamajiv1alpha1.KinePostgreSQLDriver:
args["--endpoint"] = "postgres://$(DB_USER):$(DB_PASSWORD)@$(DB_HOST):$(DB_PORT)/$(DB_SCHEMA)"
args["--endpoint"] = "postgres://$(DB_USER):$(DB_PASSWORD)@$(DB_CONNECTION_STRING)/$(DB_SCHEMA)"
}
args["--ca-file"] = "/certs/ca.crt"
@@ -690,7 +694,7 @@ func (d *Deployment) buildKine(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1.Tena
},
VolumeMounts: []corev1.VolumeMount{
{
Name: kineVolumeChmod,
Name: dataStoreCerts,
ReadOnly: true,
MountPath: "/kine",
},
@@ -726,7 +730,7 @@ func (d *Deployment) buildKine(podSpec *corev1.PodSpec, tcp *kamajiv1alpha1.Tena
{
SecretRef: &corev1.SecretEnvSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: tcp.Status.Storage.Kine.Config.SecretName,
Name: tcp.Status.Storage.Config.SecretName,
},
},
},

View File

@@ -0,0 +1,56 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package datastore
import (
"context"
"crypto/tls"
"fmt"
)
type ConnectionEndpoint struct {
Host string
Port int
}
func (r ConnectionEndpoint) String() string {
return fmt.Sprintf("%s:%d", r.Host, r.Port)
}
type ConnectionConfig struct {
User string
Password string
Endpoints []ConnectionEndpoint
DBName string
TLSConfig *tls.Config
Parameters map[string][]string
}
func (config ConnectionConfig) getDataSourceNameUserPassword() string {
if config.User == "" {
return ""
}
if config.Password == "" {
return fmt.Sprintf("%s@", config.User)
}
return fmt.Sprintf("%s:%s@", config.User, config.Password)
}
type Connection interface {
CreateUser(ctx context.Context, user, password string) error
CreateDB(ctx context.Context, dbName string) error
GrantPrivileges(ctx context.Context, user, dbName string) error
UserExists(ctx context.Context, user string) (bool, error)
DBExists(ctx context.Context, dbName string) (bool, error)
GrantPrivilegesExists(ctx context.Context, user, dbName string) (bool, error)
DeleteUser(ctx context.Context, user string) error
DeleteDB(ctx context.Context, dbName string) error
RevokePrivileges(ctx context.Context, user, dbName string) error
GetConnectionString() string
Close() error
Check(ctx context.Context) error
Driver() string
}

171
internal/datastore/etcd.go Normal file
View File

@@ -0,0 +1,171 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package datastore
import (
"context"
"fmt"
"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
etcdclient "go.etcd.io/etcd/client/v3"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
const (
// rangeEnd is the key following the last key of the range.
// If rangeEnd is \0, the range is all keys greater than or equal to the key argument
// source: https://etcd.io/docs/v3.5/learning/api/
rangeEnd = "\\0"
)
func NewETCDConnection(config ConnectionConfig) (Connection, error) {
endpoints := make([]string, 0, len(config.Endpoints))
for _, ep := range config.Endpoints {
endpoints = append(endpoints, ep.String())
}
cfg := etcdclient.Config{
Endpoints: endpoints,
TLS: config.TLSConfig,
}
client, err := etcdclient.New(cfg)
if err != nil {
return nil, err
}
return &EtcdClient{
Client: *client,
}, nil
}
type EtcdClient struct {
Client etcdclient.Client
}
func (e *EtcdClient) CreateUser(ctx context.Context, user, password string) error {
_, err := e.Client.Auth.UserAddWithOptions(ctx, user, password, &etcdclient.UserAddOptions{
NoPassword: true,
})
return err
}
func (e *EtcdClient) CreateDB(ctx context.Context, dbName string) error {
return nil
}
func (e *EtcdClient) GrantPrivileges(ctx context.Context, user, dbName string) error {
_, err := e.Client.Auth.RoleAdd(ctx, dbName)
if err != nil {
return err
}
permission := etcdclient.PermissionType(authpb.READWRITE)
key := e.buildKey(dbName)
if _, err = e.Client.RoleGrantPermission(ctx, user, key, rangeEnd, permission); err != nil {
return err
}
if _, err = e.Client.UserGrantRole(ctx, user, dbName); err != nil {
return err
}
return err
}
func (e *EtcdClient) UserExists(ctx context.Context, user string) (bool, error) {
_, err := e.Client.UserGet(ctx, user)
if err != nil {
if errors.As(err, &rpctypes.ErrGRPCUserNotFound) {
return false, nil
}
return false, err
}
return true, nil
}
func (e *EtcdClient) DBExists(_ context.Context, dbName string) (bool, error) {
return true, nil
}
func (e *EtcdClient) GrantPrivilegesExists(ctx context.Context, username, dbName string) (bool, error) {
_, err := e.Client.RoleGet(ctx, dbName)
if err != nil {
if errors.As(err, &rpctypes.ErrGRPCRoleNotFound) {
return false, nil
}
return false, err
}
user, err := e.Client.UserGet(ctx, username)
if err != nil {
return false, err
}
for _, i := range user.Roles {
if i == dbName {
return true, nil
}
}
return false, nil
}
func (e *EtcdClient) DeleteUser(ctx context.Context, user string) error {
_, err := e.Client.Auth.UserDelete(ctx, user)
return err
}
func (e *EtcdClient) DeleteDB(ctx context.Context, dbName string) error {
withRange := etcdclient.WithRange(rangeEnd)
prefix := e.buildKey(dbName)
_, err := e.Client.Delete(ctx, prefix, withRange)
return err
}
func (e *EtcdClient) RevokePrivileges(ctx context.Context, user, dbName string) error {
_, err := e.Client.Auth.RoleDelete(ctx, dbName)
return err
}
func (e *EtcdClient) GetConnectionString() string {
// There's no need for connection string in etcd client:
// it's not used by Kine
return ""
}
func (e *EtcdClient) Close() error {
return e.Client.Close()
}
func (e *EtcdClient) Check(ctx context.Context) error {
_, err := e.Client.AuthStatus(ctx)
return err
}
func (e *EtcdClient) Driver() string {
return string(kamajiv1alpha1.EtcdDriver)
}
func (e *EtcdClient) buildKey(roleName string) string {
return fmt.Sprintf("/%s/", roleName)
}
type Permission struct {
Type int `json:"type,omitempty"`
Key string `json:"key,omitempty"`
RangeEnd string `json:"rangeEnd,omitempty"`
}

View File

@@ -1,15 +1,23 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package sql
package datastore
import (
"context"
"database/sql"
"fmt"
"net/url"
"github.com/go-pg/pg/v10"
"github.com/go-sql-driver/mysql"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
const (
defaultProtocol = "tcp"
sqlErrorNoRows = "sql: no rows in result set"
)
const (
@@ -25,36 +33,44 @@ const (
)
type MySQLConnection struct {
db *sql.DB
host string
port int
db *sql.DB
connector ConnectionEndpoint
}
func (c *MySQLConnection) Driver() string {
return "MySQL"
return string(kamajiv1alpha1.KineMySQLDriver)
}
func getPostgreSQLDB(config ConnectionConfig) (DBConnection, error) {
func NewPostgreSQLConnection(config ConnectionConfig) (Connection, error) {
opt := &pg.Options{
Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),
Addr: config.Endpoints[0].String(),
Database: config.DBName,
User: config.User,
Password: config.Password,
TLSConfig: config.TLSConfig,
}
return &PostgreSQLConnection{db: pg.Connect(opt), port: config.Port, host: config.Host}, nil
return &PostgreSQLConnection{db: pg.Connect(opt), connection: config.Endpoints[0]}, nil
}
func getMySQLDB(config ConnectionConfig) (DBConnection, error) {
tlsKey := "mysql"
dataSourceName := config.GetDataSourceName()
mysqlConfig, err := mysql.ParseDSN(dataSourceName)
func NewMySQLConnection(config ConnectionConfig) (Connection, error) {
nameDB := fmt.Sprintf("%s(%s)", defaultProtocol, config.Endpoints[0].String())
var parameters string
if len(config.Parameters) > 0 {
parameters = url.Values(config.Parameters).Encode()
}
dsn := fmt.Sprintf("%s%s/%s?%s", config.getDataSourceNameUserPassword(), nameDB, config.DBName, parameters)
mysqlConfig, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}
if err := mysql.RegisterTLSConfig(tlsKey, config.TLSConfig); err != nil {
tlsKey := "mysql"
if err = mysql.RegisterTLSConfig(tlsKey, config.TLSConfig); err != nil {
return nil, err
}
@@ -67,27 +83,19 @@ func getMySQLDB(config ConnectionConfig) (DBConnection, error) {
return nil, err
}
return &MySQLConnection{
db: db,
host: config.Host,
port: config.Port,
}, nil
return &MySQLConnection{db: db, connector: config.Endpoints[0]}, nil
}
func (c *MySQLConnection) GetHost() string {
return c.host
}
func (c *MySQLConnection) GetPort() int {
return c.port
func (c *MySQLConnection) GetConnectionString() string {
return c.connector.String()
}
func (c *MySQLConnection) Close() error {
return c.db.Close()
}
func (c *MySQLConnection) Check() error {
return c.db.Ping()
func (c *MySQLConnection) Check(ctx context.Context) error {
return c.db.PingContext(ctx)
}
func (c *MySQLConnection) CreateUser(ctx context.Context, user, password string) error {
@@ -106,7 +114,7 @@ func (c *MySQLConnection) UserExists(ctx context.Context, user string) (bool, er
checker := func(row *sql.Row) (bool, error) {
var name string
if err := row.Scan(&name); err != nil {
if checkEmptyQueryResult(err) {
if c.checkEmptyQueryResult(err) {
return false, nil
}
@@ -123,7 +131,7 @@ func (c *MySQLConnection) DBExists(ctx context.Context, dbName string) (bool, er
checker := func(row *sql.Row) (bool, error) {
var name string
if err := row.Scan(&name); err != nil {
if checkEmptyQueryResult(err) {
if c.checkEmptyQueryResult(err) {
return false, nil
}
@@ -191,3 +199,7 @@ func (c *MySQLConnection) mutate(ctx context.Context, nonFilledStatement string,
return nil
}
func (c *MySQLConnection) checkEmptyQueryResult(err error) bool {
return err.Error() == sqlErrorNoRows
}

View File

@@ -1,7 +1,7 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package sql
package datastore
import (
"context"
@@ -9,6 +9,8 @@ import (
"strings"
"github.com/go-pg/pg/v10"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
const (
@@ -24,13 +26,12 @@ const (
)
type PostgreSQLConnection struct {
db *pg.DB
host string
port int
db *pg.DB
connection ConnectionEndpoint
}
func (r *PostgreSQLConnection) Driver() string {
return "PostgreSQL"
return string(kamajiv1alpha1.KinePostgreSQLDriver)
}
func (r *PostgreSQLConnection) UserExists(ctx context.Context, user string) (bool, error) {
@@ -109,18 +110,14 @@ func (r *PostgreSQLConnection) RevokePrivileges(ctx context.Context, user, dbNam
return err
}
func (r *PostgreSQLConnection) GetHost() string {
return r.host
}
func (r *PostgreSQLConnection) GetPort() int {
return r.port
func (r *PostgreSQLConnection) GetConnectionString() string {
return r.connection.String()
}
func (r *PostgreSQLConnection) Close() error {
return r.db.Close()
}
func (r *PostgreSQLConnection) Check() error {
return r.db.Ping(context.Background())
func (r *PostgreSQLConnection) Check(ctx context.Context) error {
return r.db.Ping(ctx)
}

View File

@@ -1,177 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package etcd
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"time"
"go.etcd.io/etcd/api/v3/authpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
etcdclient "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/codes"
)
const (
etcdTimeout = 10 // seconds
// rangeEnd is the key following the last key of the range.
// If rangeEnd is \0, the range is all keys greater than or equal to the key argument
// source: https://etcd.io/docs/v3.5/learning/api/
rangeEnd = "\\0"
)
func NewClient(config Config) (*etcdclient.Client, error) {
cert, err := tls.X509KeyPair(config.ETCDCertificate, config.ETCDPrivateKey)
if err != nil {
return nil, err
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(config.ETCDCA)
cfg := etcdclient.Config{
Endpoints: config.Endpoints,
TLS: &tls.Config{ // nolint:gosec
Certificates: []tls.Certificate{cert},
RootCAs: pool,
},
}
return etcdclient.New(cfg)
}
func GetUser(ctx context.Context, client *etcdclient.Client, user *User) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, etcdTimeout*time.Second)
defer cancel()
response, err := client.UserGet(ctxWithTimeout, user.Name)
if err != nil {
var etcdError rpctypes.EtcdError
if errors.As(err, &etcdError) && etcdError.Code() == codes.FailedPrecondition {
return nil
}
return err
}
user.Roles = response.Roles
user.Exists = true
return nil
}
func AddUser(ctx context.Context, client *etcdclient.Client, username string) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
opts := etcdclient.UserAddOptions{
NoPassword: true,
}
_, err := client.Auth.UserAddWithOptions(ctxWithTimeout, username, "", &opts)
return err
}
func RemoveUser(ctx context.Context, client *etcdclient.Client, username string) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
_, err := client.Auth.UserDelete(ctxWithTimeout, username)
return err
}
func GetRole(ctx context.Context, client *etcdclient.Client, role *Role) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
response, err := client.RoleGet(ctxWithTimeout, role.Name)
if err != nil {
var etcdError rpctypes.EtcdError
if errors.As(err, &etcdError) && etcdError.Code() == codes.FailedPrecondition {
return nil
}
return err
}
role.Exists = true
for _, perm := range response.Perm {
permission := Permission{
Type: int(perm.PermType),
Key: string(perm.Key),
RangeEnd: string(perm.RangeEnd),
}
role.Permissions = append(role.Permissions, permission)
}
return nil
}
func AddRole(ctx context.Context, client *etcdclient.Client, roleName string) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
_, err := client.Auth.RoleAdd(ctxWithTimeout, roleName)
return err
}
func RemoveRole(ctx context.Context, client *etcdclient.Client, roleName string) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
_, err := client.Auth.RoleDelete(ctxWithTimeout, roleName)
return err
}
func GrantUserRole(ctx context.Context, client *etcdclient.Client, user User, role Role) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
_, err := client.UserGrantRole(ctxWithTimeout, user.Name, role.Name)
if err != nil {
return err
}
return nil
}
func GrantRolePermission(ctx context.Context, client *etcdclient.Client, role Role) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
permission := etcdclient.PermissionType(authpb.READWRITE)
key := BuildKey(role.Name)
_, err := client.RoleGrantPermission(ctxWithTimeout, role.Name, key, rangeEnd, permission)
if err != nil {
return err
}
return nil
}
func CleanUpPrefix(ctx context.Context, client *etcdclient.Client, name string) error {
ctxWithTimeout, cancel := getContextWithTimeout(ctx)
defer cancel()
withRange := etcdclient.WithRange(rangeEnd)
prefix := BuildKey(name)
_, err := client.Delete(ctxWithTimeout, prefix, withRange)
return err
}
func BuildKey(roleName string) string {
return fmt.Sprintf("/%s/", roleName)
}
func getContextWithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, etcdTimeout*time.Second)
}

View File

@@ -1,71 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package etcd
type Config struct {
ETCDCertificate []byte
ETCDPrivateKey []byte
ETCDCA []byte
Endpoints []string
}
type Permission struct {
Type int `json:"type,omitempty"`
Key string `json:"key,omitempty"`
RangeEnd string `json:"rangeEnd,omitempty"`
}
func (in *Permission) DeepCopyInto(out *Permission) {
*out = *in
}
func (in *Permission) DeepCopy() *Permission {
if in == nil {
return nil
}
out := new(Permission)
in.DeepCopyInto(out)
return out
}
type Role struct {
Name string `json:"name"`
Permissions []Permission `json:"permissions,omitempty"`
Exists bool `json:"exists"`
}
func (in *Role) DeepCopyInto(out *Role) {
*out = *in
}
func (in *Role) DeepCopy() *Role {
if in == nil {
return nil
}
out := new(Role)
in.DeepCopyInto(out)
return out
}
type User struct {
Name string `json:"name"`
Roles []string `json:"roles,omitempty"`
Exists bool `json:"exists"`
}
func (in *User) DeepCopyInto(out *User) {
*out = *in
}
func (in *User) DeepCopy() *User {
if in == nil {
return nil
}
out := new(User)
in.DeepCopyInto(out)
return out
}

View File

@@ -0,0 +1,148 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package datastore
import (
"bytes"
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/etcd"
"github.com/clastix/kamaji/internal/utilities"
)
type Certificate struct {
resource *corev1.Secret
Client client.Client
Name string
DataStore kamajiv1alpha1.DataStore
}
func (r *Certificate) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
return tenantControlPlane.Status.Storage.Certificate.Checksum != r.resource.GetAnnotations()["checksum"]
}
func (r *Certificate) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *Certificate) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *Certificate) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPrefixedName(tenantControlPlane),
Namespace: tenantControlPlane.GetNamespace(),
},
Data: map[string][]byte{},
}
return nil
}
func (r *Certificate) getPrefixedName(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) string {
return utilities.AddTenantPrefix(r.GetName(), tenantControlPlane)
}
func (r *Certificate) GetClient() client.Client {
return r.Client
}
func (r *Certificate) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(ctx, tenantControlPlane))
}
func (r *Certificate) GetName() string {
return "datastore-certificate"
}
func (r *Certificate) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
tenantControlPlane.Status.Storage.Certificate.SecretName = r.resource.GetName()
tenantControlPlane.Status.Storage.Certificate.Checksum = r.resource.GetAnnotations()["checksum"]
tenantControlPlane.Status.Storage.Certificate.LastUpdate = metav1.Now()
return nil
}
func (r *Certificate) mutate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
ca, err := r.DataStore.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.Client)
if err != nil {
return err
}
r.resource.Data["ca.crt"] = ca
if r.resource.GetAnnotations()["checksum"] == utilities.CalculateConfigMapChecksum(r.resource.StringData) {
if r.DataStore.Spec.Driver == kamajiv1alpha1.EtcdDriver {
if isValid, _ := etcd.IsETCDCertificateAndKeyPairValid(r.resource.Data["server.crt"], r.resource.Data["server.key"]); isValid {
return nil
}
}
}
var crt, key *bytes.Buffer
switch r.DataStore.Spec.Driver {
case kamajiv1alpha1.EtcdDriver:
// When dealing with the etcd storage we cannot use the basic authentication, thus the generation of a
// certificate used for authentication is mandatory, along with the CA private key.
privateKey, err := r.DataStore.Spec.TLSConfig.CertificateAuthority.PrivateKey.GetContent(ctx, r.Client)
if err != nil {
return err
}
crt, key, err = etcd.GetETCDCACertificateAndKeyPair(tenantControlPlane.GetName(), ca, privateKey)
if err != nil {
return err
}
case kamajiv1alpha1.KineMySQLDriver, kamajiv1alpha1.KinePostgreSQLDriver:
// For the SQL drivers we just need to copy the certificate, since the basic authentication is used
// to connect to the desired schema and database.
crtBytes, err := r.DataStore.Spec.TLSConfig.ClientCertificate.Certificate.GetContent(ctx, r.Client)
if err != nil {
return err
}
crt = bytes.NewBuffer(crtBytes)
keyBytes, err := r.DataStore.Spec.TLSConfig.ClientCertificate.PrivateKey.GetContent(ctx, r.Client)
if err != nil {
return err
}
key = bytes.NewBuffer(keyBytes)
default:
return fmt.Errorf("unrecognized driver for Certificate generation")
}
r.resource.Data["server.crt"] = crt.Bytes()
r.resource.Data["server.key"] = key.Bytes()
annotations := r.resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations["checksum"] = utilities.CalculateConfigMapChecksum(r.resource.StringData)
r.resource.SetAnnotations(annotations)
r.resource.SetLabels(utilities.MergeMaps(
utilities.KamajiLabels(),
r.resource.GetLabels(),
map[string]string{
"kamaji.clastix.io/name": tenantControlPlane.GetName(),
"kamaji.clastix.io/component": r.GetName(),
},
))
return ctrl.SetControllerReference(tenantControlPlane, r.resource, r.Client.Scheme())
}
}

View File

@@ -0,0 +1,237 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package datastore
import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/datastore"
"github.com/clastix/kamaji/internal/resources/utils"
)
type SetupResource struct {
schema string
user string
password string
}
type Setup struct {
resource SetupResource
Client client.Client
Connection datastore.Connection
DataStore kamajiv1alpha1.DataStore
}
func (r *Setup) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
return tenantControlPlane.Status.Storage.Driver != string(r.DataStore.Spec.Driver) &&
tenantControlPlane.Status.Storage.Setup.Checksum != tenantControlPlane.Status.Storage.Config.Checksum
}
func (r *Setup) ShouldCleanup(_ *kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *Setup) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *Setup) Define(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
secret := &corev1.Secret{}
namespacedName := types.NamespacedName{
Namespace: tenantControlPlane.GetNamespace(),
Name: tenantControlPlane.Status.Storage.Config.SecretName,
}
if err := r.Client.Get(ctx, namespacedName, secret); err != nil {
return err
}
r.resource = SetupResource{
schema: string(secret.Data["DB_SCHEMA"]),
user: string(secret.Data["DB_USER"]),
password: string(secret.Data["DB_PASSWORD"]),
}
return nil
}
func (r *Setup) GetClient() client.Client {
return r.Client
}
func (r *Setup) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
if tenantControlPlane.Status.Storage.Setup.Checksum != "" &&
tenantControlPlane.Status.Storage.Setup.Checksum != tenantControlPlane.Status.Storage.Config.Checksum {
if err := r.Delete(ctx, tenantControlPlane); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultUpdated, nil
}
reconcilationResult := controllerutil.OperationResultNone
var operationResult controllerutil.OperationResult
var err error
operationResult, err = r.createDB(ctx, tenantControlPlane)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = utils.UpdateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.createUser(ctx, tenantControlPlane)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = utils.UpdateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.createGrantPrivileges(ctx, tenantControlPlane)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = utils.UpdateOperationResult(reconcilationResult, operationResult)
return reconcilationResult, nil
}
func (r *Setup) GetName() string {
return "datastore-setup"
}
func (r *Setup) Delete(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if err := r.Define(ctx, tenantControlPlane); err != nil {
return err
}
if err := r.revokeGrantPrivileges(ctx, tenantControlPlane); err != nil {
return err
}
if err := r.deleteDB(ctx, tenantControlPlane); err != nil {
return err
}
if err := r.deleteUser(ctx, tenantControlPlane); err != nil {
return err
}
return nil
}
func (r *Setup) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
tenantControlPlane.Status.Storage.Setup.Schema = r.resource.schema
tenantControlPlane.Status.Storage.Setup.User = r.resource.user
tenantControlPlane.Status.Storage.Setup.LastUpdate = metav1.Now()
tenantControlPlane.Status.Storage.Setup.Checksum = tenantControlPlane.Status.Storage.Config.Checksum
return nil
}
func (r *Setup) createDB(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
exists, err := r.Connection.DBExists(ctx, r.resource.schema)
if err != nil {
return controllerutil.OperationResultNone, err
}
if exists {
return controllerutil.OperationResultNone, nil
}
if err := r.Connection.CreateDB(ctx, r.resource.schema); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
func (r *Setup) deleteDB(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) error {
exists, err := r.Connection.DBExists(ctx, r.resource.schema)
if err != nil {
return err
}
if !exists {
return nil
}
if err := r.Connection.DeleteDB(ctx, r.resource.schema); err != nil {
return err
}
return nil
}
func (r *Setup) createUser(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
exists, err := r.Connection.UserExists(ctx, r.resource.user)
if err != nil {
return controllerutil.OperationResultNone, err
}
if exists {
return controllerutil.OperationResultNone, nil
}
if err := r.Connection.CreateUser(ctx, r.resource.user, r.resource.password); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
func (r *Setup) deleteUser(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) error {
exists, err := r.Connection.UserExists(ctx, r.resource.user)
if err != nil {
return err
}
if !exists {
return nil
}
if err := r.Connection.DeleteUser(ctx, r.resource.user); err != nil {
return err
}
return nil
}
func (r *Setup) createGrantPrivileges(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
exists, err := r.Connection.GrantPrivilegesExists(ctx, r.resource.user, r.resource.schema)
if err != nil {
return controllerutil.OperationResultNone, err
}
if exists {
return controllerutil.OperationResultNone, nil
}
if err := r.Connection.GrantPrivileges(ctx, r.resource.user, r.resource.schema); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
func (r *Setup) revokeGrantPrivileges(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) error {
exists, err := r.Connection.GrantPrivilegesExists(ctx, r.resource.user, r.resource.schema)
if err != nil {
return err
}
if !exists {
return nil
}
if err := r.Connection.RevokePrivileges(ctx, r.resource.user, r.resource.schema); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,111 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package datastore
import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/utilities"
)
type Config struct {
resource *corev1.Secret
Client client.Client
ConnString string
Driver string
}
func (r *Config) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
return tenantControlPlane.Status.Storage.Config.Checksum != r.resource.GetAnnotations()["checksum"] ||
tenantControlPlane.Status.Storage.Driver != r.Driver
}
func (r *Config) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *Config) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *Config) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPrefixedName(tenantControlPlane),
Namespace: tenantControlPlane.GetNamespace(),
},
}
return nil
}
func (r *Config) getPrefixedName(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) string {
return utilities.AddTenantPrefix(r.GetName(), tenantControlPlane)
}
func (r *Config) GetClient() client.Client {
return r.Client
}
func (r *Config) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(ctx, tenantControlPlane))
}
func (r *Config) GetName() string {
return "datastore-config"
}
func (r *Config) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
tenantControlPlane.Status.Storage.Driver = r.Driver
tenantControlPlane.Status.Storage.Config.SecretName = r.resource.GetName()
tenantControlPlane.Status.Storage.Config.Checksum = r.resource.GetAnnotations()["checksum"]
return nil
}
func (r *Config) mutate(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
var password []byte
savedHash, ok := r.resource.GetAnnotations()["checksum"]
switch {
case ok && savedHash == utilities.CalculateConfigMapChecksum(r.resource.StringData):
password = r.resource.Data["DB_PASSWORD"]
default:
password = []byte(utilities.GenerateUUIDString())
}
r.resource.Data = map[string][]byte{
"DB_CONNECTION_STRING": []byte(r.ConnString),
"DB_SCHEMA": []byte(tenantControlPlane.GetName()),
"DB_USER": []byte(tenantControlPlane.GetName()),
"DB_PASSWORD": password,
}
annotations := r.resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations["checksum"] = utilities.CalculateConfigMapChecksum(r.resource.StringData)
r.resource.SetAnnotations(annotations)
r.resource.SetLabels(utilities.MergeMaps(
utilities.KamajiLabels(),
map[string]string{
"kamaji.clastix.io/name": tenantControlPlane.GetName(),
"kamaji.clastix.io/component": r.GetName(),
},
))
return ctrl.SetControllerReference(tenantControlPlane, r.resource, r.Client.Scheme())
}
}

View File

@@ -1,125 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/etcd"
"github.com/clastix/kamaji/internal/utilities"
)
type ETCDCACertificatesResource struct {
resource *corev1.Secret
Client client.Client
Log logr.Logger
Name string
DataStore kamajiv1alpha1.DataStore
}
func (r *ETCDCACertificatesResource) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
if tenantControlPlane.Status.Certificates.ETCD == nil {
return true
}
return tenantControlPlane.Status.Certificates.ETCD.CA.Checksum != r.resource.GetAnnotations()["checksum"]
}
func (r *ETCDCACertificatesResource) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *ETCDCACertificatesResource) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *ETCDCACertificatesResource) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPrefixedName(tenantControlPlane),
Namespace: tenantControlPlane.GetNamespace(),
},
}
return nil
}
func (r *ETCDCACertificatesResource) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(ctx, tenantControlPlane))
}
func (r *ETCDCACertificatesResource) GetName() string {
return r.Name
}
func (r *ETCDCACertificatesResource) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if tenantControlPlane.Status.Certificates.ETCD == nil {
tenantControlPlane.Status.Certificates.ETCD = &kamajiv1alpha1.ETCDCertificatesStatus{}
}
tenantControlPlane.Status.Certificates.ETCD.CA.SecretName = r.resource.GetName()
tenantControlPlane.Status.Certificates.ETCD.CA.LastUpdate = metav1.Now()
tenantControlPlane.Status.Certificates.ETCD.CA.Checksum = r.resource.GetAnnotations()["checksum"]
return nil
}
func (r *ETCDCACertificatesResource) getPrefixedName(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) string {
return utilities.AddTenantPrefix(r.Name, tenantControlPlane)
}
func (r *ETCDCACertificatesResource) mutate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
if r.DataStore.Spec.TLSConfig.CertificateAuthority.PrivateKey == nil {
return fmt.Errorf("missing private key, cannot generate certificate for the given tenant control plane")
}
if etcdStatus := tenantControlPlane.Status.Certificates.ETCD; etcdStatus != nil && len(etcdStatus.CA.Checksum) > 0 && etcdStatus.CA.Checksum == r.resource.GetAnnotations()["checksum"] {
isValid, err := etcd.IsETCDCertificateAndKeyPairValid(r.resource.Data[kubeadmconstants.CACertName], r.resource.Data[kubeadmconstants.CAKeyName])
if err != nil {
r.Log.Info(fmt.Sprintf("etcd certificates are not valid: %s", err.Error()))
}
if isValid {
return nil
}
}
ca, err := r.DataStore.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.Client)
if err != nil {
return err
}
key, err := r.DataStore.Spec.TLSConfig.CertificateAuthority.PrivateKey.GetContent(ctx, r.Client)
if err != nil {
return err
}
r.resource.Data = map[string][]byte{
kubeadmconstants.CACertName: ca,
kubeadmconstants.CAKeyName: key,
}
r.resource.SetLabels(utilities.KamajiLabels())
annotations := r.resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations["checksum"] = utilities.CalculateConfigMapChecksum(r.resource.StringData)
r.resource.SetAnnotations(annotations)
return ctrl.SetControllerReference(tenantControlPlane, r.resource, r.Client.Scheme())
}
}

View File

@@ -1,130 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/etcd"
"github.com/clastix/kamaji/internal/utilities"
)
type ETCDCertificatesResource struct {
resource *corev1.Secret
Client client.Client
Log logr.Logger
Name string
}
func (r *ETCDCertificatesResource) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
if tenantControlPlane.Status.Certificates.ETCD == nil {
return true
}
return tenantControlPlane.Status.Certificates.ETCD.APIServer.Checksum != r.resource.GetAnnotations()["checksum"]
}
func (r *ETCDCertificatesResource) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *ETCDCertificatesResource) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *ETCDCertificatesResource) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPrefixedName(tenantControlPlane),
Namespace: tenantControlPlane.GetNamespace(),
},
}
return nil
}
func (r *ETCDCertificatesResource) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(ctx, tenantControlPlane))
}
func (r *ETCDCertificatesResource) GetName() string {
return r.Name
}
func (r *ETCDCertificatesResource) UpdateTenantControlPlaneStatus(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if tenantControlPlane.Status.Certificates.ETCD == nil {
tenantControlPlane.Status.Certificates.ETCD = &kamajiv1alpha1.ETCDCertificatesStatus{}
}
tenantControlPlane.Status.Certificates.ETCD.APIServer.SecretName = r.resource.GetName()
tenantControlPlane.Status.Certificates.ETCD.APIServer.LastUpdate = metav1.Now()
tenantControlPlane.Status.Certificates.ETCD.APIServer.Checksum = r.resource.GetAnnotations()["checksum"]
return nil
}
func (r *ETCDCertificatesResource) getPrefixedName(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) string {
return utilities.AddTenantPrefix(r.Name, tenantControlPlane)
}
func (r *ETCDCertificatesResource) mutate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
if tenantControlPlane.Status.Certificates.ETCD == nil {
return fmt.Errorf("etcd is still synchronizing latest changes")
}
if checksum := tenantControlPlane.Status.Certificates.ETCD.APIServer.Checksum; len(checksum) > 0 && checksum == r.resource.GetAnnotations()["checksum"] {
isValid, err := etcd.IsETCDCertificateAndKeyPairValid(r.resource.Data[kubeadmconstants.APIServerEtcdClientCertName], r.resource.Data[kubeadmconstants.APIServerEtcdClientKeyName])
if err != nil {
r.Log.Info(fmt.Sprintf("etcd certificates are not valid: %s", err.Error()))
}
if isValid {
return nil
}
}
etcdCASecretNamespacedName := k8stypes.NamespacedName{Namespace: tenantControlPlane.GetNamespace(), Name: tenantControlPlane.Status.Certificates.ETCD.CA.SecretName}
etcdCASecret := &corev1.Secret{}
if err := r.Client.Get(ctx, etcdCASecretNamespacedName, etcdCASecret); err != nil {
return err
}
cert, privKey, err := etcd.GetETCDCACertificateAndKeyPair(
tenantControlPlane.GetName(),
etcdCASecret.Data[kubeadmconstants.CACertName],
etcdCASecret.Data[kubeadmconstants.CAKeyName],
)
if err != nil {
return err
}
r.resource.Data = map[string][]byte{
kubeadmconstants.APIServerEtcdClientCertName: cert.Bytes(),
kubeadmconstants.APIServerEtcdClientKeyName: privKey.Bytes(),
}
r.resource.SetLabels(utilities.KamajiLabels())
annotations := r.resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations["checksum"] = utilities.CalculateConfigMapChecksum(r.resource.StringData)
r.resource.SetAnnotations(annotations)
return ctrl.SetControllerReference(tenantControlPlane, r.resource, r.Client.Scheme())
}
}

View File

@@ -1,273 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"github.com/go-logr/logr"
etcdclient "go.etcd.io/etcd/client/v3"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/etcd"
)
type etcdSetupResource struct {
role etcd.Role
user etcd.User
}
type ETCDSetupResource struct {
resource *etcdSetupResource
Client client.Client
Log logr.Logger
DataStore kamajiv1alpha1.DataStore
}
func (r *ETCDSetupResource) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
if tenantControlPlane.Status.Storage.ETCD == nil {
return true
}
return tenantControlPlane.Status.Storage.ETCD.Role.Name != r.resource.role.Name ||
tenantControlPlane.Status.Storage.ETCD.User.Name != r.resource.user.Name
}
func (r *ETCDSetupResource) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *ETCDSetupResource) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *ETCDSetupResource) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &etcdSetupResource{
role: etcd.Role{Name: tenantControlPlane.Name, Exists: false},
user: etcd.User{Name: tenantControlPlane.Name, Exists: false},
}
return nil
}
func (r *ETCDSetupResource) CreateOrUpdate(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return r.reconcile(ctx)
}
func (r *ETCDSetupResource) Delete(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if err := r.Define(ctx, tenantControlPlane); err != nil {
return err
}
client, err := r.getETCDClient(ctx)
if err != nil {
return err
}
if err = r.deleteData(ctx, client, tenantControlPlane); err != nil {
return err
}
if err = r.deleteUser(ctx, client, tenantControlPlane); err != nil {
return err
}
if err = r.deleteRole(ctx, client, tenantControlPlane); err != nil {
return err
}
return nil
}
func (r *ETCDSetupResource) GetName() string {
return "etcd-setup"
}
func (r *ETCDSetupResource) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if tenantControlPlane.Status.Storage.ETCD == nil {
tenantControlPlane.Status.Storage.ETCD = &kamajiv1alpha1.ETCDStatus{}
}
tenantControlPlane.Status.Storage.ETCD.Role = r.resource.role
tenantControlPlane.Status.Storage.ETCD.User = r.resource.user
return nil
}
func (r *ETCDSetupResource) reconcile(ctx context.Context) (controllerutil.OperationResult, error) {
reconcilationResult := controllerutil.OperationResultNone
var operationResult controllerutil.OperationResult
client, err := r.getETCDClient(ctx)
if err != nil {
return reconcilationResult, err
}
defer client.Close()
operationResult, err = r.reconcileUser(ctx, client)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.reconcileRole(ctx, client)
if err != nil {
return controllerutil.OperationResultNone, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.grantUserRole(ctx, client)
if err != nil {
return controllerutil.OperationResultNone, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.grantRolePermissions(ctx, client)
if err != nil {
return controllerutil.OperationResultNone, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
return reconcilationResult, nil
}
func (r *ETCDSetupResource) getETCDClient(ctx context.Context) (*etcdclient.Client, error) {
ca, err := r.DataStore.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.Client)
if err != nil {
return nil, err
}
crt, err := r.DataStore.Spec.TLSConfig.ClientCertificate.Certificate.GetContent(ctx, r.Client)
if err != nil {
return nil, err
}
key, err := r.DataStore.Spec.TLSConfig.ClientCertificate.PrivateKey.GetContent(ctx, r.Client)
if err != nil {
return nil, err
}
config := etcd.Config{
ETCDCertificate: crt,
ETCDPrivateKey: key,
ETCDCA: ca,
Endpoints: r.DataStore.Spec.Endpoints,
}
return etcd.NewClient(config)
}
func (r *ETCDSetupResource) reconcileUser(ctx context.Context, client *etcdclient.Client) (controllerutil.OperationResult, error) {
if err := etcd.GetUser(ctx, client, &r.resource.user); err != nil {
return controllerutil.OperationResultNone, err
}
if !r.resource.user.Exists {
if err := etcd.AddUser(ctx, client, r.resource.user.Name); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
return controllerutil.OperationResultNone, nil
}
func (r *ETCDSetupResource) reconcileRole(ctx context.Context, client *etcdclient.Client) (controllerutil.OperationResult, error) {
if err := etcd.GetRole(ctx, client, &r.resource.role); err != nil {
return controllerutil.OperationResultNone, err
}
if !r.resource.role.Exists {
if err := etcd.AddRole(ctx, client, r.resource.role.Name); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
return controllerutil.OperationResultNone, nil
}
func (r *ETCDSetupResource) grantUserRole(ctx context.Context, client *etcdclient.Client) (controllerutil.OperationResult, error) {
if err := etcd.GetUser(ctx, client, &r.resource.user); err != nil {
return controllerutil.OperationResultNone, err
}
if len(r.resource.user.Roles) > 0 && isRole(r.resource.user.Roles, r.resource.role.Name) {
return controllerutil.OperationResultNone, nil
}
if err := etcd.GrantUserRole(ctx, client, r.resource.user, r.resource.role); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultUpdated, nil
}
func (r *ETCDSetupResource) grantRolePermissions(ctx context.Context, client *etcdclient.Client) (controllerutil.OperationResult, error) {
if err := etcd.GetRole(ctx, client, &r.resource.role); err != nil {
return controllerutil.OperationResultNone, err
}
if len(r.resource.role.Permissions) > 0 && isPermission(r.resource.role.Permissions, r.resource.role.Name) {
return controllerutil.OperationResultNone, nil
}
if err := etcd.GrantRolePermission(ctx, client, r.resource.role); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultUpdated, nil
}
func (r *ETCDSetupResource) deleteData(ctx context.Context, client *etcdclient.Client, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
return etcd.CleanUpPrefix(ctx, client, tenantControlPlane.GetName())
}
func (r *ETCDSetupResource) deleteUser(ctx context.Context, client *etcdclient.Client, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if err := etcd.GetUser(ctx, client, &r.resource.user); err != nil {
return err
}
if !r.resource.user.Exists {
return nil
}
return etcd.RemoveUser(ctx, client, tenantControlPlane.GetName())
}
func (r *ETCDSetupResource) deleteRole(ctx context.Context, client *etcdclient.Client, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if err := etcd.GetRole(ctx, client, &r.resource.role); err != nil {
return err
}
if !r.resource.role.Exists {
return nil
}
return etcd.RemoveRole(ctx, client, tenantControlPlane.GetName())
}
func isRole(s []string, x string) bool {
for _, o := range s {
if o == x {
return true
}
}
return false
}
func isPermission(s []etcd.Permission, role string) bool {
key := etcd.BuildKey(role)
for _, o := range s {
if o.Key == key {
return true
}
}
return false
}

View File

@@ -20,8 +20,7 @@ import (
type KubernetesDeploymentResource struct {
resource *appsv1.Deployment
Client client.Client
DataStoreDriver kamajiv1alpha1.Driver
ETCDEndpoints []string
DataStore kamajiv1alpha1.DataStore
Name string
KineContainerImage string
}
@@ -64,8 +63,7 @@ func (r *KubernetesDeploymentResource) mutate(ctx context.Context, tenantControl
d := builder.Deployment{
Address: address,
ETCDEndpoints: r.ETCDEndpoints,
ETCDStorageType: r.DataStoreDriver,
DataStore: r.DataStore,
KineContainerImage: r.KineContainerImage,
}
d.SetLabels(r.resource, utilities.MergeMaps(utilities.CommonLabels(tenantControlPlane.GetName()), tenantControlPlane.Spec.ControlPlane.Deployment.AdditionalMetadata.Labels))
@@ -132,11 +130,6 @@ func (r *KubernetesDeploymentResource) deploymentTemplateLabels(ctx context.Cont
"component.kamaji.clastix.io/scheduler-kubeconfig": hash(ctx, tenantControlPlane.GetNamespace(), tenantControlPlane.Status.KubeConfig.Scheduler.SecretName),
}
if r.DataStoreDriver == kamajiv1alpha1.EtcdDriver {
labels["component.kamaji.clastix.io/etcd-ca-certificates"] = hash(ctx, tenantControlPlane.GetNamespace(), tenantControlPlane.Status.Certificates.ETCD.CA.SecretName)
labels["component.kamaji.clastix.io/etcd-certificates"] = hash(ctx, tenantControlPlane.GetNamespace(), tenantControlPlane.Status.Certificates.ETCD.APIServer.SecretName)
}
return labels
}

View File

@@ -22,7 +22,6 @@ import (
type KubernetesServiceResource struct {
resource *corev1.Service
Client client.Client
Name string
}
func (r *KubernetesServiceResource) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
@@ -63,8 +62,6 @@ func (r *KubernetesServiceResource) Define(_ context.Context, tenantControlPlane
},
}
r.Name = "service"
return nil
}
@@ -124,5 +121,5 @@ func (r *KubernetesServiceResource) mutate(ctx context.Context, tenantControlPla
}
func (r *KubernetesServiceResource) GetName() string {
return r.Name
return "service"
}

View File

@@ -18,7 +18,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/types"
"github.com/clastix/kamaji/internal/utilities"
)
@@ -33,11 +32,10 @@ const (
)
type KubernetesDeploymentResource struct {
resource *appsv1.Deployment
Client client.Client
ETCDStorageType types.ETCDStorageType
ETCDEndpoints []string
Name string
resource *appsv1.Deployment
Client client.Client
ETCDEndpoints []string
Name string
}
func (r *KubernetesDeploymentResource) isStatusEqual(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {

View File

@@ -15,6 +15,7 @@ import (
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/kubeadm"
"github.com/clastix/kamaji/internal/resources/utils"
)
type kubeadmPhase int
@@ -104,7 +105,7 @@ func enrichBootstrapToken(bootstrapToken *bootstraptokenv1.BootstrapToken) {
}
if bootstrapToken.Token.ID == "" {
bootstrapToken.Token.ID = fmt.Sprintf("%s.%s", randomString(6), randomString(16))
bootstrapToken.Token.ID = fmt.Sprintf("%s.%s", utils.RandomString(6), utils.RandomString(16))
}
}

View File

@@ -1,123 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/types"
"github.com/clastix/kamaji/internal/utilities"
)
type SQLCertificate struct {
resource *corev1.Secret
Client client.Client
Name string
StorageType types.ETCDStorageType
DataStore kamajiv1alpha1.DataStore
}
func (r *SQLCertificate) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
return tenantControlPlane.Status.Storage.Kine.Certificate.Checksum != r.resource.GetAnnotations()["checksum"]
}
func (r *SQLCertificate) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *SQLCertificate) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *SQLCertificate) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPrefixedName(tenantControlPlane),
Namespace: tenantControlPlane.GetNamespace(),
},
Data: map[string][]byte{},
}
return nil
}
func (r *SQLCertificate) getPrefixedName(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) string {
return utilities.AddTenantPrefix(r.GetName(), tenantControlPlane)
}
func (r *SQLCertificate) GetClient() client.Client {
return r.Client
}
func (r *SQLCertificate) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(ctx, tenantControlPlane))
}
func (r *SQLCertificate) GetName() string {
return r.Name
}
func (r *SQLCertificate) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if tenantControlPlane.Status.Storage.Kine == nil {
tenantControlPlane.Status.Storage.Kine = &kamajiv1alpha1.KineStatus{}
}
tenantControlPlane.Status.Storage.Kine.Certificate.SecretName = r.resource.GetName()
tenantControlPlane.Status.Storage.Kine.Certificate.Checksum = r.resource.GetAnnotations()["checksum"]
tenantControlPlane.Status.Storage.Kine.Certificate.LastUpdate = metav1.Now()
return nil
}
func (r *SQLCertificate) mutate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
ca, err := r.DataStore.Spec.TLSConfig.CertificateAuthority.Certificate.GetContent(ctx, r.Client)
if err != nil {
return nil
}
crt, err := r.DataStore.Spec.TLSConfig.ClientCertificate.Certificate.GetContent(ctx, r.Client)
if err != nil {
return nil
}
key, err := r.DataStore.Spec.TLSConfig.ClientCertificate.PrivateKey.GetContent(ctx, r.Client)
if err != nil {
return nil
}
r.resource.Data = map[string][]byte{
"ca.crt": ca,
"server.crt": crt,
"server.key": key,
}
annotations := r.resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations["checksum"] = utilities.CalculateConfigMapChecksum(r.resource.StringData)
r.resource.SetAnnotations(annotations)
r.resource.SetLabels(utilities.MergeMaps(
utilities.KamajiLabels(),
r.resource.GetLabels(),
map[string]string{
"kamaji.clastix.io/name": tenantControlPlane.GetName(),
"kamaji.clastix.io/component": r.GetName(),
},
))
return ctrl.SetControllerReference(tenantControlPlane, r.resource, r.Client.Scheme())
}
}

View File

@@ -1,245 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/sql"
)
type sqlSetupResource struct {
schema string
user string
password string
}
type SQLSetup struct {
resource sqlSetupResource
Client client.Client
DBConnection sql.DBConnection
Driver string
}
func (r *SQLSetup) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
if tenantControlPlane.Status.Storage.Kine == nil {
return true
}
return tenantControlPlane.Status.Storage.Kine.Driver != r.Driver &&
tenantControlPlane.Status.Storage.Kine.Setup.Checksum != tenantControlPlane.Status.Storage.Kine.Config.Checksum
}
func (r *SQLSetup) ShouldCleanup(_ *kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *SQLSetup) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *SQLSetup) Define(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
secret := &corev1.Secret{}
namespacedName := types.NamespacedName{
Namespace: tenantControlPlane.GetNamespace(),
Name: tenantControlPlane.Status.Storage.Kine.Config.SecretName,
}
if err := r.Client.Get(ctx, namespacedName, secret); err != nil {
return err
}
r.resource = sqlSetupResource{
schema: string(secret.Data["DB_SCHEMA"]),
user: string(secret.Data["DB_USER"]),
password: string(secret.Data["DB_PASSWORD"]),
}
return nil
}
func (r *SQLSetup) GetClient() client.Client {
return r.Client
}
func (r *SQLSetup) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
if tenantControlPlane.Status.Storage.Kine.Setup.Checksum != "" &&
tenantControlPlane.Status.Storage.Kine.Setup.Checksum != tenantControlPlane.Status.Storage.Kine.Config.Checksum {
if err := r.Delete(ctx, tenantControlPlane); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultUpdated, nil
}
reconcilationResult := controllerutil.OperationResultNone
var operationResult controllerutil.OperationResult
var err error
operationResult, err = r.createDB(ctx, tenantControlPlane)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.createUser(ctx, tenantControlPlane)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
operationResult, err = r.createGrantPrivileges(ctx, tenantControlPlane)
if err != nil {
return reconcilationResult, err
}
reconcilationResult = updateOperationResult(reconcilationResult, operationResult)
return reconcilationResult, nil
}
func (r *SQLSetup) GetName() string {
return "sql-setup"
}
func (r *SQLSetup) Delete(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if err := r.Define(ctx, tenantControlPlane); err != nil {
return err
}
if err := r.revokeGrantPrivileges(ctx, tenantControlPlane); err != nil {
return err
}
if err := r.deleteDB(ctx, tenantControlPlane); err != nil {
return err
}
if err := r.deleteUser(ctx, tenantControlPlane); err != nil {
return err
}
return nil
}
func (r *SQLSetup) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if tenantControlPlane.Status.Storage.Kine == nil {
return fmt.Errorf("sql configuration is not ready")
}
tenantControlPlane.Status.Storage.Kine.Setup.Schema = r.resource.schema
tenantControlPlane.Status.Storage.Kine.Setup.User = r.resource.user
tenantControlPlane.Status.Storage.Kine.Setup.LastUpdate = metav1.Now()
tenantControlPlane.Status.Storage.Kine.Setup.Checksum = tenantControlPlane.Status.Storage.Kine.Config.Checksum
return nil
}
func (r *SQLSetup) createDB(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
exists, err := r.DBConnection.DBExists(ctx, r.resource.schema)
if err != nil {
return controllerutil.OperationResultNone, err
}
if exists {
return controllerutil.OperationResultNone, nil
}
if err := r.DBConnection.CreateDB(ctx, r.resource.schema); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
func (r *SQLSetup) deleteDB(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) error {
exists, err := r.DBConnection.DBExists(ctx, r.resource.schema)
if err != nil {
return err
}
if !exists {
return nil
}
if err := r.DBConnection.DeleteDB(ctx, r.resource.schema); err != nil {
return err
}
return nil
}
func (r *SQLSetup) createUser(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
exists, err := r.DBConnection.UserExists(ctx, r.resource.user)
if err != nil {
return controllerutil.OperationResultNone, err
}
if exists {
return controllerutil.OperationResultNone, nil
}
if err := r.DBConnection.CreateUser(ctx, r.resource.user, r.resource.password); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
func (r *SQLSetup) deleteUser(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) error {
exists, err := r.DBConnection.UserExists(ctx, r.resource.user)
if err != nil {
return err
}
if !exists {
return nil
}
if err := r.DBConnection.DeleteUser(ctx, r.resource.user); err != nil {
return err
}
return nil
}
func (r *SQLSetup) createGrantPrivileges(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
exists, err := r.DBConnection.GrantPrivilegesExists(ctx, r.resource.user, r.resource.schema)
if err != nil {
return controllerutil.OperationResultNone, err
}
if exists {
return controllerutil.OperationResultNone, nil
}
if err := r.DBConnection.GrantPrivileges(ctx, r.resource.user, r.resource.schema); err != nil {
return controllerutil.OperationResultNone, err
}
return controllerutil.OperationResultCreated, nil
}
func (r *SQLSetup) revokeGrantPrivileges(ctx context.Context, _ *kamajiv1alpha1.TenantControlPlane) error {
exists, err := r.DBConnection.GrantPrivilegesExists(ctx, r.resource.user, r.resource.schema)
if err != nil {
return err
}
if !exists {
return nil
}
if err := r.DBConnection.RevokePrivileges(ctx, r.resource.user, r.resource.schema); err != nil {
return err
}
return nil
}

View File

@@ -1,123 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"strconv"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/utilities"
)
type SQLStorageConfig struct {
resource *corev1.Secret
Client client.Client
Name string
Host string
Port int
Driver string
}
func (r *SQLStorageConfig) ShouldStatusBeUpdated(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) bool {
if tenantControlPlane.Status.Storage.Kine == nil {
return true
}
return tenantControlPlane.Status.Storage.Kine.Config.Checksum != r.resource.GetAnnotations()["checksum"] ||
tenantControlPlane.Status.Storage.Kine.Driver != r.Driver
}
func (r *SQLStorageConfig) ShouldCleanup(*kamajiv1alpha1.TenantControlPlane) bool {
return false
}
func (r *SQLStorageConfig) CleanUp(context.Context, *kamajiv1alpha1.TenantControlPlane) (bool, error) {
return false, nil
}
func (r *SQLStorageConfig) Define(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: r.getPrefixedName(tenantControlPlane),
Namespace: tenantControlPlane.GetNamespace(),
},
}
return nil
}
func (r *SQLStorageConfig) getPrefixedName(tenantControlPlane *kamajiv1alpha1.TenantControlPlane) string {
return utilities.AddTenantPrefix(r.Name, tenantControlPlane)
}
func (r *SQLStorageConfig) GetClient() client.Client {
return r.Client
}
func (r *SQLStorageConfig) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
return utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(ctx, tenantControlPlane))
}
func (r *SQLStorageConfig) GetName() string {
return r.Name
}
func (r *SQLStorageConfig) UpdateTenantControlPlaneStatus(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) error {
if tenantControlPlane.Status.Storage.Kine == nil {
tenantControlPlane.Status.Storage.Kine = &kamajiv1alpha1.KineStatus{}
}
tenantControlPlane.Status.Storage.Kine.Driver = r.Driver
tenantControlPlane.Status.Storage.Kine.Config.SecretName = r.resource.GetName()
tenantControlPlane.Status.Storage.Kine.Config.Checksum = r.resource.GetAnnotations()["checksum"]
return nil
}
func (r *SQLStorageConfig) mutate(_ context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
var password []byte
savedHash, ok := r.resource.GetAnnotations()["checksum"]
switch {
case ok && savedHash == utilities.CalculateConfigMapChecksum(r.resource.StringData):
password = r.resource.Data["DB_PASSWORD"]
default:
password = []byte(utilities.GenerateUUIDString())
}
r.resource.Data = map[string][]byte{
"DB_HOST": []byte(r.Host),
"DB_PORT": []byte(strconv.Itoa(r.Port)),
"DB_SCHEMA": []byte(tenantControlPlane.GetName()),
"DB_USER": []byte(tenantControlPlane.GetName()),
"DB_PASSWORD": password,
}
annotations := r.resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations["checksum"] = utilities.CalculateConfigMapChecksum(r.resource.StringData)
r.resource.SetAnnotations(annotations)
r.resource.SetLabels(utilities.MergeMaps(
utilities.KamajiLabels(),
map[string]string{
"kamaji.clastix.io/name": tenantControlPlane.GetName(),
"kamaji.clastix.io/component": r.GetName(),
},
))
return ctrl.SetControllerReference(tenantControlPlane, r.resource, r.Client.Scheme())
}
}

View File

@@ -1,7 +1,7 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
package utils
import (
"math/rand"
@@ -12,7 +12,7 @@ import (
var letters = []byte("abcdefghijklmnopqrstuvwxyz")
func updateOperationResult(current controllerutil.OperationResult, op controllerutil.OperationResult) controllerutil.OperationResult {
func UpdateOperationResult(current controllerutil.OperationResult, op controllerutil.OperationResult) controllerutil.OperationResult {
if current == controllerutil.OperationResultCreated || op == controllerutil.OperationResultCreated {
return controllerutil.OperationResultCreated
}
@@ -32,7 +32,7 @@ func updateOperationResult(current controllerutil.OperationResult, op controller
return controllerutil.OperationResultNone
}
func randomString(n int) string {
func RandomString(n int) string {
rand.Seed(time.Now().UnixNano())
b := make([]byte, n)
for i := range b {

View File

@@ -1,10 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package sql
const (
defaultProtocol = "tcp"
firstPort = 1024
sqlErrorNoRows = "sql: no rows in result set"
)

View File

@@ -1,110 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package sql
import (
"context"
"crypto/tls"
"fmt"
"net/url"
)
type Driver int
const (
MySQL Driver = iota
PostgreSQL
)
func (d Driver) ToString() string {
switch d {
case MySQL:
return "mysql"
case PostgreSQL:
return "postgresql"
default:
return ""
}
}
type ConnectionConfig struct {
SQLDriver Driver
User string
Password string
Host string
Port int
DBName string
TLSConfig *tls.Config
Parameters map[string][]string
}
func (config ConnectionConfig) GetDataSourceName() string {
userPassword := config.getDataSourceNameUserPassword()
db := config.getDataSourceNameDB()
dataSourceName := fmt.Sprintf("%s%s/%s?%s", userPassword, db, config.DBName, config.formatParameters())
return dataSourceName
}
func (config ConnectionConfig) getDataSourceNameUserPassword() string {
if config.User == "" {
return ""
}
if config.Password == "" {
return fmt.Sprintf("%s@", config.User)
}
return fmt.Sprintf("%s:%s@", config.User, config.Password)
}
func (config ConnectionConfig) getDataSourceNameDB() string {
if config.Host == "" || config.Port < firstPort {
return ""
}
return fmt.Sprintf("%s(%s:%d)", defaultProtocol, config.Host, config.Port)
}
func (config ConnectionConfig) formatParameters() string {
if len(config.Parameters) == 0 {
return ""
}
values := url.Values(config.Parameters)
return values.Encode()
}
type DBConnection interface {
CreateUser(ctx context.Context, user, password string) error
CreateDB(ctx context.Context, dbName string) error
GrantPrivileges(ctx context.Context, user, dbName string) error
UserExists(ctx context.Context, user string) (bool, error)
DBExists(ctx context.Context, dbName string) (bool, error)
GrantPrivilegesExists(ctx context.Context, user, dbName string) (bool, error)
DeleteUser(ctx context.Context, user string) error
DeleteDB(ctx context.Context, dbName string) error
RevokePrivileges(ctx context.Context, user, dbName string) error
GetHost() string
GetPort() int
Close() error
Check() error
Driver() string
}
func GetDBConnection(config ConnectionConfig) (DBConnection, error) {
switch config.SQLDriver {
case MySQL:
return getMySQLDB(config)
case PostgreSQL:
return getPostgreSQLDB(config)
default:
return nil, fmt.Errorf("%s is not a valid driver", config.SQLDriver.ToString())
}
}
func checkEmptyQueryResult(err error) bool {
return err.Error() == sqlErrorNoRows
}

View File

@@ -1,45 +0,0 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package types
import (
"fmt"
"github.com/spf13/viper"
)
type ETCDStorageType int
const (
ETCD ETCDStorageType = iota
KineMySQL
KinePostgreSQL
)
var etcdStorageTypeString = map[string]ETCDStorageType{"etcd": ETCD, "kine-mysql": KineMySQL, "kine-postgresql": KinePostgreSQL}
func (s ETCDStorageType) String() string {
return [...]string{"etcd", "kine-mysql", "kine-postgresql"}[s]
}
// ParseETCDStorageType returns the ETCDStorageType given a string representation of the type.
func ParseETCDStorageType(s string) ETCDStorageType {
if storageType, ok := etcdStorageTypeString[s]; ok {
return storageType
}
panic(fmt.Errorf("unsupported storage type %s", s))
}
// ParseETCDEndpoint returns the default ETCD endpoints used to interact with the Tenant Control Plane backing storage.
func ParseETCDEndpoint(conf *viper.Viper) string {
switch ParseETCDStorageType(conf.GetString("etcd-storage-type")) {
case ETCD:
return conf.GetString("etcd-endpoints")
case KineMySQL, KinePostgreSQL:
return "127.0.0.1:2379"
default:
panic("unsupported storage type")
}
}