Split logic of controller to multiple reconcilers (#176)

This could make code clearer
also update the crd fix

Signed-off-by: Jian Qiu <jqiu@redhat.com>

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2023-01-17 05:44:26 +08:00
committed by GitHub
parent 17031158f6
commit e7fd0453dd
18 changed files with 265 additions and 34 deletions

View File

@@ -1,13 +1,13 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: placementmanifestworks.work.open-cluster-management.io
name: placemanifestworks.work.open-cluster-management.io
spec:
group: work.open-cluster-management.io
names:
kind: PlaceManifestWork
listKind: PlaceManifestWorkList
plural: placementmanifestworks
plural: placemanifestworks
shortNames:
- pmw
- pmws

4
go.mod
View File

@@ -22,7 +22,7 @@ require (
k8s.io/component-base v0.24.3
k8s.io/klog/v2 v2.70.1
k8s.io/utils v0.0.0-20220725171434-9bab9ef40391
open-cluster-management.io/api v0.9.1-0.20221116150238-55691c2c5f04
open-cluster-management.io/api v0.9.1-0.20230113024003-6529d52caeeb
sigs.k8s.io/controller-runtime v0.12.3
)
@@ -57,7 +57,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect

7
go.sum
View File

@@ -350,8 +350,9 @@ github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLe
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
@@ -1240,8 +1241,8 @@ modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
open-cluster-management.io/api v0.9.1-0.20221116150238-55691c2c5f04 h1:lXewKgs1MeWN1wPIlMGQxfQj83CgjkNXQKVJVYQEB5I=
open-cluster-management.io/api v0.9.1-0.20221116150238-55691c2c5f04/go.mod h1:9KkJPh/zpDevXj2P+zkvSVjC2pq2PQ1JDNLLEes8TEc=
open-cluster-management.io/api v0.9.1-0.20230113024003-6529d52caeeb h1:I4YWWnSpMirqj/heYbu44WeKBK3MJGboLgr4MEJfygs=
open-cluster-management.io/api v0.9.1-0.20230113024003-6529d52caeeb/go.mod h1:6BB/Y6r3hXlPjpJgDwIs6Ubxyx/kXXOg6D9Cntg1I9E=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=

View File

@@ -0,0 +1,31 @@
package placemanifestworkcontroller
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
)
// addFinalizerReconciler is to add finalizer to the placeManifestWork.
type addFinalizerReconciler struct {
workClient workclientset.Interface
}
func (a *addFinalizerReconciler) reconcile(ctx context.Context, pw *workapiv1alpha1.PlaceManifestWork) (*workapiv1alpha1.PlaceManifestWork, reconcileState, error) {
// Do not need to add finalizer if it is in delete state already.
if !pw.DeletionTimestamp.IsZero() {
return pw, reconcileStop, nil
}
// don't add finalizer to instances that already have it
for i := range pw.Finalizers {
if pw.Finalizers[i] == PlaceManifestWorkFinalizer {
return pw, reconcileContinue, nil
}
}
// if this conflicts, we'll simply try again later
pw.Finalizers = append(pw.Finalizers, PlaceManifestWorkFinalizer)
_, err := a.workClient.WorkV1alpha1().PlaceManifestWorks(pw.Namespace).Update(ctx, pw, metav1.UpdateOptions{})
return pw, reconcileStop, err
}

View File

@@ -2,19 +2,25 @@ package placemanifestworkcontroller
import (
"context"
"encoding/json"
"fmt"
jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
clusterinformerv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1"
clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformerv1 "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
workinformerv1alpha1 "open-cluster-management.io/api/client/work/informers/externalversions/work/v1alpha1"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
worklisterv1alpha1 "open-cluster-management.io/api/client/work/listers/work/v1alpha1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
)
@@ -24,17 +30,33 @@ const (
// that owns this manifestwork
// TODO move this to the api repo
PlaceManifestWorkControllerNameLabelKey = "work.open-cluster-management.io/placemanifestwork"
// PlaceManifestWorkFinalizer is the name of the finalizer added to placeManifestWork. It is used to ensure
// related manifestworks is deleted
PlaceManifestWorkFinalizer = "work.open-cluster-management.io/manifest-work-cleanup"
)
type PlaceManifestWorkController struct {
workClient workclientset.Interface
placeManifestWorkLister worklisterv1alpha1.PlaceManifestWorkLister
placeManifestWorkIndexer cache.Indexer
placementLister clusterlister.PlacementLister
placeDecisionLister clusterlister.PlacementDecisionLister
manifestWorkLister worklisterv1.ManifestWorkLister
reconcilers []placeManifestWorkReconcile
}
// placeManifestWorkReconcile is a interface for reconcile logic. It returns an updated placeManifestWork and whether further
// reconcile needs to proceed.
type placeManifestWorkReconcile interface {
reconcile(ctx context.Context, pw *workapiv1alpha1.PlaceManifestWork) (*workapiv1alpha1.PlaceManifestWork, reconcileState, error)
}
type reconcileState int64
const (
reconcileStop reconcileState = iota
reconcileContinue
)
func NewPlaceManifestWorkController(
recorder events.Recorder,
workClient workclientset.Interface,
@@ -47,9 +69,13 @@ func NewPlaceManifestWorkController(
workClient: workClient,
placeManifestWorkLister: placeManifestWorkInformer.Lister(),
placeManifestWorkIndexer: placeManifestWorkInformer.Informer().GetIndexer(),
placementLister: placementInformer.Lister(),
placeDecisionLister: placeDecisionInformer.Lister(),
manifestWorkLister: manifestWorkInformer.Lister(),
reconcilers: []placeManifestWorkReconcile{
&finalizeReconciler{workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister()},
&addFinalizerReconciler{workClient: workClient},
&deployReconciler{workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister(), placementLister: placementInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
&statusReconciler{workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
},
}
err := placeManifestWorkInformer.Informer().AddIndexers(
@@ -93,15 +119,74 @@ func NewPlaceManifestWorkController(
// sync is the main reconcile loop for placeManifest work. It is triggered every 15sec
func (m *PlaceManifestWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
placeManifestWorkName := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling placeManifestWork %q", placeManifestWorkName)
// TODO: add watcher for manifest and placement
// TODO: add annotation to determine the placeManifestwork owner
// TODO: add the finalizer for PlaceManifestWork
return nil
key := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling placeManifestWork %q", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// ignore placement whose key is not in format: namespace/name
utilruntime.HandleError(err)
return nil
}
placementManifestWork, err := m.placeManifestWorkLister.PlaceManifestWorks(namespace).Get(name)
switch {
case errors.IsNotFound(err):
return nil
case err != nil:
return err
}
oldPlacementManifestWork := placementManifestWork
placementManifestWork = placementManifestWork.DeepCopy()
var state reconcileState
var errs []error
for _, reconciler := range m.reconcilers {
placementManifestWork, state, err = reconciler.reconcile(ctx, placementManifestWork)
if err != nil {
errs = append(errs, err)
}
if state == reconcileStop {
break
}
}
// Patch status
if err := m.patchPlaceManifestStatus(ctx, oldPlacementManifestWork, placementManifestWork); err != nil {
errs = append(errs, err)
}
return utilerrors.NewAggregate(errs)
}
func (m *PlaceManifestWorkController) finalizePlaceManifestWork(placeManifestWork workapiv1alpha1.PlaceManifestWork) error {
// TODO: Delete all ManifestWork owned by the given placeManifestWork
return nil
func (m *PlaceManifestWorkController) patchPlaceManifestStatus(ctx context.Context, old, new *workapiv1alpha1.PlaceManifestWork) error {
if apiequality.Semantic.DeepEqual(old.Status, new.Status) {
return nil
}
oldData, err := json.Marshal(workapiv1alpha1.PlaceManifestWork{
Status: old.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for placeManifestWork status %s: %w", old.Name, err)
}
newData, err := json.Marshal(workapiv1alpha1.PlaceManifestWork{
ObjectMeta: metav1.ObjectMeta{
UID: old.UID,
ResourceVersion: old.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: new.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for work status %s: %w", old.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for work %s: %w", old.Name, err)
}
_, err = m.workClient.WorkV1alpha1().PlaceManifestWorks(old.Namespace).Patch(ctx, old.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}

View File

@@ -0,0 +1,22 @@
package placemanifestworkcontroller
import (
"context"
clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
)
// deployReconciler is to manage ManifestWork based on the placement.
type deployReconciler struct {
workClient workclientset.Interface
manifestWorkLister worklisterv1.ManifestWorkLister
placeDecisionLister clusterlister.PlacementDecisionLister
placementLister clusterlister.PlacementLister
}
func (d *deployReconciler) reconcile(ctx context.Context, pw *workapiv1alpha1.PlaceManifestWork) (*workapiv1alpha1.PlaceManifestWork, reconcileState, error) {
// TODO the main manifestwork create/update/delete logic, we may also need to manage the status condition for whether manifestwork is created here
return pw, reconcileContinue, nil
}

View File

@@ -0,0 +1,44 @@
package placemanifestworkcontroller
import (
"context"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
)
// finalizeReconciler is to finalize the placeManifestWork by deleteing all related manifestWorks.
type finalizeReconciler struct {
workClient workclientset.Interface
manifestWorkLister worklisterv1.ManifestWorkLister
}
func (f *finalizeReconciler) reconcile(ctx context.Context, pw *workapiv1alpha1.PlaceManifestWork) (*workapiv1alpha1.PlaceManifestWork, reconcileState, error) {
if pw.DeletionTimestamp.IsZero() {
return pw, reconcileContinue, nil
}
var found bool
for i := range pw.Finalizers {
if pw.Finalizers[i] == PlaceManifestWorkFinalizer {
found = true
break
}
}
// if there is no finalizer, we do not need to reconcile anymore and we do not need to
if !found {
return pw, reconcileStop, nil
}
if err := f.finalizePlaceManifestWork(ctx, pw); err != nil {
return pw, reconcileStop, err
}
// TODO remove finalizer finally
return pw, reconcileStop, nil
}
func (m *finalizeReconciler) finalizePlaceManifestWork(ctx context.Context, placeManifestWork *workapiv1alpha1.PlaceManifestWork) error {
// TODO: Delete all ManifestWork owned by the given placeManifestWork
return nil
}

View File

@@ -0,0 +1,21 @@
package placemanifestworkcontroller
import (
"context"
clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
)
// statusReconciler is to update placeManifestwork status.
type statusReconciler struct {
workClient workclientset.Interface
manifestWorkLister worklisterv1.ManifestWorkLister
placeDecisionLister clusterlister.PlacementDecisionLister
}
func (d *statusReconciler) reconcile(ctx context.Context, pw *workapiv1alpha1.PlaceManifestWork) (*workapiv1alpha1.PlaceManifestWork, reconcileState, error) {
// TODO the logic for update placeManifestwork status
return pw, reconcileContinue, nil
}

View File

@@ -26,8 +26,8 @@ var (
// NewMD5 and NewSHA1.
func NewHash(h hash.Hash, space UUID, data []byte, version int) UUID {
h.Reset()
h.Write(space[:])
h.Write(data)
h.Write(space[:]) //nolint:errcheck
h.Write(data) //nolint:errcheck
s := h.Sum(nil)
var uuid UUID
copy(uuid[:], s)

View File

@@ -9,7 +9,7 @@ import (
"fmt"
)
// Scan implements sql.Scanner so UUIDs can be read from databases transparently
// Scan implements sql.Scanner so UUIDs can be read from databases transparently.
// Currently, database types that map to string and []byte are supported. Please
// consult database-specific driver documentation for matching types.
func (uuid *UUID) Scan(src interface{}) error {

View File

@@ -35,6 +35,12 @@ const (
var rander = rand.Reader // random function
type invalidLengthError struct{ len int }
func (err invalidLengthError) Error() string {
return fmt.Sprintf("invalid UUID length: %d", err.len)
}
// Parse decodes s into a UUID or returns an error. Both the standard UUID
// forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and
// urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the
@@ -68,7 +74,7 @@ func Parse(s string) (UUID, error) {
}
return uuid, nil
default:
return uuid, fmt.Errorf("invalid UUID length: %d", len(s))
return uuid, invalidLengthError{len(s)}
}
// s is now at least 36 bytes long
// it must be of the form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
@@ -112,7 +118,7 @@ func ParseBytes(b []byte) (UUID, error) {
}
return uuid, nil
default:
return uuid, fmt.Errorf("invalid UUID length: %d", len(b))
return uuid, invalidLengthError{len(b)}
}
// s is now at least 36 bytes long
// it must be of the form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

View File

@@ -14,6 +14,14 @@ func New() UUID {
return Must(NewRandom())
}
// NewString creates a new random UUID and returns it as a string or panics.
// NewString is equivalent to the expression
//
// uuid.New().String()
func NewString() string {
return Must(NewRandom()).String()
}
// NewRandom returns a Random (Version 4) UUID.
//
// The strength of the UUIDs is based on the strength of the crypto/rand

4
vendor/modules.txt vendored
View File

@@ -120,7 +120,7 @@ github.com/google/go-cmp/cmp/internal/value
# github.com/google/gofuzz v1.1.0
## explicit; go 1.12
github.com/google/gofuzz
# github.com/google/uuid v1.1.2
# github.com/google/uuid v1.2.0
## explicit
github.com/google/uuid
# github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
@@ -1218,7 +1218,7 @@ k8s.io/utils/path
k8s.io/utils/pointer
k8s.io/utils/strings/slices
k8s.io/utils/trace
# open-cluster-management.io/api v0.9.1-0.20221116150238-55691c2c5f04
# open-cluster-management.io/api v0.9.1-0.20230113024003-6529d52caeeb
## explicit; go 1.19
open-cluster-management.io/api/client/cluster/clientset/versioned
open-cluster-management.io/api/client/cluster/clientset/versioned/scheme

View File

@@ -69,6 +69,10 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.UserAgent == "" {
configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
}
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {

View File

@@ -53,6 +53,10 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.UserAgent == "" {
configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
}
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {

View File

@@ -229,3 +229,8 @@ type ManagedClusterList struct {
// Items is a list of managed clusters.
Items []ManagedCluster `json:"items"`
}
const (
// ClusterNameLabelKey is the key of a label to set ManagedCluster name.
ClusterNameLabelKey = "open-cluster-management.io/cluster-name"
)

View File

@@ -1,13 +1,13 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: placementmanifestworks.work.open-cluster-management.io
name: placemanifestworks.work.open-cluster-management.io
spec:
group: work.open-cluster-management.io
names:
kind: PlaceManifestWork
listKind: PlaceManifestWorkList
plural: placementmanifestworks
plural: placemanifestworks
shortNames:
- pmw
- pmws

View File

@@ -28,7 +28,7 @@ import (
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
// +kubebuilder:resource:path=placementmanifestworks,shortName=pmw;pmws,scope=Namespaced
// +kubebuilder:resource:path=placemanifestworks,shortName=pmw;pmws,scope=Namespaced
// +kubebuilder:storageversion
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Placement",type="string",JSONPath=".status.conditions[?(@.type==\"PlacementVerified\")].reason",description="Reason"