diff --git a/pkg/spoke/controllers/manifestcontroller/manifestwork_controller.go b/pkg/spoke/controllers/manifestcontroller/manifestwork_controller.go index c81502c49..3ac9671ce 100644 --- a/pkg/spoke/controllers/manifestcontroller/manifestwork_controller.go +++ b/pkg/spoke/controllers/manifestcontroller/manifestwork_controller.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "reflect" - "strconv" "strings" "time" @@ -19,6 +18,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "github.com/openshift/library-go/pkg/controller/factory" @@ -145,7 +145,19 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac errs := []error{} // Apply resources on spoke cluster. - resourceResults := m.applyManifest(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder(), *owner) + resourceResults := make([]resourceapply.ApplyResult, len(manifestWork.Spec.Workload.Manifests)) + retry.RetryOnConflict(retry.DefaultBackoff, func() error { + resourceResults = m.applyManifests(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder(), *owner, resourceResults) + + for _, result := range resourceResults { + if errors.IsConflict(result.Error) { + return result.Error + } + } + + return nil + }) + newManifestConditions := []workapiv1.ManifestCondition{} for index, result := range resourceResults { if result.Error != nil { @@ -181,37 +193,49 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac return err } -func (m *ManifestWorkController) applyManifest(manifests []workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference) []resourceapply.ApplyResult { +func (m *ManifestWorkController) applyManifests( + manifests []workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference, existingResults []resourceapply.ApplyResult) []resourceapply.ApplyResult { + for index, manifest := range manifests { + switch { + case existingResults[index].Result == nil: + // Apply if there is not result. + existingResults[index] = m.applyOneManifest(manifest, recorder, owner) + case errors.IsConflict(existingResults[index].Error): + // Apply if there is a resource confilct error. + existingResults[index] = m.applyOneManifest(manifest, recorder, owner) + } + } + + return existingResults +} + +func (m *ManifestWorkController) applyOneManifest( + manifest workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference) resourceapply.ApplyResult { clientHolder := resourceapply.NewClientHolder(). WithAPIExtensionsClient(m.spokeAPIExtensionClient). WithKubernetes(m.spokeKubeclient). WithDynamicClient(m.spokeDynamicClient) - // Using index as the file name and apply manifests - indexstrings := []string{} - for i := 0; i < len(manifests); i++ { - indexstrings = append(indexstrings, strconv.Itoa(i)) - } results := resourceapply.ApplyDirectly(clientHolder, recorder, func(name string) ([]byte, error) { - index, _ := strconv.ParseInt(name, 10, 32) unstructuredObj := &unstructured.Unstructured{} - err := unstructuredObj.UnmarshalJSON(manifests[index].Raw) + err := unstructuredObj.UnmarshalJSON(manifest.Raw) if err != nil { return nil, err } unstructuredObj.SetOwnerReferences([]metav1.OwnerReference{owner}) return unstructuredObj.MarshalJSON() - }, indexstrings...) + }, "manifest") // Try apply with dynamic client if the manifest cannot be decoded by scheme or typed client is not found // TODO we should check the certain error. - for index, result := range results { - // Use dynamic client when scheme cannot decode manifest or typed client cannot handle the object - if isDecodeError(result.Error) || isUnhandledError(result.Error) || isUnsupportedError(result.Error) { - results[index].Result, results[index].Changed, results[index].Error = m.applyUnstructrued(manifests[index].Raw, owner, recorder) - } + // Use dynamic client when scheme cannot decode manifest or typed client cannot handle the object + result := results[0] + + if isDecodeError(result.Error) || isUnhandledError(result.Error) || isUnsupportedError(result.Error) { + result.Result, result.Changed, result.Error = m.applyUnstructrued(manifest.Raw, owner, recorder) } - return results + + return result } func (m *ManifestWorkController) decodeUnstructured(data []byte) (schema.GroupVersionResource, *unstructured.Unstructured, error) {