Resolve apply conflict

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2021-07-20 11:28:28 +08:00
parent 2a9a4aa091
commit 0ccab23cc2

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"time"
@@ -19,6 +18,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"github.com/openshift/library-go/pkg/controller/factory"
@@ -145,7 +145,19 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
errs := []error{}
// Apply resources on spoke cluster.
resourceResults := m.applyManifest(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder(), *owner)
resourceResults := make([]resourceapply.ApplyResult, len(manifestWork.Spec.Workload.Manifests))
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
resourceResults = m.applyManifests(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder(), *owner, resourceResults)
for _, result := range resourceResults {
if errors.IsConflict(result.Error) {
return result.Error
}
}
return nil
})
newManifestConditions := []workapiv1.ManifestCondition{}
for index, result := range resourceResults {
if result.Error != nil {
@@ -181,37 +193,49 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
return err
}
func (m *ManifestWorkController) applyManifest(manifests []workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference) []resourceapply.ApplyResult {
func (m *ManifestWorkController) applyManifests(
manifests []workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference, existingResults []resourceapply.ApplyResult) []resourceapply.ApplyResult {
for index, manifest := range manifests {
switch {
case existingResults[index].Result == nil:
// Apply if there is not result.
existingResults[index] = m.applyOneManifest(manifest, recorder, owner)
case errors.IsConflict(existingResults[index].Error):
// Apply if there is a resource confilct error.
existingResults[index] = m.applyOneManifest(manifest, recorder, owner)
}
}
return existingResults
}
func (m *ManifestWorkController) applyOneManifest(
manifest workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference) resourceapply.ApplyResult {
clientHolder := resourceapply.NewClientHolder().
WithAPIExtensionsClient(m.spokeAPIExtensionClient).
WithKubernetes(m.spokeKubeclient).
WithDynamicClient(m.spokeDynamicClient)
// Using index as the file name and apply manifests
indexstrings := []string{}
for i := 0; i < len(manifests); i++ {
indexstrings = append(indexstrings, strconv.Itoa(i))
}
results := resourceapply.ApplyDirectly(clientHolder, recorder, func(name string) ([]byte, error) {
index, _ := strconv.ParseInt(name, 10, 32)
unstructuredObj := &unstructured.Unstructured{}
err := unstructuredObj.UnmarshalJSON(manifests[index].Raw)
err := unstructuredObj.UnmarshalJSON(manifest.Raw)
if err != nil {
return nil, err
}
unstructuredObj.SetOwnerReferences([]metav1.OwnerReference{owner})
return unstructuredObj.MarshalJSON()
}, indexstrings...)
}, "manifest")
// Try apply with dynamic client if the manifest cannot be decoded by scheme or typed client is not found
// TODO we should check the certain error.
for index, result := range results {
// Use dynamic client when scheme cannot decode manifest or typed client cannot handle the object
if isDecodeError(result.Error) || isUnhandledError(result.Error) || isUnsupportedError(result.Error) {
results[index].Result, results[index].Changed, results[index].Error = m.applyUnstructrued(manifests[index].Raw, owner, recorder)
}
// Use dynamic client when scheme cannot decode manifest or typed client cannot handle the object
result := results[0]
if isDecodeError(result.Error) || isUnhandledError(result.Error) || isUnsupportedError(result.Error) {
result.Result, result.Changed, result.Error = m.applyUnstructrued(manifest.Raw, owner, recorder)
}
return results
return result
}
func (m *ManifestWorkController) decodeUnstructured(data []byte) (schema.GroupVersionResource, *unstructured.Unstructured, error) {