Files
kamaji/controllers/datastore_controller.go
Patryk Rostkowski 489c191c0b feat: add custom Kamaji metrics and dashboard (#1108)
Signed-off-by: Patryk Rostkowski <patrostkowski@gmail.com>
2026-04-07 18:57:50 +02:00

424 lines
14 KiB
Go

// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package controllers
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/controllers/utils"
"github.com/clastix/kamaji/internal/metrics"
)
type DataStore struct {
Client client.Client
Metrics *metrics.Recorder
// TenantControlPlaneTrigger is the channel used to communicate across the controllers:
// if a Data Source is updated, we have to be sure that the reconciliation of the certificates content
// for each Tenant Control Plane is put in place properly.
TenantControlPlaneTrigger chan event.GenericEvent
}
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=kamaji.clastix.io,resources=datastores/status,verbs=get;update;patch
func (r *DataStore) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
var err error
logger := log.FromContext(ctx)
defer func(c context.Context) {
metricCtx, cancelMetricCtx := metrics.NewRefreshContextFrom(c)
defer cancelMetricCtx()
if gaugeErr := r.refreshDatastoreMetrics(metricCtx); gaugeErr != nil {
logger.WithName("metrics").Error(gaugeErr, "cannot refresh DataStore metrics")
}
}(ctx)
var ds kamajiv1alpha1.DataStore
if dsErr := r.Client.Get(ctx, request.NamespacedName, &ds); dsErr != nil {
if k8serrors.IsNotFound(dsErr) {
logger.Info("resource may have been deleted, skipping")
return reconcile.Result{}, nil
}
logger.Error(dsErr, "cannot retrieve the required resource")
return reconcile.Result{}, dsErr
}
if utils.IsPaused(&ds) {
logger.Info("paused reconciliation, no further actions")
return reconcile.Result{}, nil
}
if ds.GetDeletionTimestamp() == nil && !controllerutil.ContainsFinalizer(&ds, kamajiv1alpha1.DataStoreTCPFinalizer) {
logger.Info("missing finalizer, adding it")
ds.SetFinalizers(append(ds.GetFinalizers(), kamajiv1alpha1.DataStoreTCPFinalizer))
if uErr := r.Client.Update(ctx, &ds); uErr != nil {
return reconcile.Result{}, uErr
}
return reconcile.Result{}, nil
}
// Extracting the list of TenantControlPlane objects referenced by the given DataStore:
// this data is used to reference these in the Status, as well as propagating changes
// that would be required, such as changing TLS Configuration, or Basic Auth.
var tcpList kamajiv1alpha1.TenantControlPlaneList
if lErr := r.Client.List(ctx, &tcpList, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(kamajiv1alpha1.TenantControlPlaneUsedDataStoreKey, ds.GetName()),
}); lErr != nil {
return reconcile.Result{}, fmt.Errorf("cannot retrieve list of the Tenant Control Plane using the following instance: %w", lErr)
}
tcpSets := sets.NewString()
for _, tcp := range tcpList.Items {
tcpSets.Insert(getNamespacedName(tcp.GetNamespace(), tcp.GetName()).String())
}
ds.Status.UsedBy = tcpSets.List()
// Performing the status update only at the end of the reconciliation loop:
// this is performed in defer to avoid duplication of code,
// and triggering a reconciliation of depending on TenantControlPlanes only if the update was successful.
defer func() {
if meta.IsStatusConditionTrue(ds.Status.Conditions, kamajiv1alpha1.DataStoreConditionAllowedDeletionType) {
logger.Info("removing finalizer upon true condition")
controllerutil.RemoveFinalizer(&ds, kamajiv1alpha1.DataStoreTCPFinalizer)
if uErr := r.Client.Update(ctx, &ds); uErr != nil {
logger.Error(uErr, "cannot update object")
return
}
logger.Info("finalizer removed successfully")
return
}
ds.Status.ObservedGeneration = ds.Generation
ds.Status.Ready = meta.IsStatusConditionTrue(ds.Status.Conditions, kamajiv1alpha1.DataStoreConditionValidType)
if err = r.Client.Status().Update(ctx, &ds); err != nil {
logger.Error(err, "cannot update the status for the given instance")
return
}
if !ds.Status.Ready {
logger.Info("skipping triggering, DataStore is not ready")
return
}
logger.Info("triggering cascading reconciliation for TenantControlPlanes")
for _, tcp := range tcpList.Items {
var shrunkTCP kamajiv1alpha1.TenantControlPlane
shrunkTCP.Name = tcp.Name
shrunkTCP.Namespace = tcp.Namespace
go utils.TriggerChannel(ctx, r.TenantControlPlaneTrigger, shrunkTCP)
}
}()
if ds.GetDeletionTimestamp() != nil {
if len(tcpList.Items) > 0 {
logger.Info("deletion is blocked due to DataStore still being referenced")
meta.SetStatusCondition(&ds.Status.Conditions, metav1.Condition{
Type: kamajiv1alpha1.DataStoreConditionAllowedDeletionType,
Status: metav1.ConditionFalse,
ObservedGeneration: ds.Generation,
Reason: "DataStoreStillUsed",
Message: "The DataStore is still used and referenced by one (or more) TenantControlPlane objects.",
})
return reconcile.Result{}, nil
}
if meta.IsStatusConditionFalse(ds.Status.Conditions, kamajiv1alpha1.DataStoreConditionAllowedDeletionType) {
logger.Info("DataStore is not used by any TenantControlPlane object")
meta.SetStatusCondition(&ds.Status.Conditions, metav1.Condition{
Type: kamajiv1alpha1.DataStoreConditionAllowedDeletionType,
Status: metav1.ConditionTrue,
ObservedGeneration: ds.Generation,
Reason: "DataStoreUnused",
Message: "",
})
return reconcile.Result{}, nil
}
logger.Info("DataStore can be safely deleted")
return reconcile.Result{}, nil
}
if exists := meta.FindStatusCondition(ds.Status.Conditions, kamajiv1alpha1.DataStoreConditionValidType); exists == nil {
logger.Info("missing starting condition")
meta.SetStatusCondition(&ds.Status.Conditions, metav1.Condition{
Type: kamajiv1alpha1.DataStoreConditionValidType,
Status: metav1.ConditionUnknown,
ObservedGeneration: ds.Generation,
Reason: "MissingCondition",
Message: "Controller will process the validation.",
})
if sErr := r.Client.Status().Update(ctx, &ds); sErr != nil {
return reconcile.Result{}, fmt.Errorf("cannot update the status for the given instance: %w", sErr)
}
return reconcile.Result{}, nil
}
if ds.Spec.BasicAuth != nil {
logger.Info("validating basic authentication")
if vErr := r.validateBasicAuth(ctx, ds); vErr != nil {
meta.SetStatusCondition(&ds.Status.Conditions, metav1.Condition{
Type: kamajiv1alpha1.DataStoreConditionValidType,
Status: metav1.ConditionFalse,
ObservedGeneration: ds.Generation,
Reason: "BasicAuthValidationFailed",
Message: vErr.Error(),
})
logger.Info("invalid basic authentication")
return reconcile.Result{}, nil
}
logger.Info("basic authentication is valid")
}
logger.Info("validating TLS configuration")
if vErr := r.validateTLSConfig(ctx, ds); vErr != nil {
meta.SetStatusCondition(&ds.Status.Conditions, metav1.Condition{
Type: kamajiv1alpha1.DataStoreConditionValidType,
Status: metav1.ConditionFalse,
ObservedGeneration: ds.Generation,
Reason: "TLSConfigurationValidationFailed",
Message: vErr.Error(),
})
logger.Info("invalid TLS configuration")
return reconcile.Result{}, nil
}
logger.Info("TLS configuration is valid")
meta.SetStatusCondition(&ds.Status.Conditions, metav1.Condition{
Type: kamajiv1alpha1.DataStoreConditionValidType,
Status: metav1.ConditionTrue,
ObservedGeneration: ds.Status.ObservedGeneration,
Reason: "DataStoreIsValid",
Message: "",
})
return reconcile.Result{}, err
}
func (r *DataStore) SetupWithManager(mgr controllerruntime.Manager) error {
if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
metricCtx, cancelMetricCtx := metrics.NewRefreshContextFrom(ctx)
defer cancelMetricCtx()
if err := r.refreshDatastoreMetrics(metricCtx); err != nil {
controllerruntime.Log.WithName("metrics").Error(err, "cannot initialize DataStore metrics")
}
return nil
})); err != nil {
return err
}
enqueueFn := func(tcp *kamajiv1alpha1.TenantControlPlane, limitingInterface workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if dataStoreName := tcp.Status.Storage.DataStoreName; len(dataStoreName) > 0 {
limitingInterface.AddRateLimited(reconcile.Request{
NamespacedName: k8stypes.NamespacedName{
Name: dataStoreName,
},
})
}
}
//nolint:forcetypeassert
return controllerruntime.NewControllerManagedBy(mgr).
For(&kamajiv1alpha1.DataStore{}).
Watches(&kamajiv1alpha1.TenantControlPlane{}, handler.Funcs{
CreateFunc: func(_ context.Context, createEvent event.TypedCreateEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
enqueueFn(createEvent.Object.(*kamajiv1alpha1.TenantControlPlane), w)
},
UpdateFunc: func(_ context.Context, updateEvent event.TypedUpdateEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
enqueueFn(updateEvent.ObjectOld.(*kamajiv1alpha1.TenantControlPlane), w)
enqueueFn(updateEvent.ObjectNew.(*kamajiv1alpha1.TenantControlPlane), w)
},
DeleteFunc: func(_ context.Context, deleteEvent event.TypedDeleteEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
enqueueFn(deleteEvent.Object.(*kamajiv1alpha1.TenantControlPlane), w)
},
}).
Complete(r)
}
func (r *DataStore) refreshDatastoreMetrics(ctx context.Context) error {
var dataStoreList kamajiv1alpha1.DataStoreList
if err := r.Client.List(ctx, &dataStoreList); err != nil {
return err
}
metricsRecorder := r.metricsRecorder()
metricsRecorder.ResetDataStoreInfo()
metricsRecorder.ResetDataStoreStatus()
metricsRecorder.ResetDatastoresReadyAndDriverCounts()
for i := range dataStoreList.Items {
ds := &dataStoreList.Items[i]
driver := metrics.NormalizeDataStoreDriverLabel(ds.Spec.Driver)
ready := metrics.ReadyLabelFalse
if ds.Status.Ready {
ready = metrics.ReadyLabelTrue
}
status := classifyDataStoreStatusLabel(ds)
metricsRecorder.SetDataStoreStatus(ds.GetName(), status, ready)
metricsRecorder.SetDataStoreInfo(
ds.GetName(),
driver,
)
counts := metrics.NewSingleDriverReadyCounts(driver)
if ds.Status.Ready {
counts[metrics.ReadyLabelTrue][driver] = 1
} else {
counts[metrics.ReadyLabelFalse][driver] = 1
}
metricsRecorder.SetDatastoresReadyAndDriverCounts(ds.GetName(), counts)
}
return nil
}
func (r *DataStore) metricsRecorder() *metrics.Recorder {
if r.Metrics == nil {
r.Metrics = metrics.DefaultRecorder()
}
return r.Metrics
}
func classifyDataStoreStatusLabel(ds *kamajiv1alpha1.DataStore) string {
condition := meta.FindStatusCondition(ds.Status.Conditions, kamajiv1alpha1.DataStoreConditionValidType)
if condition == nil {
return metrics.DataStoreStatusUnknown
}
return metrics.NormalizeDataStoreConditionStatusLabel(condition.Status)
}
func (r *DataStore) validateBasicAuth(ctx context.Context, ds kamajiv1alpha1.DataStore) error {
if err := r.validateContentReference(ctx, ds.Spec.BasicAuth.Password); err != nil {
return fmt.Errorf("basic-auth password is not valid, %w", err)
}
if err := r.validateContentReference(ctx, ds.Spec.BasicAuth.Username); err != nil {
return fmt.Errorf("basic-auth username is not valid, %w", err)
}
return nil
}
func (r *DataStore) validateTLSConfig(ctx context.Context, ds kamajiv1alpha1.DataStore) error {
if ds.Spec.TLSConfig == nil && ds.Spec.Driver != kamajiv1alpha1.EtcdDriver {
return nil
}
if err := r.validateContentReference(ctx, ds.Spec.TLSConfig.CertificateAuthority.Certificate); err != nil {
return fmt.Errorf("CA certificate is not valid, %w", err)
}
if ds.Spec.Driver == kamajiv1alpha1.EtcdDriver {
if ds.Spec.TLSConfig.CertificateAuthority.PrivateKey == nil {
return fmt.Errorf("CA private key is required when using the etcd driver")
}
if ds.Spec.TLSConfig.ClientCertificate == nil {
return fmt.Errorf("client certificate is required when using the etcd driver")
}
}
if ds.Spec.TLSConfig.CertificateAuthority.PrivateKey != nil {
if err := r.validateContentReference(ctx, *ds.Spec.TLSConfig.CertificateAuthority.PrivateKey); err != nil {
return fmt.Errorf("CA private key is not valid, %w", err)
}
}
if ds.Spec.TLSConfig.ClientCertificate != nil {
if err := r.validateContentReference(ctx, ds.Spec.TLSConfig.ClientCertificate.Certificate); err != nil {
return fmt.Errorf("client certificate is not valid, %w", err)
}
if err := r.validateContentReference(ctx, ds.Spec.TLSConfig.ClientCertificate.PrivateKey); err != nil {
return fmt.Errorf("client private key is not valid, %w", err)
}
}
return nil
}
func (r *DataStore) validateContentReference(ctx context.Context, ref kamajiv1alpha1.ContentRef) error {
switch {
case len(ref.Content) > 0:
return nil
case ref.SecretRef == nil:
return fmt.Errorf("the Secret reference is mandatory when bare content is not specified")
case len(ref.SecretRef.SecretReference.Name) == 0:
return fmt.Errorf("the Secret reference name is mandatory")
case len(ref.SecretRef.SecretReference.Namespace) == 0:
return fmt.Errorf("the Secret reference namespace is mandatory")
}
if err := r.Client.Get(ctx, k8stypes.NamespacedName{Name: ref.SecretRef.SecretReference.Name, Namespace: ref.SecretRef.SecretReference.Namespace}, &corev1.Secret{}); err != nil {
if k8serrors.IsNotFound(err) {
return fmt.Errorf("secret %s/%s is not found", ref.SecretRef.SecretReference.Namespace, ref.SecretRef.SecretReference.Name)
}
return err
}
return nil
}