mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-17 22:58:53 +00:00
Merge pull request #89 from qiujian16/resolve-conflict
Resolve apply conflict
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -19,6 +18,7 @@ import (
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/openshift/library-go/pkg/controller/factory"
|
||||
@@ -145,7 +145,19 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
|
||||
|
||||
errs := []error{}
|
||||
// Apply resources on spoke cluster.
|
||||
resourceResults := m.applyManifest(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder(), *owner)
|
||||
resourceResults := make([]resourceapply.ApplyResult, len(manifestWork.Spec.Workload.Manifests))
|
||||
retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
resourceResults = m.applyManifests(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder(), *owner, resourceResults)
|
||||
|
||||
for _, result := range resourceResults {
|
||||
if errors.IsConflict(result.Error) {
|
||||
return result.Error
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
newManifestConditions := []workapiv1.ManifestCondition{}
|
||||
for index, result := range resourceResults {
|
||||
if result.Error != nil {
|
||||
@@ -181,37 +193,49 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *ManifestWorkController) applyManifest(manifests []workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference) []resourceapply.ApplyResult {
|
||||
func (m *ManifestWorkController) applyManifests(
|
||||
manifests []workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference, existingResults []resourceapply.ApplyResult) []resourceapply.ApplyResult {
|
||||
for index, manifest := range manifests {
|
||||
switch {
|
||||
case existingResults[index].Result == nil:
|
||||
// Apply if there is not result.
|
||||
existingResults[index] = m.applyOneManifest(manifest, recorder, owner)
|
||||
case errors.IsConflict(existingResults[index].Error):
|
||||
// Apply if there is a resource confilct error.
|
||||
existingResults[index] = m.applyOneManifest(manifest, recorder, owner)
|
||||
}
|
||||
}
|
||||
|
||||
return existingResults
|
||||
}
|
||||
|
||||
func (m *ManifestWorkController) applyOneManifest(
|
||||
manifest workapiv1.Manifest, recorder events.Recorder, owner metav1.OwnerReference) resourceapply.ApplyResult {
|
||||
clientHolder := resourceapply.NewClientHolder().
|
||||
WithAPIExtensionsClient(m.spokeAPIExtensionClient).
|
||||
WithKubernetes(m.spokeKubeclient).
|
||||
WithDynamicClient(m.spokeDynamicClient)
|
||||
|
||||
// Using index as the file name and apply manifests
|
||||
indexstrings := []string{}
|
||||
for i := 0; i < len(manifests); i++ {
|
||||
indexstrings = append(indexstrings, strconv.Itoa(i))
|
||||
}
|
||||
results := resourceapply.ApplyDirectly(clientHolder, recorder, func(name string) ([]byte, error) {
|
||||
index, _ := strconv.ParseInt(name, 10, 32)
|
||||
unstructuredObj := &unstructured.Unstructured{}
|
||||
err := unstructuredObj.UnmarshalJSON(manifests[index].Raw)
|
||||
err := unstructuredObj.UnmarshalJSON(manifest.Raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
unstructuredObj.SetOwnerReferences([]metav1.OwnerReference{owner})
|
||||
return unstructuredObj.MarshalJSON()
|
||||
}, indexstrings...)
|
||||
}, "manifest")
|
||||
|
||||
// Try apply with dynamic client if the manifest cannot be decoded by scheme or typed client is not found
|
||||
// TODO we should check the certain error.
|
||||
for index, result := range results {
|
||||
// Use dynamic client when scheme cannot decode manifest or typed client cannot handle the object
|
||||
if isDecodeError(result.Error) || isUnhandledError(result.Error) || isUnsupportedError(result.Error) {
|
||||
results[index].Result, results[index].Changed, results[index].Error = m.applyUnstructrued(manifests[index].Raw, owner, recorder)
|
||||
}
|
||||
// Use dynamic client when scheme cannot decode manifest or typed client cannot handle the object
|
||||
result := results[0]
|
||||
|
||||
if isDecodeError(result.Error) || isUnhandledError(result.Error) || isUnsupportedError(result.Error) {
|
||||
result.Result, result.Changed, result.Error = m.applyUnstructrued(manifest.Raw, owner, recorder)
|
||||
}
|
||||
return results
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (m *ManifestWorkController) decodeUnstructured(data []byte) (schema.GroupVersionResource, *unstructured.Unstructured, error) {
|
||||
|
||||
Reference in New Issue
Block a user