Feat: rework cluster to align velaux and cli (#3299)

Signed-off-by: Somefive <yd219913@alibaba-inc.com>
This commit is contained in:
Somefive
2022-02-22 15:48:22 +08:00
committed by GitHub
parent fb831345f0
commit 7059c8df79
16 changed files with 1197 additions and 677 deletions

View File

@@ -114,11 +114,3 @@ var DefaultFilterAnnots = []string{
oam.AnnotationFilterAnnotationKeys,
oam.AnnotationLastAppliedConfiguration,
}
// Cluster contains base info of cluster
type Cluster struct {
Name string
Type string
EndPoint string
Accepted bool
}

View File

@@ -18,7 +18,7 @@ FAIL=false
for file in $(git ls-files | grep "\.go$" | grep -v vendor/); do
echo -n "Header check: $file... "
if [[ -z $(cat ${file} | grep "Copyright [0-9]\{4\}.\? The KubeVela Authors") && -z $(cat ${file} | grep "Copyright [0-9]\{4\} The Crossplane Authors") ]]; then
if [[ -z $(cat ${file} | grep "Copyright [0-9]\{4\}\(-[0-9]\{4\}\)\?.\? The KubeVela Authors") && -z $(cat ${file} | grep "Copyright [0-9]\{4\} The Crossplane Authors") ]]; then
ERR=true
fi
if [ $ERR == true ]; then

View File

@@ -33,6 +33,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
velatypes "github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/apiserver/clients"
"github.com/oam-dev/kubevela/pkg/apiserver/datastore"
"github.com/oam-dev/kubevela/pkg/apiserver/log"
@@ -198,14 +199,14 @@ func joinClusterByKubeConfigString(ctx context.Context, k8sClient client.Client,
defer func() {
_ = os.Remove(tmpFileName)
}()
cluster, err := multicluster.JoinClusterByKubeConfig(ctx, k8sClient, tmpFileName, clusterName)
clusterConfig, err := multicluster.JoinClusterByKubeConfig(ctx, k8sClient, tmpFileName, clusterName, multicluster.JoinClusterCreateNamespaceOption(velatypes.DefaultKubeVelaNS))
if err != nil {
if errors.Is(err, multicluster.ErrClusterExists) {
return "", bcode.ErrClusterExistsInKubernetes
}
return "", errors.Wrapf(err, "failed to join cluster")
}
return cluster.Server, nil
return clusterConfig.Cluster.Server, nil
}
func createClusterModelFromRequest(req apis.CreateClusterRequest, oldCluster *model.Cluster) (newCluster *model.Cluster) {

View File

@@ -1,153 +0,0 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clustermanager
import (
"context"
"fmt"
"github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
crdv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierror "k8s.io/apimachinery/pkg/api/errors"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/clientcmd"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/utils/common"
)
// GetClient returns a kube client for given kubeConfigData
func GetClient(kubeConfigData []byte) (client.Client, error) {
clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeConfigData)
if err != nil {
return nil, err
}
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return nil, err
}
return client.New(restConfig, client.Options{Scheme: common.Scheme})
}
// GetRegisteredClusters will get all registered clusters in control plane
func GetRegisteredClusters(c client.Client) ([]types.Cluster, error) {
var clusters []types.Cluster
secrets := corev1.SecretList{}
if err := c.List(context.Background(), &secrets, client.HasLabels{v1alpha1.LabelKeyClusterCredentialType}, client.InNamespace(multicluster.ClusterGatewaySecretNamespace)); err != nil {
return nil, errors.Wrapf(err, "failed to get clusterSecret secrets")
}
for _, clusterSecret := range secrets.Items {
endpoint := string(clusterSecret.Data["endpoint"])
if endp, ok := clusterSecret.GetLabels()[v1alpha1.LabelKeyClusterEndpointType]; ok {
endpoint = endp
}
clusters = append(clusters, types.Cluster{
Name: clusterSecret.Name,
Type: clusterSecret.GetLabels()[v1alpha1.LabelKeyClusterCredentialType],
EndPoint: endpoint,
Accepted: true,
})
}
crdName := k8stypes.NamespacedName{Name: "managedclusters." + clusterv1.GroupName}
if err := c.Get(context.Background(), crdName, &crdv1.CustomResourceDefinition{}); err != nil {
if apierror.IsNotFound(err) {
return clusters, nil
}
return nil, err
}
managedClusters := clusterv1.ManagedClusterList{}
if err := c.List(context.Background(), &managedClusters); err != nil {
return nil, errors.Wrapf(err, "failed to get managed clusters")
}
for _, cluster := range managedClusters.Items {
if len(cluster.Spec.ManagedClusterClientConfigs) != 0 {
clusters = append(clusters, types.Cluster{
Name: cluster.Name,
Type: "OCM ManagedServiceAccount",
EndPoint: "-",
Accepted: cluster.Spec.HubAcceptsClient,
})
}
}
return clusters, nil
}
// EnsureClusterNotExists will check the cluster is not existed in control plane
func EnsureClusterNotExists(c client.Client, clusterName string) error {
exist, err := clusterExists(c, clusterName)
if err != nil {
return err
}
if exist {
return fmt.Errorf("cluster %s already exists", clusterName)
}
return nil
}
// EnsureClusterExists will check the cluster is existed in control plane
func EnsureClusterExists(c client.Client, clusterName string) error {
exist, err := clusterExists(c, clusterName)
if err != nil {
return err
}
if !exist {
return fmt.Errorf("cluster %s not exists", clusterName)
}
return nil
}
// clusterExists will check whether the cluster exist or not
func clusterExists(c client.Client, clusterName string) (bool, error) {
err := c.Get(context.Background(),
k8stypes.NamespacedName{
Name: clusterName,
Namespace: multicluster.ClusterGatewaySecretNamespace,
},
&corev1.Secret{})
if err == nil {
return true, nil
}
if !apierror.IsNotFound(err) {
return false, errors.Wrapf(err, "failed to check duplicate cluster")
}
crdName := k8stypes.NamespacedName{Name: "managedclusters." + clusterv1.GroupName}
if err = c.Get(context.Background(), crdName, &crdv1.CustomResourceDefinition{}); err != nil {
if apierror.IsNotFound(err) {
return false, nil
}
return false, errors.Wrapf(err, "failed to get managedcluster CRD to check duplicate cluster")
}
err = c.Get(context.Background(), k8stypes.NamespacedName{
Name: clusterName,
Namespace: multicluster.ClusterGatewaySecretNamespace,
}, &clusterv1.ManagedCluster{})
if err == nil {
return true, nil
}
if !apierror.IsNotFound(err) {
return false, errors.Wrapf(err, "failed to check duplicate cluster")
}
return false, nil
}

View File

@@ -17,69 +17,479 @@ limitations under the License.
package multicluster
import (
"bytes"
"context"
"fmt"
v1alpha12 "github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
"github.com/briandowns/spinner"
"github.com/oam-dev/cluster-register/pkg/hub"
"github.com/oam-dev/cluster-register/pkg/spoke"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
v14 "k8s.io/api/storage/v1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
types2 "k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ocmclusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1alpha1 "github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/policy/envbinding"
errors3 "github.com/oam-dev/kubevela/pkg/utils/errors"
"github.com/oam-dev/kubevela/pkg/utils"
velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
cmdutil "github.com/oam-dev/kubevela/pkg/utils/util"
)
// ensureVelaSystemNamespaceInstalled ensures vela namespace to be installed in child cluster
func ensureVelaSystemNamespaceInstalled(ctx context.Context, c client.Client, clusterName string, createNamespace string) error {
remoteCtx := ContextWithClusterName(ctx, clusterName)
if err := c.Get(remoteCtx, types2.NamespacedName{Name: createNamespace}, &v1.Namespace{}); err != nil {
if !errors2.IsNotFound(err) {
return errors.Wrapf(err, "failed to check vela-system ")
}
if err = c.Create(remoteCtx, &v1.Namespace{ObjectMeta: v12.ObjectMeta{Name: createNamespace}}); err != nil {
return errors.Wrapf(err, "failed to create vela-system namespace")
// KubeClusterConfig info for cluster management
type KubeClusterConfig struct {
ClusterName string
CreateNamespace string
*clientcmdapi.Config
*clientcmdapi.Cluster
*clientcmdapi.AuthInfo
// Logs records intermediate logs (which do not return error) during running
Logs bytes.Buffer
}
// SetClusterName set cluster name if not empty
func (clusterConfig *KubeClusterConfig) SetClusterName(clusterName string) *KubeClusterConfig {
if clusterName != "" {
clusterConfig.ClusterName = clusterName
}
return clusterConfig
}
// SetCreateNamespace set create namespace, if empty, no namespace will be created
func (clusterConfig *KubeClusterConfig) SetCreateNamespace(createNamespace string) *KubeClusterConfig {
clusterConfig.CreateNamespace = createNamespace
return clusterConfig
}
// Validate check if config is valid for join
func (clusterConfig *KubeClusterConfig) Validate() error {
switch clusterConfig.ClusterName {
case "":
return errors.Errorf("ClusterName cannot be empty")
case ClusterLocalName:
return errors.Errorf("ClusterName cannot be `%s`, it is reserved as the local cluster", ClusterLocalName)
}
return nil
}
// RegisterByVelaSecret create cluster secrets for KubeVela to use
func (clusterConfig *KubeClusterConfig) RegisterByVelaSecret(ctx context.Context, cli client.Client) error {
if err := ensureClusterNotExists(ctx, cli, clusterConfig.ClusterName); err != nil {
return errors.Wrapf(err, "cannot use cluster name %s", clusterConfig.ClusterName)
}
var credentialType clusterv1alpha1.CredentialType
data := map[string][]byte{
"endpoint": []byte(clusterConfig.Cluster.Server),
"ca.crt": clusterConfig.Cluster.CertificateAuthorityData,
}
if len(clusterConfig.AuthInfo.Token) > 0 {
credentialType = clusterv1alpha1.CredentialTypeServiceAccountToken
data["token"] = []byte(clusterConfig.AuthInfo.Token)
} else {
credentialType = clusterv1alpha1.CredentialTypeX509Certificate
data["tls.crt"] = clusterConfig.AuthInfo.ClientCertificateData
data["tls.key"] = clusterConfig.AuthInfo.ClientKeyData
}
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: clusterConfig.ClusterName,
Namespace: ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(credentialType),
},
},
Type: corev1.SecretTypeOpaque,
Data: data,
}
if err := cli.Create(ctx, secret); err != nil {
return errors.Wrapf(err, "failed to add cluster to kubernetes")
}
// TODO(somefive): create namespace now only work for cluster secret
if clusterConfig.CreateNamespace != "" {
if err := ensureNamespaceExists(ctx, cli, clusterConfig.ClusterName, clusterConfig.CreateNamespace); err != nil {
_ = cli.Delete(ctx, secret)
return errors.Wrapf(err, "failed to ensure %s namespace installed in cluster %s", clusterConfig.CreateNamespace, clusterConfig.ClusterName)
}
}
return nil
}
// ensureClusterNotExists checks if child cluster has already been joined, if joined, error is returned
// RegisterClusterManagedByOCM create ocm managed cluster for use
// TODO(somefive): OCM ManagedCluster only support cli join now
func (clusterConfig *KubeClusterConfig) RegisterClusterManagedByOCM(ctx context.Context, args *JoinClusterArgs) error {
newTrackingSpinner := args.trackingSpinnerFactory
hubCluster, err := hub.NewHubCluster(args.hubConfig)
if err != nil {
return errors.Wrap(err, "fail to create client connect to hub cluster")
}
hubTracker := newTrackingSpinner("Checking the environment of hub cluster..")
hubTracker.FinalMSG = "Hub cluster all set, continue registration.\n"
hubTracker.Start()
crdName := apitypes.NamespacedName{Name: "managedclusters." + ocmclusterv1.GroupName}
if err := hubCluster.Client.Get(context.Background(), crdName, &apiextensionsv1.CustomResourceDefinition{}); err != nil {
return err
}
clusters, err := ListVirtualClusters(context.Background(), hubCluster.Client)
if err != nil {
return err
}
for _, cluster := range clusters {
if cluster.Name == clusterConfig.ClusterName && cluster.Accepted {
return errors.Errorf("you have register a cluster named %s", clusterConfig.ClusterName)
}
}
hubTracker.Stop()
spokeRestConf, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return clusterConfig.Config, nil
})
if err != nil {
return errors.Wrap(err, "fail to convert spoke-cluster kubeconfig")
}
spokeTracker := newTrackingSpinner("Building registration config for the managed cluster")
spokeTracker.FinalMSG = "Successfully prepared registration config.\n"
spokeTracker.Start()
overridingRegistrationEndpoint := ""
if !*args.inClusterBootstrap {
args.ioStreams.Infof("Using the api endpoint from hub kubeconfig %q as registration entry.\n", args.hubConfig.Host)
overridingRegistrationEndpoint = args.hubConfig.Host
}
hubKubeToken, err := hubCluster.GenerateHubClusterKubeConfig(ctx, overridingRegistrationEndpoint)
if err != nil {
return errors.Wrap(err, "fail to generate the token for spoke-cluster")
}
spokeCluster, err := spoke.NewSpokeCluster(clusterConfig.ClusterName, spokeRestConf, hubKubeToken)
if err != nil {
return errors.Wrap(err, "fail to connect spoke cluster")
}
err = spokeCluster.InitSpokeClusterEnv(ctx)
if err != nil {
return errors.Wrap(err, "fail to prepare the env for spoke-cluster")
}
spokeTracker.Stop()
registrationOperatorTracker := newTrackingSpinner("Waiting for registration operators running: (`kubectl -n open-cluster-management get pod -l app=klusterlet`)")
registrationOperatorTracker.FinalMSG = "Registration operator successfully deployed.\n"
registrationOperatorTracker.Start()
if err := spokeCluster.WaitForRegistrationOperatorReady(ctx); err != nil {
return errors.Wrap(err, "fail to setup registration operator for spoke-cluster")
}
registrationOperatorTracker.Stop()
registrationAgentTracker := newTrackingSpinner("Waiting for registration agent running: (`kubectl -n open-cluster-management-agent get pod -l app=klusterlet-registration-agent`)")
registrationAgentTracker.FinalMSG = "Registration agent successfully deployed.\n"
registrationAgentTracker.Start()
if err := spokeCluster.WaitForRegistrationAgentReady(ctx); err != nil {
return errors.Wrap(err, "fail to setup registration agent for spoke-cluster")
}
registrationAgentTracker.Stop()
csrCreationTracker := newTrackingSpinner("Waiting for CSRs created (`kubectl get csr -l open-cluster-management.io/cluster-name=" + spokeCluster.Name + "`)")
csrCreationTracker.FinalMSG = "Successfully found corresponding CSR from the agent.\n"
csrCreationTracker.Start()
if err := hubCluster.WaitForCSRCreated(ctx, spokeCluster.Name); err != nil {
return errors.Wrap(err, "failed found CSR created by registration agent")
}
csrCreationTracker.Stop()
args.ioStreams.Infof("Approving the CSR for cluster %q.\n", spokeCluster.Name)
if err := hubCluster.ApproveCSR(ctx, spokeCluster.Name); err != nil {
return errors.Wrap(err, "failed found CSR created by registration agent")
}
ready, err := hubCluster.WaitForSpokeClusterReady(ctx, clusterConfig.ClusterName)
if err != nil || !ready {
return errors.Errorf("fail to waiting for register request")
}
if err = hubCluster.RegisterSpokeCluster(ctx, spokeCluster.Name); err != nil {
return errors.Wrap(err, "fail to approve spoke cluster")
}
return nil
}
// LoadKubeClusterConfigFromFile create KubeClusterConfig from kubeconfig file
func LoadKubeClusterConfigFromFile(filepath string) (*KubeClusterConfig, error) {
clusterConfig := &KubeClusterConfig{}
var err error
clusterConfig.Config, err = clientcmd.LoadFromFile(filepath)
if err != nil {
return nil, errors.Wrapf(err, "failed to get kubeconfig")
}
if len(clusterConfig.Config.CurrentContext) == 0 {
return nil, fmt.Errorf("current-context is not set")
}
var ok bool
ctx, ok := clusterConfig.Config.Contexts[clusterConfig.Config.CurrentContext]
if !ok {
return nil, fmt.Errorf("current-context %s not found", clusterConfig.Config.CurrentContext)
}
clusterConfig.Cluster, ok = clusterConfig.Config.Clusters[ctx.Cluster]
if !ok {
return nil, fmt.Errorf("cluster %s not found", ctx.Cluster)
}
clusterConfig.AuthInfo, ok = clusterConfig.Config.AuthInfos[ctx.AuthInfo]
if !ok {
return nil, fmt.Errorf("authInfo %s not found", ctx.AuthInfo)
}
clusterConfig.ClusterName = ctx.Cluster
if endpoint, err := utils.ParseAPIServerEndpoint(clusterConfig.Cluster.Server); err == nil {
clusterConfig.Cluster.Server = endpoint
} else {
_, _ = fmt.Fprintf(&clusterConfig.Logs, "failed to parse server endpoint: %v", err)
}
return clusterConfig, nil
}
const (
// ClusterGateWayEngine cluster-gateway cluster management solution
ClusterGateWayEngine = "cluster-gateway"
// OCMEngine ocm cluster management solution
OCMEngine = "ocm"
)
// JoinClusterArgs args for join cluster
type JoinClusterArgs struct {
engine string
createNamespace string
ioStreams cmdutil.IOStreams
hubConfig *rest.Config
inClusterBootstrap *bool
trackingSpinnerFactory func(string) *spinner.Spinner
}
func newJoinClusterArgs(options ...JoinClusterOption) *JoinClusterArgs {
args := &JoinClusterArgs{
engine: ClusterGateWayEngine,
}
for _, op := range options {
op.ApplyToArgs(args)
}
return args
}
// JoinClusterOption option for join cluster
type JoinClusterOption interface {
ApplyToArgs(args *JoinClusterArgs)
}
// JoinClusterCreateNamespaceOption create namespace when join cluster, if empty, no creation
type JoinClusterCreateNamespaceOption string
// ApplyToArgs apply to args
func (op JoinClusterCreateNamespaceOption) ApplyToArgs(args *JoinClusterArgs) {
args.createNamespace = string(op)
}
// JoinClusterEngineOption configure engine for join cluster, either cluster-gateway or ocm
type JoinClusterEngineOption string
// ApplyToArgs apply to args
func (op JoinClusterEngineOption) ApplyToArgs(args *JoinClusterArgs) {
args.engine = string(op)
}
// JoinClusterOCMOptions options used when joining clusters by ocm, only support cli for now
type JoinClusterOCMOptions struct {
IoStreams cmdutil.IOStreams
HubConfig *rest.Config
InClusterBootstrap *bool
TrackingSpinnerFactory func(string) *spinner.Spinner
}
// ApplyToArgs apply to args
func (op JoinClusterOCMOptions) ApplyToArgs(args *JoinClusterArgs) {
args.ioStreams = op.IoStreams
args.hubConfig = op.HubConfig
args.inClusterBootstrap = op.InClusterBootstrap
args.trackingSpinnerFactory = op.TrackingSpinnerFactory
}
// JoinClusterByKubeConfig add child cluster by kubeconfig path, return cluster info and error
func JoinClusterByKubeConfig(ctx context.Context, cli client.Client, kubeconfigPath string, clusterName string, options ...JoinClusterOption) (*KubeClusterConfig, error) {
args := newJoinClusterArgs(options...)
clusterConfig, err := LoadKubeClusterConfigFromFile(kubeconfigPath)
if err != nil {
return nil, err
}
if err := clusterConfig.SetClusterName(clusterName).SetCreateNamespace(args.createNamespace).Validate(); err != nil {
return nil, err
}
switch args.engine {
case ClusterGateWayEngine:
if err = clusterConfig.RegisterByVelaSecret(ctx, cli); err != nil {
return nil, err
}
case OCMEngine:
if args.inClusterBootstrap == nil {
return nil, errors.Wrapf(err, "failed to determine the registration endpoint for the hub cluster "+
"when parsing --in-cluster-bootstrap flag")
}
if err = clusterConfig.RegisterClusterManagedByOCM(ctx, args); err != nil {
return clusterConfig, err
}
}
return clusterConfig, nil
}
// DetachClusterArgs args for detaching cluster
type DetachClusterArgs struct {
managedClusterKubeConfigPath string
}
func newDetachClusterArgs(options ...DetachClusterOption) *DetachClusterArgs {
args := &DetachClusterArgs{}
for _, op := range options {
op.ApplyToArgs(args)
}
return args
}
// DetachClusterOption option for detach cluster
type DetachClusterOption interface {
ApplyToArgs(args *DetachClusterArgs)
}
// DetachClusterManagedClusterKubeConfigPathOption configure the managed cluster kubeconfig path while detach ocm cluster
type DetachClusterManagedClusterKubeConfigPathOption string
// ApplyToArgs apply to args
func (op DetachClusterManagedClusterKubeConfigPathOption) ApplyToArgs(args *DetachClusterArgs) {
args.managedClusterKubeConfigPath = string(op)
}
// DetachCluster detach cluster by name, if cluster is using by application, it will return error
func DetachCluster(ctx context.Context, cli client.Client, clusterName string, options ...DetachClusterOption) error {
args := newDetachClusterArgs(options...)
if clusterName == ClusterLocalName {
return ErrReservedLocalClusterName
}
vc, err := GetVirtualCluster(ctx, cli, clusterName)
if err != nil {
return err
}
switch vc.Type {
case clusterv1alpha1.CredentialTypeX509Certificate, clusterv1alpha1.CredentialTypeServiceAccountToken:
clusterSecret, err := getMutableClusterSecret(ctx, cli, clusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", clusterName)
}
if err := cli.Delete(ctx, clusterSecret); err != nil {
return errors.Wrapf(err, "failed to detach cluster %s", clusterName)
}
case CredentialTypeOCMManagedCluster:
if args.managedClusterKubeConfigPath == "" {
return errors.New("kubeconfig-path must be set to detach ocm managed cluster")
}
config, err := clientcmd.LoadFromFile(args.managedClusterKubeConfigPath)
if err != nil {
return err
}
restConfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return config, nil
})
if err != nil {
return err
}
if err = spoke.CleanSpokeClusterEnv(restConfig); err != nil {
return err
}
managedCluster := ocmclusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: clusterName}}
if err = cli.Delete(context.Background(), &managedCluster); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
}
}
return nil
}
// RenameCluster rename cluster
func RenameCluster(ctx context.Context, k8sClient client.Client, oldClusterName string, newClusterName string) error {
if newClusterName == ClusterLocalName {
return ErrReservedLocalClusterName
}
clusterSecret, err := getMutableClusterSecret(ctx, k8sClient, oldClusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", oldClusterName)
}
if err := ensureClusterNotExists(ctx, k8sClient, newClusterName); err != nil {
return errors.Wrapf(err, "cannot set cluster name to %s", newClusterName)
}
if err := k8sClient.Delete(ctx, clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
clusterSecret.ObjectMeta = metav1.ObjectMeta{
Name: newClusterName,
Namespace: ClusterGatewaySecretNamespace,
Labels: clusterSecret.Labels,
Annotations: clusterSecret.Annotations,
}
if err := k8sClient.Create(ctx, clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
return nil
}
// ensureClusterNotExists will check the cluster is not existed in control plane
func ensureClusterNotExists(ctx context.Context, c client.Client, clusterName string) error {
secret := &v1.Secret{}
err := c.Get(ctx, types2.NamespacedName{Name: clusterName, Namespace: ClusterGatewaySecretNamespace}, secret)
if err == nil {
return ErrClusterExists
_, err := GetVirtualCluster(ctx, c, clusterName)
if err != nil {
if IsClusterNotExists(err) {
return nil
}
return err
}
if !errors2.IsNotFound(err) {
return errors.Wrapf(err, "failed to check duplicate cluster secret")
return ErrClusterExists
}
// ensureNamespaceExists ensures vela namespace to be installed in child cluster
func ensureNamespaceExists(ctx context.Context, c client.Client, clusterName string, createNamespace string) error {
remoteCtx := ContextWithClusterName(ctx, clusterName)
if err := c.Get(remoteCtx, apitypes.NamespacedName{Name: createNamespace}, &corev1.Namespace{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to check if namespace %s exists", createNamespace)
}
if err = c.Create(remoteCtx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: createNamespace}}); err != nil {
return errors.Wrapf(err, "failed to create namespace %s", createNamespace)
}
}
return nil
}
// GetMutableClusterSecret retrieves the cluster secret and check if any application is using the cluster
func GetMutableClusterSecret(ctx context.Context, c client.Client, clusterName string) (*v1.Secret, error) {
clusterSecret := &v1.Secret{}
if err := c.Get(ctx, types2.NamespacedName{Namespace: ClusterGatewaySecretNamespace, Name: clusterName}, clusterSecret); err != nil {
// getMutableClusterSecret retrieves the cluster secret and check if any application is using the cluster
// TODO(somefive): should rework the logic of checking application cluster usage
func getMutableClusterSecret(ctx context.Context, c client.Client, clusterName string) (*corev1.Secret, error) {
clusterSecret := &corev1.Secret{}
if err := c.Get(ctx, apitypes.NamespacedName{Namespace: ClusterGatewaySecretNamespace, Name: clusterName}, clusterSecret); err != nil {
return nil, errors.Wrapf(err, "failed to find target cluster secret %s", clusterName)
}
labels := clusterSecret.GetLabels()
if labels == nil || labels[v1alpha12.LabelKeyClusterCredentialType] == "" {
return nil, fmt.Errorf("invalid cluster secret %s: cluster credential type label %s is not set", clusterName, v1alpha12.LabelKeyClusterCredentialType)
if labels == nil || labels[clusterv1alpha1.LabelKeyClusterCredentialType] == "" {
return nil, fmt.Errorf("invalid cluster secret %s: cluster credential type label %s is not set", clusterName, clusterv1alpha1.LabelKeyClusterCredentialType)
}
apps := &v1beta1.ApplicationList{}
if err := c.List(ctx, apps); err != nil {
return nil, errors.Wrap(err, "failed to find applications to check clusters")
}
errs := errors3.ErrorList{}
errs := velaerrors.ErrorList{}
for _, app := range apps.Items {
status, err := envbinding.GetEnvBindingPolicyStatus(app.DeepCopy(), "")
if err == nil && status != nil {
@@ -97,167 +507,3 @@ func GetMutableClusterSecret(ctx context.Context, c client.Client, clusterName s
}
return clusterSecret, nil
}
// JoinClusterByKubeConfig add child cluster by kubeconfig path, return cluster info and error
func JoinClusterByKubeConfig(_ctx context.Context, k8sClient client.Client, kubeconfigPath string, clusterName string) (*api.Cluster, error) {
config, err := clientcmd.LoadFromFile(kubeconfigPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to get kubeconfig")
}
if len(config.CurrentContext) == 0 {
return nil, fmt.Errorf("current-context is not set")
}
ctx, ok := config.Contexts[config.CurrentContext]
if !ok {
return nil, fmt.Errorf("current-context %s not found", config.CurrentContext)
}
cluster, ok := config.Clusters[ctx.Cluster]
if !ok {
return nil, fmt.Errorf("cluster %s not found", ctx.Cluster)
}
authInfo, ok := config.AuthInfos[ctx.AuthInfo]
if !ok {
return nil, fmt.Errorf("authInfo %s not found", ctx.AuthInfo)
}
if clusterName == "" {
clusterName = ctx.Cluster
}
if clusterName == ClusterLocalName {
return cluster, fmt.Errorf("cannot use `%s` as cluster name, it is reserved as the local cluster", ClusterLocalName)
}
if err := ensureClusterNotExists(_ctx, k8sClient, clusterName); err != nil {
return cluster, errors.Wrapf(err, "cannot use cluster name %s", clusterName)
}
var credentialType v1alpha12.CredentialType
data := map[string][]byte{
"endpoint": []byte(cluster.Server),
"ca.crt": cluster.CertificateAuthorityData,
}
if len(authInfo.Token) > 0 {
credentialType = v1alpha12.CredentialTypeServiceAccountToken
data["token"] = []byte(authInfo.Token)
} else {
credentialType = v1alpha12.CredentialTypeX509Certificate
data["tls.crt"] = authInfo.ClientCertificateData
data["tls.key"] = authInfo.ClientKeyData
}
secret := &v1.Secret{
ObjectMeta: v12.ObjectMeta{
Name: clusterName,
Namespace: ClusterGatewaySecretNamespace,
Labels: map[string]string{
v1alpha12.LabelKeyClusterCredentialType: string(credentialType),
},
},
Type: v1.SecretTypeOpaque,
Data: data,
}
if err := k8sClient.Create(_ctx, secret); err != nil {
return cluster, errors.Wrapf(err, "failed to add cluster to kubernetes")
}
if err := ensureVelaSystemNamespaceInstalled(_ctx, k8sClient, clusterName, types.DefaultKubeVelaNS); err != nil {
return nil, errors.Wrapf(err, "failed to create vela namespace in cluster %s", clusterName)
}
return cluster, nil
}
// DetachCluster detach cluster by name, if cluster is using by application, it will return error
func DetachCluster(ctx context.Context, k8sClient client.Client, clusterName string) error {
if clusterName == ClusterLocalName {
return ErrReservedLocalClusterName
}
clusterSecret, err := GetMutableClusterSecret(ctx, k8sClient, clusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", clusterName)
}
return k8sClient.Delete(ctx, clusterSecret)
}
// RenameCluster rename cluster
func RenameCluster(ctx context.Context, k8sClient client.Client, oldClusterName string, newClusterName string) error {
if newClusterName == ClusterLocalName {
return ErrReservedLocalClusterName
}
clusterSecret, err := GetMutableClusterSecret(ctx, k8sClient, oldClusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", oldClusterName)
}
if err := ensureClusterNotExists(ctx, k8sClient, newClusterName); err != nil {
return errors.Wrapf(err, "cannot set cluster name to %s", newClusterName)
}
if err := k8sClient.Delete(ctx, clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
clusterSecret.ObjectMeta = v12.ObjectMeta{
Name: newClusterName,
Namespace: ClusterGatewaySecretNamespace,
Labels: clusterSecret.Labels,
Annotations: clusterSecret.Annotations,
}
if err := k8sClient.Create(ctx, clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
return nil
}
// ClusterInfo describes the basic information of a cluster
type ClusterInfo struct {
Nodes *v1.NodeList
WorkerNumber int
MasterNumber int
MemoryCapacity resource.Quantity
CPUCapacity resource.Quantity
PodCapacity resource.Quantity
MemoryAllocatable resource.Quantity
CPUAllocatable resource.Quantity
PodAllocatable resource.Quantity
StorageClasses *v14.StorageClassList
}
// GetClusterInfo retrieves current cluster info from cluster
func GetClusterInfo(_ctx context.Context, k8sClient client.Client, clusterName string) (*ClusterInfo, error) {
ctx := ContextWithClusterName(_ctx, clusterName)
nodes := &v1.NodeList{}
if err := k8sClient.List(ctx, nodes); err != nil {
return nil, errors.Wrapf(err, "failed to list cluster nodes")
}
var workerNumber, masterNumber int
var memoryCapacity, cpuCapacity, podCapacity, memoryAllocatable, cpuAllocatable, podAllcatable resource.Quantity
for _, node := range nodes.Items {
if _, ok := node.Labels["node-role.kubernetes.io/master"]; ok {
masterNumber++
} else {
workerNumber++
}
capacity := node.Status.Capacity
memoryCapacity.Add(*capacity.Memory())
cpuCapacity.Add(*capacity.Cpu())
podCapacity.Add(*capacity.Pods())
allocatable := node.Status.Allocatable
memoryAllocatable.Add(*allocatable.Memory())
cpuAllocatable.Add(*allocatable.Cpu())
podAllcatable.Add(*allocatable.Pods())
}
storageClasses := &v14.StorageClassList{}
if err := k8sClient.List(ctx, storageClasses); err != nil {
return nil, errors.Wrapf(err, "failed to list storage classes")
}
return &ClusterInfo{
Nodes: nodes,
WorkerNumber: workerNumber,
MasterNumber: masterNumber,
MemoryCapacity: memoryCapacity,
CPUCapacity: cpuCapacity,
PodCapacity: podCapacity,
MemoryAllocatable: memoryAllocatable,
CPUAllocatable: cpuAllocatable,
PodAllocatable: podAllcatable,
StorageClasses: storageClasses,
}, nil
}

View File

@@ -26,6 +26,8 @@ import (
var (
// ErrClusterExists cluster already exists
ErrClusterExists = ClusterManagementError(fmt.Errorf("cluster already exists"))
// ErrClusterNotExists cluster not exists
ErrClusterNotExists = ClusterManagementError(fmt.Errorf("no such cluster"))
// ErrReservedLocalClusterName reserved cluster name is used
ErrReservedLocalClusterName = ClusterManagementError(fmt.Errorf("cluster name `local` is reserved for kubevela hub cluster"))
)

83
pkg/multicluster/o11n.go Normal file
View File

@@ -0,0 +1,83 @@
/*
Copyright 2020-2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"context"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// ClusterInfo describes the basic information of a cluster
type ClusterInfo struct {
Nodes *corev1.NodeList
WorkerNumber int
MasterNumber int
MemoryCapacity resource.Quantity
CPUCapacity resource.Quantity
PodCapacity resource.Quantity
MemoryAllocatable resource.Quantity
CPUAllocatable resource.Quantity
PodAllocatable resource.Quantity
StorageClasses *storagev1.StorageClassList
}
// GetClusterInfo retrieves current cluster info from cluster
func GetClusterInfo(_ctx context.Context, k8sClient client.Client, clusterName string) (*ClusterInfo, error) {
ctx := ContextWithClusterName(_ctx, clusterName)
nodes := &corev1.NodeList{}
if err := k8sClient.List(ctx, nodes); err != nil {
return nil, errors.Wrapf(err, "failed to list cluster nodes")
}
var workerNumber, masterNumber int
var memoryCapacity, cpuCapacity, podCapacity, memoryAllocatable, cpuAllocatable, podAllocatable resource.Quantity
for _, node := range nodes.Items {
if _, ok := node.Labels["node-role.kubernetes.io/master"]; ok {
masterNumber++
} else {
workerNumber++
}
capacity := node.Status.Capacity
memoryCapacity.Add(*capacity.Memory())
cpuCapacity.Add(*capacity.Cpu())
podCapacity.Add(*capacity.Pods())
allocatable := node.Status.Allocatable
memoryAllocatable.Add(*allocatable.Memory())
cpuAllocatable.Add(*allocatable.Cpu())
podAllocatable.Add(*allocatable.Pods())
}
storageClasses := &storagev1.StorageClassList{}
if err := k8sClient.List(ctx, storageClasses); err != nil {
return nil, errors.Wrapf(err, "failed to list storage classes")
}
return &ClusterInfo{
Nodes: nodes,
WorkerNumber: workerNumber,
MasterNumber: masterNumber,
MemoryCapacity: memoryCapacity,
CPUCapacity: cpuCapacity,
PodCapacity: podCapacity,
MemoryAllocatable: memoryAllocatable,
CPUAllocatable: cpuAllocatable,
PodAllocatable: podAllocatable,
StorageClasses: storageClasses,
}, nil
}

View File

@@ -0,0 +1,74 @@
/*
Copyright 2020-2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"math/rand"
"testing"
"time"
"github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"github.com/oam-dev/kubevela/pkg/utils/common"
)
var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
func TestUtils(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Utils Suite")
}
var _ = BeforeSuite(func(done Done) {
rand.Seed(time.Now().UnixNano())
By("bootstrapping test environment for utils test")
testEnv = &envtest.Environment{
ControlPlaneStartTimeout: time.Minute * 3,
ControlPlaneStopTimeout: time.Minute,
UseExistingCluster: pointer.BoolPtr(false),
CRDDirectoryPaths: []string{"./testdata"},
}
By("start kube test env")
var err error
cfg, err = testEnv.Start()
Expect(err).ShouldNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())
By("new kube client")
cfg.Timeout = time.Minute * 2
Expect(v1alpha1.AddToScheme(common.Scheme)).Should(Succeed())
k8sClient, err = client.New(cfg, client.Options{Scheme: common.Scheme})
Expect(err).Should(BeNil())
Expect(k8sClient).ToNot(BeNil())
close(done)
}, 240)
var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).ToNot(HaveOccurred())
})

View File

@@ -0,0 +1,204 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: managedclusters.cluster.open-cluster-management.io
spec:
group: cluster.open-cluster-management.io
names:
kind: ManagedCluster
listKind: ManagedClusterList
plural: managedclusters
shortNames:
- mcl
- mcls
singular: managedcluster
scope: Cluster
preserveUnknownFields: false
versions:
- additionalPrinterColumns:
- jsonPath: .spec.hubAcceptsClient
name: Hub Accepted
type: boolean
- jsonPath: .spec.managedClusterClientConfigs[*].url
name: Managed Cluster URLs
type: string
- jsonPath: .status.conditions[?(@.type=="ManagedClusterJoined")].status
name: Joined
type: string
- jsonPath: .status.conditions[?(@.type=="ManagedClusterConditionAvailable")].status
name: Available
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1
schema:
openAPIV3Schema:
description: "ManagedCluster represents the desired state and current status of managed cluster. ManagedCluster is a cluster scoped resource. The name is the cluster UID. \n The cluster join process follows a double opt-in process: \n 1. Agent on managed cluster creates CSR on hub with cluster UID and agent name. 2. Agent on managed cluster creates ManagedCluster on hub. 3. Cluster admin on hub approves the CSR for UID and agent name of the ManagedCluster. 4. Cluster admin sets spec.acceptClient of ManagedCluster to true. 5. Cluster admin on managed cluster creates credential of kubeconfig to hub. \n Once the hub creates the cluster namespace, the Klusterlet agent on the ManagedCluster pushes the credential to the hub to use against the kube-apiserver of the ManagedCluster."
type: object
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: Spec represents a desired configuration for the agent on the managed cluster.
type: object
properties:
hubAcceptsClient:
description: hubAcceptsClient represents that hub accepts the joining of Klusterlet agent on the managed cluster with the hub. The default value is false, and can only be set true when the user on hub has an RBAC rule to UPDATE on the virtual subresource of managedclusters/accept. When the value is set true, a namespace whose name is the same as the name of ManagedCluster is created on the hub. This namespace represents the managed cluster, also role/rolebinding is created on the namespace to grant the permision of access from the agent on the managed cluster. When the value is set to false, the namespace representing the managed cluster is deleted.
type: boolean
leaseDurationSeconds:
description: LeaseDurationSeconds is used to coordinate the lease update time of Klusterlet agents on the managed cluster. If its value is zero, the Klusterlet agent will update its lease every 60 seconds by default
type: integer
format: int32
default: 60
managedClusterClientConfigs:
description: ManagedClusterClientConfigs represents a list of the apiserver address of the managed cluster. If it is empty, the managed cluster has no accessible address for the hub to connect with it.
type: array
items:
description: ClientConfig represents the apiserver address of the managed cluster. TODO include credential to connect to managed cluster kube-apiserver
type: object
properties:
caBundle:
description: CABundle is the ca bundle to connect to apiserver of the managed cluster. System certs are used if it is not set.
type: string
format: byte
url:
description: URL is the URL of apiserver endpoint of the managed cluster.
type: string
taints:
description: Taints is a property of managed cluster that allow the cluster to be repelled when scheduling. Taints, including 'ManagedClusterUnavailable' and 'ManagedClusterUnreachable', can not be added/removed by agent running on the managed cluster; while it's fine to add/remove other taints from either hub cluser or managed cluster.
type: array
items:
description: The managed cluster this Taint is attached to has the "effect" on any placement that does not tolerate the Taint.
type: object
required:
- effect
- key
properties:
effect:
description: Effect indicates the effect of the taint on placements that do not tolerate the taint. Valid effects are NoSelect, PreferNoSelect and NoSelectIfNew.
type: string
enum:
- NoSelect
- PreferNoSelect
- NoSelectIfNew
key:
description: Key is the taint key applied to a cluster. e.g. bar or foo.example.com/bar. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
type: string
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
timeAdded:
description: TimeAdded represents the time at which the taint was added.
type: string
format: date-time
nullable: true
value:
description: Value is the taint value corresponding to the taint key.
type: string
maxLength: 1024
status:
description: Status represents the current status of joined managed cluster
type: object
properties:
allocatable:
description: Allocatable represents the total allocatable resources on the managed cluster.
type: object
additionalProperties:
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
anyOf:
- type: integer
- type: string
x-kubernetes-int-or-string: true
capacity:
description: Capacity represents the total resource capacity from all nodeStatuses on the managed cluster.
type: object
additionalProperties:
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
anyOf:
- type: integer
- type: string
x-kubernetes-int-or-string: true
clusterClaims:
description: ClusterClaims represents cluster information that a managed cluster claims, for example a unique cluster identifier (id.k8s.io) and kubernetes version (kubeversion.open-cluster-management.io). They are written from the managed cluster. The set of claims is not uniform across a fleet, some claims can be vendor or version specific and may not be included from all managed clusters.
type: array
items:
description: ManagedClusterClaim represents a ClusterClaim collected from a managed cluster.
type: object
properties:
name:
description: Name is the name of a ClusterClaim resource on managed cluster. It's a well known or customized name to identify the claim.
type: string
maxLength: 253
minLength: 1
value:
description: Value is a claim-dependent string
type: string
maxLength: 1024
minLength: 1
conditions:
description: Conditions contains the different condition statuses for this managed cluster.
type: array
items:
description: "Condition contains details for one aspect of the current state of this API Resource. --- This struct is intended for direct use as an array at the field path .status.conditions. For example, type FooStatus struct{ // Represents the observations of a foo's current state. // Known .status.conditions.type are: \"Available\", \"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge // +listType=map // +listMapKey=type Conditions []metav1.Condition `json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }"
type: object
required:
- lastTransitionTime
- message
- reason
- status
- type
properties:
lastTransitionTime:
description: lastTransitionTime is the last time the condition transitioned from one status to another. This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable.
type: string
format: date-time
message:
description: message is a human readable message indicating details about the transition. This may be an empty string.
type: string
maxLength: 32768
observedGeneration:
description: observedGeneration represents the .metadata.generation that the condition was set based upon. For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date with respect to the current state of the instance.
type: integer
format: int64
minimum: 0
reason:
description: reason contains a programmatic identifier indicating the reason for the condition's last transition. Producers of specific condition types may define expected values and meanings for this field, and whether the values are considered a guaranteed API. The value should be a CamelCase string. This field may not be empty.
type: string
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
status:
description: status of the condition, one of True, False, Unknown.
type: string
enum:
- "True"
- "False"
- Unknown
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase. --- Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be useful (see .node.status.conditions), the ability to deconflict is important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
type: string
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
version:
description: Version represents the kubernetes version of the managed cluster.
type: object
properties:
kubernetes:
description: Kubernetes is the kubernetes version of managed cluster.
type: string
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []

View File

@@ -0,0 +1,186 @@
/*
Copyright 2020-2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"context"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apilabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
apitypes "k8s.io/apimachinery/pkg/types"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
)
const (
// CredentialTypeOCMManagedCluster identifies the virtual cluster from ocm
CredentialTypeOCMManagedCluster v1alpha1.CredentialType = "ManagedCluster"
)
// VirtualCluster contains base info of cluster, it unifies the difference between different cluster implementations
// like cluster secret or ocm managed cluster
type VirtualCluster struct {
Name string
Type v1alpha1.CredentialType
EndPoint string
Accepted bool
Labels map[string]string
}
// NewVirtualClusterFromSecret extract virtual cluster from cluster secret
func NewVirtualClusterFromSecret(secret *corev1.Secret) (*VirtualCluster, error) {
endpoint := string(secret.Data["endpoint"])
labels := secret.GetLabels()
if labels == nil {
labels = map[string]string{}
}
if _endpoint, ok := labels[v1alpha1.LabelKeyClusterEndpointType]; ok {
endpoint = _endpoint
}
credType, ok := labels[v1alpha1.LabelKeyClusterCredentialType]
if !ok {
return nil, errors.Errorf("secret is not a valid cluster secret, no credential type found")
}
return &VirtualCluster{
Name: secret.Name,
Type: v1alpha1.CredentialType(credType),
EndPoint: endpoint,
Accepted: true,
Labels: labels,
}, nil
}
// NewVirtualClusterFromManagedCluster extract virtual cluster from ocm managed cluster
func NewVirtualClusterFromManagedCluster(managedCluster *clusterv1.ManagedCluster) (*VirtualCluster, error) {
if len(managedCluster.Spec.ManagedClusterClientConfigs) == 0 {
return nil, errors.Errorf("managed cluster has no client config")
}
return &VirtualCluster{
Name: managedCluster.Name,
Type: CredentialTypeOCMManagedCluster,
EndPoint: "-",
Accepted: managedCluster.Spec.HubAcceptsClient,
Labels: managedCluster.GetLabels(),
}, nil
}
// GetVirtualCluster returns virtual cluster with given clusterName
func GetVirtualCluster(ctx context.Context, c client.Client, clusterName string) (vc *VirtualCluster, err error) {
secret := &corev1.Secret{}
err = c.Get(ctx, apitypes.NamespacedName{
Name: clusterName,
Namespace: ClusterGatewaySecretNamespace,
}, secret)
var secretErr error
if err == nil {
vc, secretErr = NewVirtualClusterFromSecret(secret)
if secretErr == nil {
return vc, nil
}
}
if err != nil && !apierrors.IsNotFound(err) {
secretErr = err
}
managedCluster := &clusterv1.ManagedCluster{}
err = c.Get(ctx, apitypes.NamespacedName{
Name: clusterName,
Namespace: ClusterGatewaySecretNamespace,
}, managedCluster)
var managedClusterErr error
if err == nil {
vc, managedClusterErr = NewVirtualClusterFromManagedCluster(managedCluster)
if managedClusterErr == nil {
return vc, nil
}
}
if err != nil && !apierrors.IsNotFound(err) && !velaerrors.IsCRDNotExists(err) {
managedClusterErr = err
}
if secretErr == nil && managedClusterErr == nil {
return nil, ErrClusterNotExists
}
var errs velaerrors.ErrorList
if secretErr != nil {
errs = append(errs, secretErr)
}
if managedClusterErr != nil {
errs = append(errs, managedClusterErr)
}
return nil, errs
}
// MatchVirtualClusterLabels filters the list/delete operation of cluster list
type MatchVirtualClusterLabels map[string]string
// ApplyToList applies this configuration to the given list options.
func (m MatchVirtualClusterLabels) ApplyToList(opts *client.ListOptions) {
sel := apilabels.SelectorFromValidatedSet(map[string]string(m))
r, err := apilabels.NewRequirement(v1alpha1.LabelKeyClusterCredentialType, selection.Exists, nil)
if err == nil {
sel = sel.Add(*r)
}
opts.LabelSelector = sel
opts.Namespace = ClusterGatewaySecretNamespace
}
// ApplyToDeleteAllOf applies this configuration to the given a List options.
func (m MatchVirtualClusterLabels) ApplyToDeleteAllOf(opts *client.DeleteAllOfOptions) {
m.ApplyToList(&opts.ListOptions)
}
// ListVirtualClusters will get all registered clusters in control plane
func ListVirtualClusters(ctx context.Context, c client.Client) ([]VirtualCluster, error) {
return FindVirtualClustersByLabels(ctx, c, map[string]string{})
}
// FindVirtualClustersByLabels will get all virtual clusters with matched labels in control plane
func FindVirtualClustersByLabels(ctx context.Context, c client.Client, labels map[string]string) ([]VirtualCluster, error) {
var clusters []VirtualCluster
secrets := corev1.SecretList{}
if err := c.List(ctx, &secrets, MatchVirtualClusterLabels(labels)); err != nil {
return nil, errors.Wrapf(err, "failed to get clusterSecret secrets")
}
for _, secret := range secrets.Items {
vc, err := NewVirtualClusterFromSecret(secret.DeepCopy())
if err == nil {
clusters = append(clusters, *vc)
}
}
managedClusters := clusterv1.ManagedClusterList{}
if err := c.List(context.Background(), &managedClusters, client.MatchingLabels(labels)); err != nil && !velaerrors.IsCRDNotExists(err) {
return nil, errors.Wrapf(err, "failed to get managed clusters")
}
for _, managedCluster := range managedClusters.Items {
vc, err := NewVirtualClusterFromManagedCluster(managedCluster.DeepCopy())
if err == nil {
clusters = append(clusters, *vc)
}
}
return clusters, nil
}

View File

@@ -0,0 +1,118 @@
/*
Copyright 2020-2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"context"
"github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
)
var _ = Describe("Test Virtual Cluster", func() {
It("Test Virtual Cluster", func() {
ClusterGatewaySecretNamespace = "vela-system"
ctx := context.Background()
Expect(k8sClient.Create(ctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ClusterGatewaySecretNamespace}})).Should(Succeed())
By("Initialize Secrets")
Expect(k8sClient.Create(ctx, &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Namespace: ClusterGatewaySecretNamespace,
Labels: map[string]string{
v1alpha1.LabelKeyClusterCredentialType: string(v1alpha1.CredentialTypeX509Certificate),
v1alpha1.LabelKeyClusterEndpointType: v1alpha1.ClusterEndpointTypeConst,
"key": "value",
},
},
})).Should(Succeed())
Expect(k8sClient.Create(ctx, &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-no-label",
Namespace: ClusterGatewaySecretNamespace,
Labels: map[string]string{
v1alpha1.LabelKeyClusterCredentialType: string(v1alpha1.CredentialTypeX509Certificate),
},
},
})).Should(Succeed())
Expect(k8sClient.Create(ctx, &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-invalid",
Namespace: ClusterGatewaySecretNamespace,
},
})).Should(Succeed())
By("Test Get Virtual Cluster From Cluster Secret")
vc, err := GetVirtualCluster(ctx, k8sClient, "test-cluster")
Expect(err).Should(Succeed())
Expect(vc.Type).Should(Equal(v1alpha1.CredentialTypeX509Certificate))
Expect(vc.Labels["key"]).Should(Equal("value"))
_, err = GetVirtualCluster(ctx, k8sClient, "cluster-not-found")
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("no such cluster"))
_, err = GetVirtualCluster(ctx, k8sClient, "cluster-invalid")
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("not a valid cluster"))
By("Add OCM ManagedCluster")
Expect(k8sClient.Create(ctx, &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "ocm-bad-cluster",
Namespace: ClusterGatewaySecretNamespace,
},
})).Should(Succeed())
Expect(k8sClient.Create(ctx, &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "ocm-cluster",
Namespace: ClusterGatewaySecretNamespace,
Labels: map[string]string{"key": "value"},
},
Spec: clusterv1.ManagedClusterSpec{
ManagedClusterClientConfigs: []clusterv1.ClientConfig{{URL: "test-url"}},
},
})).Should(Succeed())
By("Test Get Virtual Cluster From OCM")
_, err = GetVirtualCluster(ctx, k8sClient, "ocm-bad-cluster")
Expect(err).ShouldNot(Succeed())
Expect(err.Error()).Should(ContainSubstring("has no client config"))
vc, err = GetVirtualCluster(ctx, k8sClient, "ocm-cluster")
Expect(err).Should(Succeed())
Expect(vc.Type).Should(Equal(CredentialTypeOCMManagedCluster))
By("Test List Virtual Clusters")
vcs, err := ListVirtualClusters(ctx, k8sClient)
Expect(err).Should(Succeed())
Expect(len(vcs)).Should(Equal(3))
vcs, err = FindVirtualClustersByLabels(ctx, k8sClient, map[string]string{"key": "value"})
Expect(err).Should(Succeed())
Expect(len(vcs)).Should(Equal(2))
})
})

28
pkg/utils/errors/crd.go Normal file
View File

@@ -0,0 +1,28 @@
/*
Copyright 2020-2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import (
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
)
// IsCRDNotExists check if error is crd not exists
func IsCRDNotExists(err error) bool {
var noKindMatchErr *meta.NoKindMatchError
return errors.As(err, &noKindMatchErr)
}

View File

@@ -28,7 +28,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"github.com/oam-dev/kubevela/pkg/apiserver/clients"
"github.com/oam-dev/kubevela/pkg/utils/common"
)
@@ -62,9 +61,6 @@ var _ = BeforeSuite(func(done Done) {
k8sClient, err = client.New(cfg, client.Options{Scheme: common.Scheme})
Expect(err).Should(BeNil())
Expect(k8sClient).ToNot(BeNil())
By("new kube client success")
clients.SetKubeClient(k8sClient)
Expect(err).Should(BeNil())
close(done)
}, 240)

View File

@@ -24,7 +24,6 @@ import (
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/clustermanager"
"github.com/oam-dev/kubevela/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/policy/envbinding"
@@ -103,7 +102,7 @@ func (p *provider) MakePlacementDecisions(ctx wfContext.Context, v *value.Value,
}
// check if target cluster exists
if clusterName != multicluster.ClusterLocalName {
if err = clustermanager.EnsureClusterExists(p, clusterName); err != nil {
if _, err := multicluster.GetVirtualCluster(context.Background(), p.Client, clusterName); err != nil {
return errors.Wrapf(err, "failed to get cluster %s for env %s", clusterName, env)
}
}

View File

@@ -273,6 +273,7 @@ func TestMakePlacementDecisions(t *testing.T) {
ObjectMeta: v12.ObjectMeta{
Namespace: multicluster.ClusterGatewaySecretNamespace,
Name: testCase.PreAddCluster,
Labels: map[string]string{v1alpha12.LabelKeyClusterCredentialType: string(v1alpha12.CredentialTypeX509Certificate)},
},
}))
}

View File

@@ -19,29 +19,17 @@ package cli
import (
"context"
"fmt"
"strings"
"github.com/fatih/color"
"github.com/oam-dev/cluster-gateway/pkg/config"
"github.com/oam-dev/cluster-gateway/pkg/generated/clientset/versioned"
"github.com/pkg/errors"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ocmclusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1alpha1 "github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
"github.com/oam-dev/cluster-gateway/pkg/generated/clientset/versioned"
"github.com/oam-dev/cluster-register/pkg/hub"
"github.com/oam-dev/cluster-register/pkg/spoke"
"k8s.io/utils/pointer"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/clustermanager"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/utils"
"github.com/oam-dev/kubevela/pkg/utils/common"
cmdutil "github.com/oam-dev/kubevela/pkg/utils/util"
)
@@ -58,10 +46,6 @@ const (
// hub kubeconfig will be used for registration.
FlagInClusterBootstrap = "in-cluster-boostrap"
// ClusterGateWayClusterManagement cluster-gateway cluster management solution
ClusterGateWayClusterManagement = "cluster-gateway"
// OCMClusterManagement ocm cluster management solution
OCMClusterManagement = "ocm"
// CreateNamespace specifies the namespace need to create in managedCluster
CreateNamespace = "create-namespace"
)
@@ -113,20 +97,35 @@ func NewClusterListCommand(c *common.Args) *cobra.Command {
Long: "list worker clusters managed by KubeVela.",
Args: cobra.ExactValidArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
table := newUITable().AddRow("CLUSTER", "TYPE", "ENDPOINT")
table := newUITable().AddRow("CLUSTER", "TYPE", "ENDPOINT", "ACCEPTED", "LABELS")
client, err := c.GetClient()
if err != nil {
return err
}
clusters, err := clustermanager.GetRegisteredClusters(client)
clusters, err := multicluster.ListVirtualClusters(context.Background(), client)
if err != nil {
return errors.Wrap(err, "fail to get registered cluster")
}
for _, cluster := range clusters {
table.AddRow(cluster.Name, cluster.Type, cluster.EndPoint)
var labels []string
for k, v := range cluster.Labels {
if !strings.HasPrefix(k, config.MetaApiGroupName) {
labels = append(labels, color.CyanString(k)+"="+color.GreenString(v))
}
}
if len(labels) == 0 {
labels = append(labels, "")
}
for i, l := range labels {
if i == 0 {
table.AddRow(cluster.Name, cluster.Type, cluster.EndPoint, fmt.Sprintf("%v", cluster.Accepted), l)
} else {
table.AddRow("", "", "", "", l)
}
}
}
if len(table.Rows) == 1 {
cmd.Println("No managed cluster found.")
cmd.Println("No cluster found.")
} else {
cmd.Println(table.String())
}
@@ -136,20 +135,6 @@ func NewClusterListCommand(c *common.Args) *cobra.Command {
return cmd
}
func ensureVelaSystemNamespaceInstalled(c client.Client, clusterName string, createNamespace string) error {
ctx := context.Background()
remoteCtx := multicluster.ContextWithClusterName(ctx, clusterName)
if err := c.Get(remoteCtx, k8stypes.NamespacedName{Name: createNamespace}, &corev1.Namespace{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to check vela-system ")
}
if err = c.Create(remoteCtx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: createNamespace}}); err != nil {
return errors.Wrapf(err, "failed to create vela-system namespace")
}
}
return nil
}
// NewClusterJoinCommand create command to help user join cluster to multicluster management
func NewClusterJoinCommand(c *common.Args, ioStreams cmdutil.IOStreams) *cobra.Command {
cmd := &cobra.Command{
@@ -160,55 +145,20 @@ func NewClusterJoinCommand(c *common.Args, ioStreams cmdutil.IOStreams) *cobra.C
"> vela cluster join my-child-cluster.kubeconfig --name example-cluster",
Args: cobra.ExactValidArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
config, err := clientcmd.LoadFromFile(args[0])
if err != nil {
return errors.Wrapf(err, "failed to get kubeconfig")
}
if len(config.CurrentContext) == 0 {
return fmt.Errorf("current-context is not set")
}
ctx, ok := config.Contexts[config.CurrentContext]
if !ok {
return fmt.Errorf("current-context %s not found", config.CurrentContext)
}
cluster, ok := config.Clusters[ctx.Cluster]
if !ok {
return fmt.Errorf("cluster %s not found", ctx.Cluster)
}
authInfo, ok := config.AuthInfos[ctx.AuthInfo]
if !ok {
return fmt.Errorf("authInfo %s not found", ctx.AuthInfo)
}
// get ClusterName from flag or config
clusterName, err := cmd.Flags().GetString(FlagClusterName)
if err != nil {
return errors.Wrapf(err, "failed to get cluster name flag")
}
if clusterName == "" {
clusterName = ctx.Cluster
}
if clusterName == multicluster.ClusterLocalName {
return fmt.Errorf("cannot use `%s` as cluster name, it is reserved as the local cluster", multicluster.ClusterLocalName)
}
clusterManagementType, err := cmd.Flags().GetString(FlagClusterManagementEngine)
if err != nil {
return errors.Wrapf(err, "failed to get cluster management type flag")
}
if clusterManagementType == "" {
clusterManagementType = ClusterGateWayClusterManagement
}
// get need created namespace in managed cluster
createNamespace, err := cmd.Flags().GetString(CreateNamespace)
if err != nil {
return errors.Wrapf(err, "failed to get create namespace")
}
if createNamespace == "" {
createNamespace = types.DefaultKubeVelaNS
}
client, err := c.GetClient()
if err != nil {
return err
@@ -217,175 +167,38 @@ func NewClusterJoinCommand(c *common.Args, ioStreams cmdutil.IOStreams) *cobra.C
if err != nil {
return err
}
switch clusterManagementType {
case ClusterGateWayClusterManagement:
if endpoint, err := utils.ParseAPIServerEndpoint(cluster.Server); err == nil {
cluster.Server = endpoint
} else {
ioStreams.Infof("failed to parse server endpoint: %v", err)
}
if err = registerClusterManagedByVela(client, cluster, authInfo, clusterName, createNamespace); err != nil {
return err
}
case OCMClusterManagement:
inClusterBootstrap, err := cmd.Flags().GetBool(FlagInClusterBootstrap)
if err != nil {
return errors.Wrapf(err, "failed to determine the registration endpoint for the hub cluster "+
"when parsing --in-cluster-bootstrap flag")
}
if err = registerClusterManagedByOCM(ioStreams, restConfig, config, clusterName, inClusterBootstrap); err != nil {
return err
}
var inClusterBootstrap *bool
if _inClusterBootstrap, err := cmd.Flags().GetBool(FlagInClusterBootstrap); err == nil {
inClusterBootstrap = pointer.Bool(_inClusterBootstrap)
}
cmd.Printf("Successfully add cluster %s, endpoint: %s.\n", clusterName, cluster.Server)
managedClusterKubeConfig := args[0]
clusterConfig, err := multicluster.JoinClusterByKubeConfig(context.Background(), client, managedClusterKubeConfig, clusterName,
multicluster.JoinClusterCreateNamespaceOption(createNamespace),
multicluster.JoinClusterEngineOption(clusterManagementType),
multicluster.JoinClusterOCMOptions{
InClusterBootstrap: inClusterBootstrap,
IoStreams: ioStreams,
HubConfig: restConfig,
TrackingSpinnerFactory: newTrackingSpinner,
})
if err != nil {
return err
}
cmd.Printf("Successfully add cluster %s, endpoint: %s.\n", clusterName, clusterConfig.Cluster.Server)
return nil
},
}
cmd.Flags().StringP(FlagClusterName, "n", "", "Specify the cluster name. If empty, it will use the cluster name in config file. Default to be empty.")
cmd.Flags().StringP(FlagClusterManagementEngine, "t", "", "Specify the cluster management engine. If empty, it will use cluster-gateway cluster management solution. Default to be empty.")
cmd.Flags().StringP(CreateNamespace, "", "", "Specifies the namespace need to create in managedCluster")
cmd.Flags().StringP(FlagClusterManagementEngine, "t", multicluster.ClusterGateWayEngine, "Specify the cluster management engine. If empty, it will use cluster-gateway cluster management solution. Default to be empty.")
cmd.Flags().StringP(CreateNamespace, "", types.DefaultKubeVelaNS, "Specifies the namespace need to create in managedCluster")
cmd.Flags().BoolP(FlagInClusterBootstrap, "", true, "If true, the registering managed cluster "+
`will use the internal endpoint prescribed in the hub cluster's configmap "kube-public/cluster-info to register "`+
"itself to the hub cluster. Otherwise use the original endpoint from the hub kubeconfig.")
return cmd
}
func registerClusterManagedByVela(k8sClient client.Client, cluster *clientcmdapi.Cluster, authInfo *clientcmdapi.AuthInfo, clusterName string, createNamespace string) error {
if err := clustermanager.EnsureClusterNotExists(k8sClient, clusterName); err != nil {
return errors.Wrapf(err, "cannot use cluster name %s", clusterName)
}
var credentialType clusterv1alpha1.CredentialType
data := map[string][]byte{
"endpoint": []byte(cluster.Server),
"ca.crt": cluster.CertificateAuthorityData,
}
if len(authInfo.Token) > 0 {
credentialType = clusterv1alpha1.CredentialTypeServiceAccountToken
data["token"] = []byte(authInfo.Token)
} else {
credentialType = clusterv1alpha1.CredentialTypeX509Certificate
data["tls.crt"] = authInfo.ClientCertificateData
data["tls.key"] = authInfo.ClientKeyData
}
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: map[string]string{
clusterv1alpha1.LabelKeyClusterCredentialType: string(credentialType),
},
},
Type: corev1.SecretTypeOpaque,
Data: data,
}
if err := k8sClient.Create(context.Background(), secret); err != nil {
return errors.Wrapf(err, "failed to add cluster to kubernetes")
}
if err := ensureVelaSystemNamespaceInstalled(k8sClient, clusterName, createNamespace); err != nil {
_ = k8sClient.Delete(context.Background(), secret)
return errors.Wrapf(err, "failed to ensure vela-system namespace installed in cluster %s", clusterName)
}
return nil
}
func registerClusterManagedByOCM(ioStreams cmdutil.IOStreams, hubConfig *rest.Config, spokeConfig *clientcmdapi.Config, clusterName string, inClusterBootstrap bool) error {
ctx := context.Background()
hubCluster, err := hub.NewHubCluster(hubConfig)
if err != nil {
return errors.Wrap(err, "fail to create client connect to hub cluster")
}
hubTracker := newTrackingSpinner("Checking the environment of hub cluster..")
hubTracker.FinalMSG = "Hub cluster all set, continue registration.\n"
hubTracker.Start()
crdName := k8stypes.NamespacedName{Name: "managedclusters." + ocmclusterv1.GroupName}
if err := hubCluster.Client.Get(context.Background(), crdName, &apiextensionsv1.CustomResourceDefinition{}); err != nil {
return err
}
clusters, err := clustermanager.GetRegisteredClusters(hubCluster.Client)
if err != nil {
return err
}
for _, cluster := range clusters {
if cluster.Name == clusterName && cluster.Accepted {
return errors.Errorf("you have register a cluster named %s", clusterName)
}
}
hubTracker.Stop()
spokeRestConf, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return spokeConfig, nil
})
if err != nil {
return errors.Wrap(err, "fail to convert spoke-cluster kubeconfig")
}
spokeTracker := newTrackingSpinner("Building registration config for the managed cluster")
spokeTracker.FinalMSG = "Successfully prepared registration config.\n"
spokeTracker.Start()
overridingRegistrationEndpoint := ""
if !inClusterBootstrap {
ioStreams.Infof("Using the api endpoint from hub kubeconfig %q as registration entry.\n", hubConfig.Host)
overridingRegistrationEndpoint = hubConfig.Host
}
hubKubeToken, err := hubCluster.GenerateHubClusterKubeConfig(ctx, overridingRegistrationEndpoint)
if err != nil {
return errors.Wrap(err, "fail to generate the token for spoke-cluster")
}
spokeCluster, err := spoke.NewSpokeCluster(clusterName, spokeRestConf, hubKubeToken)
if err != nil {
return errors.Wrap(err, "fail to connect spoke cluster")
}
err = spokeCluster.InitSpokeClusterEnv(ctx)
if err != nil {
return errors.Wrap(err, "fail to prepare the env for spoke-cluster")
}
spokeTracker.Stop()
registrationOperatorTracker := newTrackingSpinner("Waiting for registration operators running: (`kubectl -n open-cluster-management get pod -l app=klusterlet`)")
registrationOperatorTracker.FinalMSG = "Registration operator successfully deployed.\n"
registrationOperatorTracker.Start()
if err := spokeCluster.WaitForRegistrationOperatorReady(ctx); err != nil {
return errors.Wrap(err, "fail to setup registration operator for spoke-cluster")
}
registrationOperatorTracker.Stop()
registrationAgentTracker := newTrackingSpinner("Waiting for registration agent running: (`kubectl -n open-cluster-management-agent get pod -l app=klusterlet-registration-agent`)")
registrationAgentTracker.FinalMSG = "Registration agent successfully deployed.\n"
registrationAgentTracker.Start()
if err := spokeCluster.WaitForRegistrationAgentReady(ctx); err != nil {
return errors.Wrap(err, "fail to setup registration agent for spoke-cluster")
}
registrationAgentTracker.Stop()
csrCreationTracker := newTrackingSpinner("Waiting for CSRs created (`kubectl get csr -l open-cluster-management.io/cluster-name=" + spokeCluster.Name + "`)")
csrCreationTracker.FinalMSG = "Successfully found corresponding CSR from the agent.\n"
csrCreationTracker.Start()
if err := hubCluster.WaitForCSRCreated(ctx, spokeCluster.Name); err != nil {
return errors.Wrap(err, "failed found CSR created by registration agent")
}
csrCreationTracker.Stop()
ioStreams.Infof("Approving the CSR for cluster %q.\n", spokeCluster.Name)
if err := hubCluster.ApproveCSR(ctx, spokeCluster.Name); err != nil {
return errors.Wrap(err, "failed found CSR created by registration agent")
}
ready, err := hubCluster.WaitForSpokeClusterReady(ctx, clusterName)
if err != nil || !ready {
return errors.Errorf("fail to waiting for register request")
}
if err = hubCluster.RegisterSpokeCluster(ctx, spokeCluster.Name); err != nil {
return errors.Wrap(err, "fail to approve spoke cluster")
}
return nil
}
// NewClusterRenameCommand create command to help user rename cluster
func NewClusterRenameCommand(c *common.Args) *cobra.Command {
cmd := &cobra.Command{
@@ -396,31 +209,12 @@ func NewClusterRenameCommand(c *common.Args) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
oldClusterName := args[0]
newClusterName := args[1]
if newClusterName == multicluster.ClusterLocalName {
return fmt.Errorf("cannot use `%s` as cluster name, it is reserved as the local cluster", multicluster.ClusterLocalName)
}
client, err := c.GetClient()
k8sClient, err := c.GetClient()
if err != nil {
return err
}
clusterSecret, err := multicluster.GetMutableClusterSecret(context.Background(), client, oldClusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", oldClusterName)
}
if err := clustermanager.EnsureClusterNotExists(client, newClusterName); err != nil {
return errors.Wrapf(err, "cannot set cluster name to %s", newClusterName)
}
if err := client.Delete(context.Background(), clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
}
clusterSecret.ObjectMeta = metav1.ObjectMeta{
Name: newClusterName,
Namespace: multicluster.ClusterGatewaySecretNamespace,
Labels: clusterSecret.Labels,
Annotations: clusterSecret.Annotations,
}
if err := client.Create(context.Background(), clusterSecret); err != nil {
return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
if err := multicluster.RenameCluster(context.Background(), k8sClient, oldClusterName, newClusterName); err != nil {
return err
}
cmd.Printf("Rename cluster %s to %s successfully.\n", oldClusterName, newClusterName)
return nil
@@ -438,68 +232,16 @@ func NewClusterDetachCommand(c *common.Args) *cobra.Command {
Args: cobra.ExactValidArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
clusterName := args[0]
if clusterName == multicluster.ClusterLocalName {
return fmt.Errorf("cannot delete `%s` cluster, it is reserved as the local cluster", multicluster.ClusterLocalName)
}
client, err := c.GetClient()
configPath, _ := cmd.Flags().GetString(FlagKubeConfigPath)
cli, err := c.GetClient()
if err != nil {
return err
}
clusters, err := clustermanager.GetRegisteredClusters(client)
err = multicluster.DetachCluster(context.Background(), cli, clusterName,
multicluster.DetachClusterManagedClusterKubeConfigPathOption(configPath))
if err != nil {
return err
}
var clusterType string
for _, cluster := range clusters {
if cluster.Name == clusterName {
clusterType = cluster.Type
}
}
if clusterType == "" {
return errors.Errorf("cluster %s is not regitsered", clusterName)
}
switch clusterType {
case string(clusterv1alpha1.CredentialTypeX509Certificate), string(clusterv1alpha1.CredentialTypeServiceAccountToken):
clusterSecret, err := multicluster.GetMutableClusterSecret(context.Background(), client, clusterName)
if err != nil {
return errors.Wrapf(err, "cluster %s is not mutable now", clusterName)
}
if err := client.Delete(context.Background(), clusterSecret); err != nil {
return errors.Wrapf(err, "failed to detach cluster %s", clusterName)
}
case "ManagedCluster":
configPath, err := cmd.Flags().GetString(FlagKubeConfigPath)
if err != nil {
return errors.Wrapf(err, "failed to get cluster management type flag")
}
if configPath == "" {
return errors.New("kubeconfig-path shouldn't be empty")
}
config, err := clientcmd.LoadFromFile(configPath)
if err != nil {
return err
}
restConfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
return config, nil
})
if err != nil {
return err
}
if err = spoke.CleanSpokeClusterEnv(restConfig); err != nil {
return err
}
managedCluster := ocmclusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
},
}
if err = client.Delete(context.Background(), &managedCluster); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
}
}
cmd.Printf("Detach cluster %s successfully.\n", clusterName)
return nil
},
@@ -509,6 +251,7 @@ func NewClusterDetachCommand(c *common.Args) *cobra.Command {
}
// NewClusterProbeCommand create command to help user try health probe for existing cluster
// TODO(somefive): move prob logic into cluster management
func NewClusterProbeCommand(c *common.Args) *cobra.Command {
cmd := &cobra.Command{
Use: "probe [CLUSTER_NAME]",