Files
kamaji/controllers/certificate_lifecycle_controller.go
Dario Tranchitella b027e23b99 feat: enhancing concurrent reconciliations (#790)
* feat: buffered channels for generic events

Channels used for GenericEvent feeding for cross controllers triggers
are now buffered according to the --max-concurrent-tcp-reconciles: this
is required to avoid channel full errors when dealing with large
management clusters serving a sizeable amount of Tenant Control Planes.

Increasing this value will put more pressure on memory (mostly for GC)
and CPU (provisioning multiple certificates at the same time).

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* refactor: retrying datastore status update

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat(performance): reducing memory consumption for channel triggers

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat(datastore): reconcile events only for root object changes

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* feat: waiting soot manager exit before termination

This change introduces a grace period of 10 seconds before abruptly
terminating the Tenant Control Plane deployment, allowing the soot
manager to complete its exit procedure and avoid false positive errors
due to API Server being unresponsive due to user deletion.

Aim of this change is reducing the amount of false positive errors upon
mass deletion of Tenant COntrol Plane objects.

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

* refactor: unbuffered channel with timeout

WatchesRawSource is non blocking, no need to check if channel is full.
To prevent deadlocks a WithTimeout check has been introduced.

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>

---------

Signed-off-by: Dario Tranchitella <dario@tranchitella.eu>
2025-04-23 21:00:29 +02:00

166 lines
4.2 KiB
Go

// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package controllers
import (
"context"
"crypto/x509"
"fmt"
"time"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientcmdapiv1 "k8s.io/client-go/tools/clientcmd/api/v1"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/constants"
"github.com/clastix/kamaji/internal/crypto"
"github.com/clastix/kamaji/internal/utilities"
)
type CertificateLifecycle struct {
Channel chan event.GenericEvent
Deadline time.Duration
client client.Client
}
func (s *CertificateLifecycle) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
logger := log.FromContext(ctx)
logger.Info("starting CertificateLifecycle handling")
secret := corev1.Secret{}
err := s.client.Get(ctx, request.NamespacedName, &secret)
if k8serrors.IsNotFound(err) {
logger.Info("resource have been deleted, skipping")
return reconcile.Result{}, nil
}
if err != nil {
logger.Error(err, "cannot retrieve the required resource")
return reconcile.Result{}, err
}
checkType, ok := secret.GetLabels()[constants.ControllerLabelResource]
if !ok {
logger.Info("missing controller label, shouldn't happen")
return reconcile.Result{}, nil
}
var crt *x509.Certificate
switch checkType {
case "x509":
crt, err = s.extractCertificateFromBareSecret(secret)
case "kubeconfig":
crt, err = s.extractCertificateFromKubeconfig(secret)
default:
err = fmt.Errorf("unsupported strategy, %s", checkType)
}
if err != nil {
logger.Error(err, "skipping reconciliation")
return reconcile.Result{}, nil
}
deadline := time.Now().Add(s.Deadline)
if deadline.After(crt.NotAfter) {
logger.Info("certificate near expiration, must be rotated")
s.Channel <- event.GenericEvent{Object: &kamajiv1alpha1.TenantControlPlane{
ObjectMeta: metav1.ObjectMeta{
Name: secret.GetOwnerReferences()[0].Name,
Namespace: secret.Namespace,
},
}}
logger.Info("certificate rotation triggered")
return reconcile.Result{}, nil
}
after := crt.NotAfter.Sub(deadline)
logger.Info("certificate is still valid, enqueuing back", "after", after.String())
return reconcile.Result{Requeue: true, RequeueAfter: after}, nil
}
func (s *CertificateLifecycle) extractCertificateFromBareSecret(secret corev1.Secret) (*x509.Certificate, error) {
var crt *x509.Certificate
var err error
for _, v := range secret.Data {
if crt, err = crypto.ParseCertificateBytes(v); err == nil {
break
}
}
if crt == nil {
return nil, fmt.Errorf("none of the provided keys is containing a valid x509 certificate")
}
return crt, nil
}
func (s *CertificateLifecycle) extractCertificateFromKubeconfig(secret corev1.Secret) (*x509.Certificate, error) {
var kc *clientcmdapiv1.Config
var err error
for k := range secret.Data {
if kc, err = utilities.DecodeKubeconfig(secret, k); err == nil {
break
}
}
if kc == nil {
return nil, fmt.Errorf("none of the provided keys is containing a valid kubeconfig")
}
crt, err := crypto.ParseCertificateBytes(kc.AuthInfos[0].AuthInfo.ClientCertificateData)
if err != nil {
return nil, errors.Wrap(err, "cannot parse kubeconfig certificate bytes")
}
return crt, nil
}
func (s *CertificateLifecycle) SetupWithManager(mgr controllerruntime.Manager) error {
s.client = mgr.GetClient()
supportedStrategies := sets.New[string]("x509", "kubeconfig")
return controllerruntime.NewControllerManagedBy(mgr).
For(&corev1.Secret{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
labels := object.GetLabels()
if labels == nil {
return false
}
value, ok := labels[constants.ControllerLabelResource]
if !ok {
return false
}
return supportedStrategies.Has(value)
}))).
Complete(s)
}