mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-02-14 18:09:57 +00:00
🐛 Use context cancel func to trigger reselect (#658)
* Use context cancel func to trigger reselect Signed-off-by: Jian Qiu <jqiu@redhat.com> * Add test to improve coverage Signed-off-by: Jian Qiu <jqiu@redhat.com> --------- Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
@@ -3,7 +3,6 @@ package spoke
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -116,25 +115,3 @@ func checkBootstrapKubeConfigValid(ctx context.Context, managedCluster, bootstra
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// reSelectChecker is a health checker that checks if the bootstrap kubeconfig should be reselected.
|
||||
//
|
||||
// It is used by 2 controllers: the hubTimeoutController and hubAcceptController.
|
||||
//
|
||||
// If timeout to connect to a hub or the hubAcceptsClient flag is set to false, then shouldReSelect
|
||||
// is set to true. And then checker will return an error, trigger the agent to restart and reselect
|
||||
// the bootstrap kubeconfig.
|
||||
type reSelectChecker struct {
|
||||
shouldReSelect bool
|
||||
}
|
||||
|
||||
func (b *reSelectChecker) Check(_ *http.Request) error {
|
||||
if b.shouldReSelect {
|
||||
return fmt.Errorf("reselect bootstrap kubeconfig")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *reSelectChecker) Name() string {
|
||||
return "reSelectChecker"
|
||||
}
|
||||
|
||||
@@ -54,10 +54,11 @@ type SpokeAgentConfig struct {
|
||||
|
||||
internalHubConfigValidFunc wait.ConditionWithContextFunc
|
||||
|
||||
hubKubeConfigChecker *hubKubeConfigHealthChecker
|
||||
bootstrapKubeconfigEventHandler *bootstrapKubeconfigEventHandler
|
||||
hubKubeConfigChecker *hubKubeConfigHealthChecker
|
||||
|
||||
reSelectChecker *reSelectChecker
|
||||
// agentStopFunc is the function to stop the agent, and the external system can restart the agent
|
||||
// with the refreshed configuration.
|
||||
agentStopFunc context.CancelFunc
|
||||
}
|
||||
|
||||
// NewSpokeAgentConfig returns a SpokeAgentConfig
|
||||
@@ -67,12 +68,7 @@ func NewSpokeAgentConfig(commonOpts *commonoptions.AgentOptions, opts *SpokeAgen
|
||||
agentOptions: commonOpts,
|
||||
registrationOption: opts,
|
||||
driver: registerDriver,
|
||||
|
||||
reSelectChecker: &reSelectChecker{shouldReSelect: false},
|
||||
bootstrapKubeconfigEventHandler: &bootstrapKubeconfigEventHandler{
|
||||
bootstrapKubeconfigSecretName: &opts.BootstrapKubeconfigSecret,
|
||||
cancel: cancel,
|
||||
},
|
||||
agentStopFunc: cancel,
|
||||
}
|
||||
cfg.hubKubeConfigChecker = &hubKubeConfigHealthChecker{
|
||||
checkFunc: cfg.IsHubKubeConfigValid,
|
||||
@@ -84,7 +80,6 @@ func NewSpokeAgentConfig(commonOpts *commonoptions.AgentOptions, opts *SpokeAgen
|
||||
func (o *SpokeAgentConfig) HealthCheckers() []healthz.HealthChecker {
|
||||
return []healthz.HealthChecker{
|
||||
o.hubKubeConfigChecker,
|
||||
o.reSelectChecker,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,8 +207,8 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to select a bootstrap kubeconfig: %w", err)
|
||||
}
|
||||
logger.Info("Select bootstrap kubeconfig", "index", index, "file", o.registrationOption.BootstrapKubeconfigs[index])
|
||||
|
||||
recorder.Eventf("BootstrapSelected", "Select bootstrap kubeconfig with index %d and file %s",
|
||||
index, o.registrationOption.BootstrapKubeconfigs[index])
|
||||
o.currentBootstrapKubeConfig = o.registrationOption.BootstrapKubeconfigs[index]
|
||||
} else {
|
||||
o.currentBootstrapKubeConfig = o.registrationOption.BootstrapKubeconfig
|
||||
@@ -243,17 +238,18 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
go spokeClusterCreatingController.Run(ctx, 1)
|
||||
|
||||
secretInformer := namespacedManagementKubeInformerFactory.Core().V1().Secrets()
|
||||
if o.bootstrapKubeconfigEventHandler != nil {
|
||||
// Register BootstrapKubeconfigEventHandler as an event handler of secret informer,
|
||||
// monitor the bootstrap kubeconfig and restart the pod immediately if it changes.
|
||||
//
|
||||
// The BootstrapKubeconfigEventHandler was originally part of the healthcheck and was
|
||||
// moved out to take some cases into account. For example, the work agent may resync a
|
||||
// wrong bootstrap kubeconfig from the cache before restarting since the healthcheck will
|
||||
// retry 3 times.
|
||||
if _, err = secretInformer.Informer().AddEventHandler(o.bootstrapKubeconfigEventHandler); err != nil {
|
||||
return err
|
||||
}
|
||||
// Register BootstrapKubeconfigEventHandler as an event handler of secret informer,
|
||||
// monitor the bootstrap kubeconfig and restart the pod immediately if it changes.
|
||||
//
|
||||
// The BootstrapKubeconfigEventHandler was originally part of the healthcheck and was
|
||||
// moved out to take some cases into account. For example, the work agent may resync a
|
||||
// wrong bootstrap kubeconfig from the cache before restarting since the healthcheck will
|
||||
// retry 3 times.
|
||||
if _, err = secretInformer.Informer().AddEventHandler(&bootstrapKubeconfigEventHandler{
|
||||
bootstrapKubeconfigSecretName: &o.registrationOption.BootstrapKubeconfigSecret,
|
||||
cancel: o.agentStopFunc,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hubKubeconfigSecretController := registration.NewHubKubeconfigSecretController(
|
||||
@@ -448,7 +444,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
func(ctx context.Context) error {
|
||||
logger.Info("Failed to connect to hub because of hubAcceptClient set to false, restart agent to reselect a new bootstrap kubeconfig")
|
||||
o.reSelectChecker.shouldReSelect = true
|
||||
o.agentStopFunc()
|
||||
return nil
|
||||
},
|
||||
recorder,
|
||||
@@ -460,7 +456,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
o.registrationOption.HubConnectionTimeoutSeconds,
|
||||
func(ctx context.Context) error {
|
||||
logger.Info("Failed to connect to hub because of lease out-of-date, restart agent to reselect a new bootstrap kubeconfig")
|
||||
o.reSelectChecker.shouldReSelect = true
|
||||
o.agentStopFunc()
|
||||
return nil
|
||||
},
|
||||
recorder,
|
||||
|
||||
@@ -3,13 +3,20 @@ package spoke
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
"github.com/onsi/gomega"
|
||||
"github.com/openshift/library-go/pkg/controller/controllercmd"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/rest"
|
||||
"sigs.k8s.io/controller-runtime/pkg/envtest"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log/zap"
|
||||
|
||||
ocmfeature "open-cluster-management.io/api/feature"
|
||||
|
||||
@@ -17,8 +24,20 @@ import (
|
||||
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
|
||||
"open-cluster-management.io/ocm/pkg/features"
|
||||
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
|
||||
"open-cluster-management.io/ocm/test/integration/util"
|
||||
)
|
||||
|
||||
var testEnv *envtest.Environment
|
||||
var cfg *rest.Config
|
||||
var bootstrapKubeConfigFile string
|
||||
var authn *util.TestAuthn
|
||||
|
||||
var CRDPaths = []string{
|
||||
// agent
|
||||
"../../../vendor/open-cluster-management.io/api/cluster/v1/0000_00_clusters.open-cluster-management.io_managedclusters.crd.yaml",
|
||||
"../../../vendor/open-cluster-management.io/api/addon/v1alpha1/0000_01_addon.open-cluster-management.io_managedclusteraddons.crd.yaml",
|
||||
}
|
||||
|
||||
func init() {
|
||||
utilruntime.Must(features.SpokeMutableFeatureGate.Add(ocmfeature.DefaultSpokeRegistrationFeatureGates))
|
||||
}
|
||||
@@ -192,3 +211,78 @@ func TestGetSpokeClusterCABundle(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager(t *testing.T) {
|
||||
gomega.RegisterFailHandler(ginkgo.Fail)
|
||||
ginkgo.RunSpecs(t, "Manager Suite")
|
||||
}
|
||||
|
||||
var _ = ginkgo.BeforeSuite(func() {
|
||||
logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true)))
|
||||
|
||||
ginkgo.By("bootstrapping test environment")
|
||||
|
||||
var err error
|
||||
|
||||
authn = util.DefaultTestAuthn
|
||||
apiServer := &envtest.APIServer{}
|
||||
apiServer.SecureServing.Authn = authn
|
||||
|
||||
testEnv = &envtest.Environment{
|
||||
ControlPlane: envtest.ControlPlane{
|
||||
APIServer: apiServer,
|
||||
},
|
||||
ErrorIfCRDPathMissing: true,
|
||||
CRDDirectoryPaths: CRDPaths,
|
||||
}
|
||||
|
||||
cfg, err = testEnv.Start()
|
||||
gomega.Expect(err).ToNot(gomega.HaveOccurred())
|
||||
gomega.Expect(cfg).ToNot(gomega.BeNil())
|
||||
|
||||
// prepare configs
|
||||
securePort := testEnv.ControlPlane.APIServer.SecureServing.Port
|
||||
gomega.Expect(len(securePort)).ToNot(gomega.BeZero())
|
||||
|
||||
serverCertFile := fmt.Sprintf("%s/apiserver.crt", testEnv.ControlPlane.APIServer.CertDir)
|
||||
bootstrapKubeConfigFile = path.Join(util.TestDir, "bootstrap", "kubeconfig")
|
||||
err = authn.CreateBootstrapKubeConfigWithCertAge(bootstrapKubeConfigFile, serverCertFile, securePort, 24*time.Hour)
|
||||
gomega.Expect(err).NotTo(gomega.HaveOccurred())
|
||||
|
||||
err = features.SpokeMutableFeatureGate.Add(ocmfeature.DefaultSpokeRegistrationFeatureGates)
|
||||
gomega.Expect(err).ToNot(gomega.HaveOccurred())
|
||||
})
|
||||
|
||||
var _ = ginkgo.AfterSuite(func() {
|
||||
ginkgo.By("tearing down the test environment")
|
||||
|
||||
err := testEnv.Stop()
|
||||
gomega.Expect(err).ToNot(gomega.HaveOccurred())
|
||||
})
|
||||
|
||||
var _ = ginkgo.Describe("start agent", func() {
|
||||
ginkgo.It("start hub manager", func() {
|
||||
_ = features.SpokeMutableFeatureGate.SetFromMap(map[string]bool{
|
||||
string(ocmfeature.MultipleHubs): false,
|
||||
})
|
||||
ctx, stopAgent := context.WithCancel(context.Background())
|
||||
agentOptions := NewSpokeAgentOptions()
|
||||
commonOptions := commonoptions.NewAgentOptions()
|
||||
commonOptions.AgentID = "test"
|
||||
commonOptions.SpokeClusterName = "cluster1"
|
||||
agentOptions.BootstrapKubeconfig = bootstrapKubeConfigFile
|
||||
|
||||
agentConfig := NewSpokeAgentConfig(commonOptions, agentOptions, stopAgent)
|
||||
go func() {
|
||||
err := agentConfig.RunSpokeAgent(ctx, &controllercmd.ControllerContext{
|
||||
KubeConfig: cfg,
|
||||
EventRecorder: util.NewIntegrationTestEventRecorder("agent"),
|
||||
})
|
||||
// should get err since bootstrap is not finished yet.
|
||||
gomega.Expect(err).Should(gomega.HaveOccurred())
|
||||
}()
|
||||
// wait for 5 second until the controller is started
|
||||
time.Sleep(5 * time.Second)
|
||||
stopAgent()
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user