add taints controller (#188)

Signed-off-by: JiaHao Wei <1335649406@qq.com>
This commit is contained in:
JiaHao Wei
2022-01-26 13:38:40 +08:00
committed by GitHub
parent a9ee5e431f
commit acf2ee2cfc
9 changed files with 555 additions and 3 deletions

View File

@@ -246,3 +246,47 @@ func FindTaintByKey(managedCluster *clusterv1.ManagedCluster, key string) *clust
}
return nil
}
func IsTaintEqual(taint1, taint2 clusterv1.Taint) bool {
// Ignore the comparison of time
return taint1.Key == taint2.Key && taint1.Value == taint2.Value && taint1.Effect == taint2.Effect
}
// AddTaints add taints to the specified slice, if it did not already exist.
// Return a boolean indicating whether the slice has been updated.
func AddTaints(taints *[]clusterv1.Taint, taint clusterv1.Taint) bool {
if taints == nil || *taints == nil {
*taints = make([]clusterv1.Taint, 0)
}
if FindTaint(*taints, taint) != nil {
return false
}
*taints = append(*taints, taint)
return true
}
func RemoveTaints(taints *[]clusterv1.Taint, targets ...clusterv1.Taint) (updated bool) {
if taints == nil || len(*taints) == 0 || len(targets) == 0 {
return false
}
newTaints := make([]clusterv1.Taint, 0)
for _, v := range *taints {
if FindTaint(targets, v) == nil {
newTaints = append(newTaints, v)
}
}
updated = len(*taints) != len(newTaints)
*taints = newTaints
return updated
}
func FindTaint(taints []clusterv1.Taint, taint clusterv1.Taint) *clusterv1.Taint {
for i := range taints {
if IsTaintEqual(taints[i], taint) {
return &taints[i]
}
}
return nil
}

View File

@@ -383,3 +383,146 @@ func getApplyFileNames(applyFiles map[string]runtime.Object) []string {
}
return keys
}
var (
UnavailableTaint = clusterv1.Taint{
Key: clusterv1.ManagedClusterTaintUnavailable,
Effect: clusterv1.TaintEffectNoSelect,
}
UnreachableTaint = clusterv1.Taint{
Key: clusterv1.ManagedClusterTaintUnreachable,
Effect: clusterv1.TaintEffectNoSelect,
}
)
func TestIsTaintsEqual(t *testing.T) {
cases := []struct {
name string
taints1 []clusterv1.Taint
taints2 []clusterv1.Taint
expect bool
}{
{
name: "two empty taints",
taints1: []clusterv1.Taint{},
taints2: []clusterv1.Taint{},
expect: true,
},
{
name: "two nil taints",
taints1: nil,
taints2: nil,
expect: true,
},
{
name: "len(taints1) = 1, len(taints2) = 0",
taints1: []clusterv1.Taint{UnavailableTaint},
taints2: []clusterv1.Taint{},
expect: false,
},
{
name: "taints1 is the same as taints",
taints1: []clusterv1.Taint{UnreachableTaint},
taints2: []clusterv1.Taint{UnreachableTaint},
expect: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
actual := reflect.DeepEqual(c.taints1, c.taints2)
if actual != c.expect {
t.Errorf("expected %t, but %t", c.expect, actual)
}
})
}
}
func TestAddTaints(t *testing.T) {
cases := []struct {
name string
taints []clusterv1.Taint
addTaint clusterv1.Taint
resTaints []clusterv1.Taint
expectUpdated bool
}{
{
name: "add taint success",
taints: []clusterv1.Taint{},
addTaint: UnreachableTaint,
expectUpdated: true,
resTaints: []clusterv1.Taint{UnreachableTaint},
},
{
name: "add taint fail, taint already exists",
taints: []clusterv1.Taint{UnreachableTaint},
addTaint: UnreachableTaint,
expectUpdated: false,
resTaints: []clusterv1.Taint{UnreachableTaint},
},
{
name: "nil pointer judgment",
taints: nil,
addTaint: UnreachableTaint,
expectUpdated: true,
resTaints: []clusterv1.Taint{UnreachableTaint},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
updated := AddTaints(&c.taints, c.addTaint)
if updated != c.expectUpdated {
t.Errorf("updated expected %t, but %t", c.expectUpdated, updated)
}
if !reflect.DeepEqual(c.taints, c.resTaints) {
t.Errorf("taints expected %+v, but %+v", c.taints, c.resTaints)
}
})
}
}
func TestRemoveTaints(t *testing.T) {
cases := []struct {
name string
taints []clusterv1.Taint
removeTaint clusterv1.Taint
resTaints []clusterv1.Taint
expectUpdated bool
}{
{
name: "nil pointer judgment",
taints: nil,
removeTaint: UnreachableTaint,
expectUpdated: false,
resTaints: nil,
},
{
name: "remove success",
taints: []clusterv1.Taint{UnreachableTaint},
removeTaint: UnreachableTaint,
expectUpdated: true,
resTaints: []clusterv1.Taint{},
},
{
name: "remove taint failed, taint not exists",
taints: []clusterv1.Taint{UnreachableTaint},
removeTaint: UnavailableTaint,
expectUpdated: false,
resTaints: []clusterv1.Taint{UnreachableTaint},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
updated := RemoveTaints(&c.taints, c.removeTaint)
if updated != c.expectUpdated {
t.Errorf("updated expected %t, but %t", c.expectUpdated, updated)
}
if !reflect.DeepEqual(c.taints, c.resTaints) {
t.Errorf("taints expected %+v, but %+v", c.taints, c.resTaints)
}
})
}
}

View File

@@ -90,14 +90,27 @@ func NewAcceptedManagedCluster() *clusterv1.ManagedCluster {
func NewAvailableManagedCluster() *clusterv1.ManagedCluster {
managedCluster := NewAcceptedManagedCluster()
availableCondtion := NewManagedClusterCondition(
availableCondition := NewManagedClusterCondition(
clusterv1.ManagedClusterConditionAvailable,
"True",
"ManagedClusterAvailable",
"Managed cluster is available",
nil,
)
managedCluster.Status.Conditions = append(managedCluster.Status.Conditions, availableCondtion)
managedCluster.Status.Conditions = append(managedCluster.Status.Conditions, availableCondition)
return managedCluster
}
func NewUnAvailableManagedCluster() *clusterv1.ManagedCluster {
managedCluster := NewAcceptedManagedCluster()
condition := NewManagedClusterCondition(
clusterv1.ManagedClusterConditionAvailable,
"False",
"ManagedClusterUnAvailable",
"Managed cluster is Unavailable",
nil,
)
managedCluster.Status.Conditions = append(managedCluster.Status.Conditions, condition)
return managedCluster
}

View File

@@ -2,6 +2,7 @@ package hub
import (
"context"
"open-cluster-management.io/registration/pkg/hub/taint"
"time"
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
@@ -25,6 +26,8 @@ import (
"k8s.io/client-go/rest"
)
var ResyncInterval = 5 * time.Minute
// RunControllerManager starts the controllers on hub to manage spoke cluster registration.
func RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// If qps in kubconfig is not set, increase the qps and burst to enhance the ability of kube client to handle
@@ -68,6 +71,12 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
controllerContext.EventRecorder,
)
taintController := taint.NewTaintController(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
controllerContext.EventRecorder,
)
csrController := csr.NewCSRApprovingController(
kubeClient,
kubeInfomers.Certificates().V1().CertificateSigningRequests(),
@@ -79,7 +88,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
kubeInfomers.Coordination().V1().Leases(),
5*time.Minute, //TODO: this interval time should be allowed to change from outside
ResyncInterval, //TODO: this interval time should be allowed to change from outside
controllerContext.EventRecorder,
)
@@ -127,6 +136,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
go addOnInformers.Start(ctx.Done())
go managedClusterController.Run(ctx, 1)
go taintController.Run(ctx, 1)
go csrController.Run(ctx, 1)
go leaseController.Run(ctx, 1)
go rbacFinalizerController.Run(ctx, 1)

View File

@@ -0,0 +1,96 @@
package taint
import (
"context"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
informerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
listerv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
v1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/registration/pkg/helpers"
)
var (
UnavailableTaint = v1.Taint{
Key: v1.ManagedClusterTaintUnavailable,
Effect: v1.TaintEffectNoSelect,
}
UnreachableTaint = v1.Taint{
Key: v1.ManagedClusterTaintUnreachable,
Effect: v1.TaintEffectNoSelect,
}
)
// taintController
type taintController struct {
clusterClient clientset.Interface
clusterLister listerv1.ManagedClusterLister
eventRecorder events.Recorder
}
// NewTaintController creates a new taint controller
func NewTaintController(
clusterClient clientset.Interface,
clusterInformer informerv1.ManagedClusterInformer,
recorder events.Recorder) factory.Controller {
c := &taintController{
clusterClient: clusterClient,
clusterLister: clusterInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("taint-controller"),
}
return factory.New().
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return accessor.GetName()
}, clusterInformer.Informer()).
WithSync(c.sync).
ToController("taintController", recorder)
}
func (c *taintController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
managedClusterName := syncCtx.QueueKey()
klog.V(4).Infof("Reconciling ManagedCluster %s", managedClusterName)
managedCluster, err := c.clusterLister.Get(managedClusterName)
if errors.IsNotFound(err) {
// Spoke cluster not found, could have been deleted, do nothing.
return nil
}
if err != nil {
return err
}
if !managedCluster.DeletionTimestamp.IsZero() {
return nil
}
managedCluster = managedCluster.DeepCopy()
newTaints := managedCluster.Spec.Taints
cond := meta.FindStatusCondition(managedCluster.Status.Conditions, v1.ManagedClusterConditionAvailable)
var updated bool
switch {
case cond == nil || cond.Status == metav1.ConditionUnknown:
updated = helpers.RemoveTaints(&newTaints, UnavailableTaint)
updated = helpers.AddTaints(&newTaints, UnreachableTaint) || updated
case cond.Status == metav1.ConditionFalse:
updated = helpers.RemoveTaints(&newTaints, UnreachableTaint)
updated = helpers.AddTaints(&newTaints, UnavailableTaint) || updated
case cond.Status == metav1.ConditionTrue:
updated = helpers.RemoveTaints(&newTaints, UnavailableTaint, UnreachableTaint)
}
if updated {
managedCluster.Spec.Taints = newTaints
if _, err = c.clusterClient.ClusterV1().ManagedClusters().Update(ctx, managedCluster, metav1.UpdateOptions{}); err != nil {
return err
}
c.eventRecorder.Eventf("ManagedClusterConditionAvailableUpdated", "Update the original taints to the %+v", newTaints)
}
return nil
}

View File

@@ -0,0 +1,96 @@
package taint
import (
"context"
v1 "open-cluster-management.io/api/cluster/v1"
"reflect"
"testing"
"time"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
"k8s.io/apimachinery/pkg/runtime"
clienttesting "k8s.io/client-go/testing"
)
func TestSyncTaintCluster(t *testing.T) {
cases := []struct {
name string
startingObjects []runtime.Object
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "ManagedClusterConditionAvailable conditionStatus is True",
startingObjects: []runtime.Object{testinghelpers.NewAvailableManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertNoActions(t, actions)
},
},
{
name: "ManagedClusterConditionAvailable conditionStatus is False",
startingObjects: []runtime.Object{testinghelpers.NewUnAvailableManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "update")
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
taints := []v1.Taint{UnavailableTaint}
if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) {
t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints)
}
},
},
{
name: "There is no ManagedClusterConditionAvailable",
startingObjects: []runtime.Object{testinghelpers.NewManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "update")
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
taints := []v1.Taint{UnreachableTaint}
if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) {
t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints)
}
},
},
{
name: "ManagedClusterConditionAvailable conditionStatus is Unknown",
startingObjects: []runtime.Object{testinghelpers.NewUnknownManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "update")
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
taints := []v1.Taint{UnreachableTaint}
if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) {
t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints)
}
},
},
{
name: "sync a deleted spoke cluster",
startingObjects: []runtime.Object{},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertNoActions(t, actions)
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
for _, cluster := range c.startingObjects {
clusterStore.Add(cluster)
}
ctrl := taintController{clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)}
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName))
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
}
c.validateActions(t, clusterClient.Actions())
})
}
}

1
pkg/hub/taint/doc.go Normal file
View File

@@ -0,0 +1 @@
package taint