From d19f0f9ca602e1cab1e91dcb5d3bc1a3f8f46638 Mon Sep 17 00:00:00 2001 From: Hussein Galal Date: Mon, 21 Oct 2024 22:54:08 +0300 Subject: [PATCH] virtual-kubelet controller integration (#130) * Virtual kubelet controller integration Signed-off-by: galal-hussein * Add k3k-kubelet image to the release workflow Signed-off-by: galal-hussein * Add k3k-kubelet image to the release workflow Signed-off-by: galal-hussein * Fix build/release workflow Signed-off-by: galal-hussein * Remove pkg directory in k3k-kubelet Signed-off-by: galal-hussein * rename Type to Config Signed-off-by: galal-hussein * Move the kubelet and config outside of pkg Signed-off-by: galal-hussein * fix comments Signed-off-by: galal-hussein * Fix naming throughout the package Signed-off-by: galal-hussein * Fix comments Signed-off-by: galal-hussein * more fixes to naming Signed-off-by: galal-hussein * fixes Signed-off-by: galal-hussein * fixes Signed-off-by: galal-hussein * fixes Signed-off-by: galal-hussein * fixes Signed-off-by: galal-hussein --------- Signed-off-by: galal-hussein --- .github/workflows/release.yml | 11 +- charts/k3k/crds/k3k.io_clusters.yaml | 10 + charts/k3k/templates/deployment.yaml | 4 +- charts/k3k/values.yaml | 6 + cli/cmds/cluster/create.go | 22 +- cli/cmds/kubeconfig/kubeconfig.go | 27 +- go.mod | 2 +- {virtual-kubelet => k3k-kubelet}/README.md | 0 k3k-kubelet/config.go | 76 +++++ k3k-kubelet/kubelet.go | 273 ++++++++++++++++++ k3k-kubelet/main.go | 102 +++++++ .../pkg => k3k-kubelet}/provider/configure.go | 6 +- .../pkg => k3k-kubelet}/provider/node.go | 0 .../pkg => k3k-kubelet}/provider/provider.go | 26 +- .../pkg => k3k-kubelet}/provider/util.go | 0 main.go | 22 +- ops/build | 2 +- pkg/apis/k3k.io/v1alpha1/types.go | 4 + pkg/controller/cluster/agent/agent.go | 263 +---------------- pkg/controller/cluster/agent/service.go | 30 -- pkg/controller/cluster/agent/shared.go | 247 ++++++++++++++++ pkg/controller/cluster/agent/virtual.go | 221 ++++++++++++++ pkg/controller/cluster/cluster.go | 128 ++++---- pkg/controller/cluster/config/agent.go | 34 --- pkg/controller/cluster/pod.go | 21 +- .../cluster/server/bootstrap/bootstrap.go | 8 +- .../{config/server.go => server/config.go} | 37 ++- pkg/controller/cluster/server/ingress.go | 15 +- pkg/controller/cluster/server/server.go | 52 ++-- pkg/controller/cluster/server/service.go | 23 +- pkg/controller/clusterset/clusterset.go | 4 +- pkg/controller/clusterset/node.go | 4 +- pkg/controller/controller.go | 96 ++++++ pkg/controller/kubeconfig/kubeconfig.go | 13 +- pkg/controller/util/util.go | 86 ------ virtual-kubelet/main.go | 214 -------------- 36 files changed, 1252 insertions(+), 837 deletions(-) rename {virtual-kubelet => k3k-kubelet}/README.md (100%) create mode 100644 k3k-kubelet/config.go create mode 100644 k3k-kubelet/kubelet.go create mode 100644 k3k-kubelet/main.go rename {virtual-kubelet/pkg => k3k-kubelet}/provider/configure.go (94%) rename {virtual-kubelet/pkg => k3k-kubelet}/provider/node.go (100%) rename {virtual-kubelet/pkg => k3k-kubelet}/provider/provider.go (92%) rename {virtual-kubelet/pkg => k3k-kubelet}/provider/util.go (100%) delete mode 100644 pkg/controller/cluster/agent/service.go create mode 100644 pkg/controller/cluster/agent/shared.go create mode 100644 pkg/controller/cluster/agent/virtual.go delete mode 100644 pkg/controller/cluster/config/agent.go rename pkg/controller/cluster/{config/server.go => server/config.go} (62%) create mode 100644 pkg/controller/controller.go delete mode 100644 pkg/controller/util/util.go delete mode 
100644 virtual-kubelet/main.go diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 44c8c0e..91e7bdd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -40,7 +40,7 @@ jobs: username: ${{ env.DOCKER_USERNAME }} password: ${{ env.DOCKER_PASSWORD }} - - name: Build container image + - name: Build controller image uses: docker/build-push-action@v5 with: context: . @@ -48,6 +48,15 @@ jobs: tags: rancher/k3k:${{ github.ref_name }} file: package/Dockerfile platforms: linux/amd64 + + - name: Build Virtual Kubelet image + uses: docker/build-push-action@v5 + with: + context: . + push: true + tags: rancher/k3k:k3k-kubelet-dev + file: package/Dockerfile.kubelet + platforms: linux/amd64 \ No newline at end of file diff --git a/charts/k3k/crds/k3k.io_clusters.yaml b/charts/k3k/crds/k3k.io_clusters.yaml index 41402b8..feecf19 100644 --- a/charts/k3k/crds/k3k.io_clusters.yaml +++ b/charts/k3k/crds/k3k.io_clusters.yaml @@ -135,6 +135,15 @@ spec: description: NodeSelector is the node selector that will be applied to all server/agent pods type: object + mode: + description: Mode is the cluster provisioning mode which can be either + "virtual" or "shared". Defaults to "shared" + type: string + x-kubernetes-validations: + - message: mode is immutable + rule: self == oldSelf + - message: invalid value for mode + rule: self == "virtual" || self == "shared" persistence: description: |- Persistence contains options controlling how the etcd data of the virtual cluster is persisted. By default, no data @@ -191,6 +200,7 @@ spec: type: string required: - agents + - mode - servers - token - version diff --git a/charts/k3k/templates/deployment.yaml b/charts/k3k/templates/deployment.yaml index fa42807..9e749d9 100644 --- a/charts/k3k/templates/deployment.yaml +++ b/charts/k3k/templates/deployment.yaml @@ -19,9 +19,11 @@ spec: - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} name: {{ .Chart.Name }} - environment: + env: - name: CLUSTER_CIDR value: {{ .Values.host.clusterCIDR }} + - name: SHARED_AGENT_IMAGE + value: "{{ .Values.sharedAgent.image.repository }}:{{ .Values.sharedAgent.image.tag }}" ports: - containerPort: 8080 name: https diff --git a/charts/k3k/values.yaml b/charts/k3k/values.yaml index 224bdf3..4fcb08a 100644 --- a/charts/k3k/values.yaml +++ b/charts/k3k/values.yaml @@ -22,3 +22,9 @@ serviceAccount: # The name of the service account to use. 
# If not set and create is true, a name is generated using the fullname template name: "" + +# configuration related to the shared agent mode in k3k +sharedAgent: + image: + repository: "rancher/k3k" + tag: "k3k-kubelet-dev" \ No newline at end of file diff --git a/cli/cmds/cluster/create.go b/cli/cmds/cluster/create.go index bd2b9b8..6c95374 100644 --- a/cli/cmds/cluster/create.go +++ b/cli/cmds/cluster/create.go @@ -7,20 +7,18 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/rancher/k3k/cli/cmds" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" + "github.com/rancher/k3k/pkg/controller" "github.com/rancher/k3k/pkg/controller/cluster" "github.com/rancher/k3k/pkg/controller/cluster/server" "github.com/rancher/k3k/pkg/controller/kubeconfig" - "github.com/rancher/k3k/pkg/controller/util" "github.com/sirupsen/logrus" "github.com/urfave/cli" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" @@ -28,15 +26,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -var ( - Scheme = runtime.NewScheme() - backoff = wait.Backoff{ - Steps: 5, - Duration: 20 * time.Second, - Factor: 2, - Jitter: 0.1, - } -) +var Scheme = runtime.NewScheme() func init() { _ = clientgoscheme.AddToScheme(Scheme) @@ -120,7 +110,7 @@ var ( func create(clx *cli.Context) error { ctx := context.Background() - if err := validateCreateFlags(clx); err != nil { + if err := validateCreateFlags(); err != nil { return err } @@ -173,13 +163,13 @@ func create(clx *cli.Context) error { logrus.Infof("Extracting Kubeconfig for [%s] cluster", name) cfg := &kubeconfig.KubeConfig{ - CN: util.AdminCommonName, + CN: controller.AdminCommonName, ORG: []string{user.SystemPrivilegedGroup}, ExpiryDate: 0, } logrus.Infof("waiting for cluster to be available..") var kubeconfig []byte - if err := retry.OnError(backoff, apierrors.IsNotFound, func() error { + if err := retry.OnError(controller.Backoff, apierrors.IsNotFound, func() error { kubeconfig, err = cfg.Extract(ctx, ctrlClient, cluster, host[0]) if err != nil { return err @@ -203,7 +193,7 @@ func create(clx *cli.Context) error { return os.WriteFile(cluster.Name+"-kubeconfig.yaml", kubeconfig, 0644) } -func validateCreateFlags(clx *cli.Context) error { +func validateCreateFlags() error { if persistenceType != server.EphermalNodesType && persistenceType != server.DynamicNodesType { return errors.New("invalid persistence type") diff --git a/cli/cmds/kubeconfig/kubeconfig.go b/cli/cmds/kubeconfig/kubeconfig.go index a78e15d..d527abb 100644 --- a/cli/cmds/kubeconfig/kubeconfig.go +++ b/cli/cmds/kubeconfig/kubeconfig.go @@ -10,14 +10,13 @@ import ( "github.com/rancher/k3k/cli/cmds" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" + "github.com/rancher/k3k/pkg/controller" "github.com/rancher/k3k/pkg/controller/kubeconfig" - "github.com/rancher/k3k/pkg/controller/util" "github.com/sirupsen/logrus" "github.com/urfave/cli" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/authentication/user" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" @@ -31,19 +30,13 @@ func init() { } var ( - Scheme = runtime.NewScheme() - name string - cn string - org cli.StringSlice - altNames cli.StringSlice - 
expirationDays int64 - configName string - backoff = wait.Backoff{ - Steps: 5, - Duration: 20 * time.Second, - Factor: 2, - Jitter: 0.1, - } + Scheme = runtime.NewScheme() + name string + cn string + org cli.StringSlice + altNames cli.StringSlice + expirationDays int64 + configName string generateKubeconfigFlags = []cli.Flag{ cli.StringFlag{ Name: "name", @@ -59,7 +52,7 @@ var ( Name: "cn", Usage: "Common name (CN) of the generated certificates for the kubeconfig", Destination: &cn, - Value: util.AdminCommonName, + Value: controller.AdminCommonName, }, cli.StringSliceFlag{ Name: "org", @@ -141,7 +134,7 @@ func generate(clx *cli.Context) error { } logrus.Infof("waiting for cluster to be available..") var kubeconfig []byte - if err := retry.OnError(backoff, apierrors.IsNotFound, func() error { + if err := retry.OnError(controller.Backoff, apierrors.IsNotFound, func() error { kubeconfig, err = cfg.Extract(ctx, ctrlClient, &cluster, host[0]) if err != nil { return err diff --git a/go.mod b/go.mod index 5651293..b0ebaf5 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( go.etcd.io/etcd/api/v3 v3.5.14 go.etcd.io/etcd/client/v3 v3.5.14 go.uber.org/zap v1.26.0 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/apiserver v0.31.0 @@ -119,7 +120,6 @@ require ( google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect diff --git a/virtual-kubelet/README.md b/k3k-kubelet/README.md similarity index 100% rename from virtual-kubelet/README.md rename to k3k-kubelet/README.md diff --git a/k3k-kubelet/config.go b/k3k-kubelet/config.go new file mode 100644 index 0000000..7dd9a53 --- /dev/null +++ b/k3k-kubelet/config.go @@ -0,0 +1,79 @@ +package main + +import ( + "errors" + "os" + + "gopkg.in/yaml.v2" +) + +// config has all virtual-kubelet startup options +type config struct { + ClusterName string `yaml:"clusterName,omitempty"` + ClusterNamespace string `yaml:"clusterNamespace,omitempty"` + NodeName string `yaml:"nodeName,omitempty"` + Token string `yaml:"token,omitempty"` + AgentHostname string `yaml:"agentHostname,omitempty"` + HostConfigPath string `yaml:"hostConfigPath,omitempty"` + VirtualConfigPath string `yaml:"virtualConfigPath,omitempty"` + KubeletPort string `yaml:"kubeletPort,omitempty"` +} + +func (c *config) unmarshalYAML(data []byte) error { + var conf config + + if err := yaml.Unmarshal(data, &conf); err != nil { + return err + } + + if c.ClusterName == "" { + c.ClusterName = conf.ClusterName + } + if c.ClusterNamespace == "" { + c.ClusterNamespace = conf.ClusterNamespace + } + if c.HostConfigPath == "" { + c.HostConfigPath = conf.HostConfigPath + } + if c.VirtualConfigPath == "" { + c.VirtualConfigPath = conf.VirtualConfigPath + } + if c.KubeletPort == "" { + c.KubeletPort = conf.KubeletPort + } + if c.AgentHostname == "" { + c.AgentHostname = conf.AgentHostname + } + if c.NodeName == "" { + c.NodeName = conf.NodeName + } + if c.Token == "" { + c.Token = conf.Token + } + return nil +} + +func (c *config) validate() error { + if c.ClusterName == "" { + return errors.New("cluster name is not provided") + } + if c.ClusterNamespace == "" { + return errors.New("cluster namespace is not provided") + } + if c.AgentHostname == "" { + return errors.New("agent hostname is not provided") + } + return nil +} + +func (c *config) parse(path string) error { + if _, err :=
os.Stat(path); os.IsNotExist(err) { + return nil + } + + b, err := os.ReadFile(path) + if err != nil { + return err + } + return c.unmarshalYAML(b) +} diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go new file mode 100644 index 0000000..6a04b3e --- /dev/null +++ b/k3k-kubelet/kubelet.go @@ -0,0 +1,273 @@ +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + "os" + "time" + + certutil "github.com/rancher/dynamiclistener/cert" + "github.com/rancher/k3k/k3k-kubelet/provider" + "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" + "github.com/rancher/k3k/pkg/controller" + "github.com/rancher/k3k/pkg/controller/cluster/server" + "github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap" + "github.com/rancher/k3k/pkg/controller/kubeconfig" + "github.com/virtual-kubelet/virtual-kubelet/log" + "github.com/virtual-kubelet/virtual-kubelet/node" + "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/client-go/kubernetes" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/util/retry" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +var scheme = runtime.NewScheme() + +func init() { + _ = clientgoscheme.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) +} + +type kubelet struct { + name string + port int + hostConfig *rest.Config + hostClient ctrlruntimeclient.Client + virtClient kubernetes.Interface + node *nodeutil.Node +} + +func newKubelet(ctx context.Context, c *config) (*kubelet, error) { + hostConfig, err := clientcmd.BuildConfigFromFlags("", c.HostConfigPath) + if err != nil { + return nil, err + } + + hostClient, err := ctrlruntimeclient.New(hostConfig, ctrlruntimeclient.Options{ + Scheme: scheme, + }) + if err != nil { + return nil, err + } + + virtConfig, err := virtRestConfig(ctx, c.VirtualConfigPath, hostClient, c.ClusterName, c.ClusterNamespace) + if err != nil { + return nil, err + } + + virtClient, err := kubernetes.NewForConfig(virtConfig) + if err != nil { + return nil, err + } + return &kubelet{ + name: c.NodeName, + hostConfig: hostConfig, + hostClient: hostClient, + virtClient: virtClient, + }, nil +} + +func (k *kubelet) registerNode(ctx context.Context, srvPort, namespace, name, hostname string) error { + providerFunc := k.newProviderFunc(namespace, name, hostname) + nodeOpts := k.nodeOpts(ctx, srvPort, namespace, name, hostname) + + var err error + k.node, err = nodeutil.NewNode(k.name, providerFunc, nodeutil.WithClient(k.virtClient), nodeOpts) + if err != nil { + return fmt.Errorf("unable to start kubelet: %v", err) + } + return nil +} + +func (k *kubelet) start(ctx context.Context) { + go func() { + logger, err := zap.NewProduction() + if err != nil { + fmt.Println("unable to create logger:", err.Error()) + os.Exit(1) + } + wrapped := LogWrapper{ + *logger.Sugar(), + } + ctx = log.WithLogger(ctx, &wrapped) + if err := k.node.Run(ctx); err != nil { + fmt.Println("node errored when running:", err.Error()) + os.Exit(1) + } + }() + if err := k.node.WaitReady(context.Background(), time.Minute*1); err != nil { + fmt.Println("node was not ready within timeout of 1 minute:", err.Error()) + os.Exit(1) + } + <-k.node.Done() + if err := k.node.Err(); err != nil { + fmt.Println("node stopped with an error:", err.Error()) + 
os.Exit(1) + } + fmt.Println("node exited without an error") +} + +func (k *kubelet) newProviderFunc(namespace, name, hostname string) nodeutil.NewProviderFunc { + return func(pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) { + utilProvider, err := provider.New(*k.hostConfig, namespace, name) + if err != nil { + return nil, nil, fmt.Errorf("unable to make nodeutil provider: %w", err) + } + nodeProvider := provider.Node{} + + provider.ConfigureNode(pc.Node, hostname, k.port) + return utilProvider, &nodeProvider, nil + } +} + +func (k *kubelet) nodeOpts(ctx context.Context, srvPort, namespace, name, hostname string) nodeutil.NodeOpt { + return func(c *nodeutil.NodeConfig) error { + c.HTTPListenAddr = fmt.Sprintf(":%s", srvPort) + // set up the routes + mux := http.NewServeMux() + if err := nodeutil.AttachProviderRoutes(mux)(c); err != nil { + return fmt.Errorf("unable to attach routes: %w", err) + } + c.Handler = mux + + tlsConfig, err := loadTLSConfig(ctx, k.hostClient, name, namespace, k.name, hostname) + if err != nil { + return fmt.Errorf("unable to get tls config: %w", err) + } + c.TLSConfig = tlsConfig + return nil + } +} + +func virtRestConfig(ctx context.Context, virtualConfigPath string, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace string) (*rest.Config, error) { + if virtualConfigPath != "" { + return clientcmd.BuildConfigFromFlags("", virtualConfigPath) + } + // virtual kubeconfig file is empty, trying to fetch the k3k cluster kubeconfig + var cluster v1alpha1.Cluster + if err := hostClient.Get(ctx, types.NamespacedName{Namespace: clusterNamespace, Name: clusterName}, &cluster); err != nil { + return nil, err + } + endpoint := server.ServiceName(cluster.Name) + "." + cluster.Namespace + var b *bootstrap.ControlRuntimeBootstrap + if err := retry.OnError(controller.Backoff, func(err error) bool { + return err != nil + }, func() error { + var err error + b, err = bootstrap.DecodedBootstrap(cluster.Spec.Token, endpoint) + return err + }); err != nil { + return nil, fmt.Errorf("unable to decode bootstrap: %w", err) + } + adminCert, adminKey, err := kubeconfig.CreateClientCertKey( + controller.AdminCommonName, []string{user.SystemPrivilegedGroup}, + nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, time.Hour*24*time.Duration(365), + b.ClientCA.Content, + b.ClientCAKey.Content) + if err != nil { + return nil, err + } + + url := fmt.Sprintf("https://%s:%d", server.ServiceName(cluster.Name), server.ServerPort) + kubeconfigData, err := kubeconfigBytes(url, []byte(b.ServerCA.Content), adminCert, adminKey) + if err != nil { + return nil, err + } + return clientcmd.RESTConfigFromKubeConfig(kubeconfigData) +} + +func kubeconfigBytes(url string, serverCA, clientCert, clientKey []byte) ([]byte, error) { + config := clientcmdapi.NewConfig() + + cluster := clientcmdapi.NewCluster() + cluster.CertificateAuthorityData = serverCA + cluster.Server = url + + authInfo := clientcmdapi.NewAuthInfo() + authInfo.ClientCertificateData = clientCert + authInfo.ClientKeyData = clientKey + + context := clientcmdapi.NewContext() + context.AuthInfo = "default" + context.Cluster = "default" + + config.Clusters["default"] = cluster + config.AuthInfos["default"] = authInfo + config.Contexts["default"] = context + config.CurrentContext = "default" + + return clientcmd.Write(*config) +} + +func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName, hostname string) (*tls.Config, error) { + var ( + cluster
v1alpha1.Cluster + b *bootstrap.ControlRuntimeBootstrap + ) + if err := hostClient.Get(ctx, types.NamespacedName{Name: clusterName, Namespace: clusterNamespace}, &cluster); err != nil { + return nil, err + } + endpoint := fmt.Sprintf("%s.%s", server.ServiceName(cluster.Name), cluster.Namespace) + if err := retry.OnError(controller.Backoff, func(err error) bool { + return err != nil + }, func() error { + var err error + b, err = bootstrap.DecodedBootstrap(cluster.Spec.Token, endpoint) + return err + }); err != nil { + return nil, fmt.Errorf("unable to decode bootstrap: %w", err) + } + altNames := certutil.AltNames{ + DNSNames: []string{hostname}, + } + cert, key, err := kubeconfig.CreateClientCertKey(nodeName, nil, &altNames, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, 0, b.ServerCA.Content, b.ServerCAKey.Content) + if err != nil { + return nil, fmt.Errorf("unable to get cert and key: %w", err) + } + clientCert, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, fmt.Errorf("unable to get key pair: %w", err) + } + // create rootCA CertPool + certs, err := certutil.ParseCertsPEM([]byte(b.ServerCA.Content)) + if err != nil { + return nil, fmt.Errorf("unable to create ca certs: %w", err) + } + if len(certs) < 1 { + return nil, fmt.Errorf("ca cert is not parsed correctly") + } + pool := x509.NewCertPool() + pool.AddCert(certs[0]) + + return &tls.Config{ + RootCAs: pool, + Certificates: []tls.Certificate{clientCert}, + }, nil +} + +type LogWrapper struct { + zap.SugaredLogger +} + +func (l *LogWrapper) WithError(err error) log.Logger { + return l +} + +func (l *LogWrapper) WithField(string, interface{}) log.Logger { + return l +} + +func (l *LogWrapper) WithFields(field log.Fields) log.Logger { + return l +} diff --git a/k3k-kubelet/main.go b/k3k-kubelet/main.go new file mode 100644 index 0000000..f653344 --- /dev/null +++ b/k3k-kubelet/main.go @@ -0,0 +1,102 @@ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli" +) + +var ( + configFile string + cfg config +) + +func main() { + app := cli.NewApp() + app.Name = "k3k-kubelet" + app.Usage = "virtual kubelet implementation for k3k" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "cluster-name", + Usage: "Name of the k3k cluster", + Destination: &cfg.ClusterName, + EnvVar: "CLUSTER_NAME", + }, + cli.StringFlag{ + Name: "cluster-namespace", + Usage: "Namespace of the k3k cluster", + Destination: &cfg.ClusterNamespace, + EnvVar: "CLUSTER_NAMESPACE", + }, + cli.StringFlag{ + Name: "cluster-token", + Usage: "K3S token of the k3k cluster", + Destination: &cfg.Token, + EnvVar: "CLUSTER_TOKEN", + }, + cli.StringFlag{ + Name: "host-config-path", + Usage: "Path to the host kubeconfig; if empty, virtual-kubelet will use the in-cluster config", + Destination: &cfg.HostConfigPath, + EnvVar: "HOST_KUBECONFIG", + }, + cli.StringFlag{ + Name: "virtual-config-path", + Usage: "Path to the k3k cluster kubeconfig; if empty, virtual-kubelet will create its own config from the k3k cluster", + Destination: &cfg.VirtualConfigPath, + EnvVar: "CLUSTER_NAME", + }, + cli.StringFlag{ + Name: "kubelet-port", + Usage: "kubelet API port number", + Destination: &cfg.KubeletPort, + EnvVar: "SERVER_PORT", + Value: "9443", + }, + cli.StringFlag{ + Name: "agent-hostname", + Usage: "Agent hostname used as a TLS SAN for the kubelet server", + Destination: &cfg.AgentHostname, + EnvVar: "AGENT_HOSTNAME", + }, + cli.StringFlag{ + Name: "config", + Usage: "Path to k3k-kubelet config file", + Destination:
&configFile, + EnvVar: "CONFIG_FILE", + Value: "/etc/rancher/k3k/config.yaml", + }, + } + app.Action = run + if err := app.Run(os.Args); err != nil { + logrus.Fatal(err) + } +} + +func run(clx *cli.Context) { + if err := cfg.parse(configFile); err != nil { + fmt.Printf("failed to parse config file %s: %v", configFile, err) + os.Exit(1) + } + + if err := cfg.validate(); err != nil { + fmt.Printf("failed to validate config: %v", err) + os.Exit(1) + } + ctx := context.Background() + k, err := newKubelet(ctx, &cfg) + if err != nil { + fmt.Printf("failed to create new virtual kubelet instance: %v", err) + os.Exit(1) + } + + if err := k.registerNode(ctx, cfg.KubeletPort, cfg.ClusterNamespace, cfg.ClusterName, cfg.AgentHostname); err != nil { + fmt.Printf("failed to register new node: %v", err) + os.Exit(1) + } + + k.start(ctx) +} diff --git a/virtual-kubelet/pkg/provider/configure.go b/k3k-kubelet/provider/configure.go similarity index 94% rename from virtual-kubelet/pkg/provider/configure.go rename to k3k-kubelet/provider/configure.go index 44db2f5..45d93a4 100644 --- a/virtual-kubelet/pkg/provider/configure.go +++ b/k3k-kubelet/provider/configure.go @@ -6,13 +6,13 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func ConfigureNode(node *v1.Node, podIP string, servicePort int) { +func ConfigureNode(node *v1.Node, hostname string, servicePort int) { node.Status.Conditions = nodeConditions() node.Status.DaemonEndpoints.KubeletEndpoint.Port = int32(servicePort) node.Status.Addresses = []v1.NodeAddress{ { - Type: v1.NodeExternalIP, - Address: podIP, + Type: v1.NodeHostName, + Address: hostname, }, } node.Status.Capacity = v1.ResourceList{ diff --git a/virtual-kubelet/pkg/provider/node.go b/k3k-kubelet/provider/node.go similarity index 100% rename from virtual-kubelet/pkg/provider/node.go rename to k3k-kubelet/provider/node.go diff --git a/virtual-kubelet/pkg/provider/provider.go b/k3k-kubelet/provider/provider.go similarity index 92% rename from virtual-kubelet/pkg/provider/provider.go rename to k3k-kubelet/provider/provider.go index f836dcc..3ee22bd 100644 --- a/virtual-kubelet/pkg/provider/provider.go +++ b/k3k-kubelet/provider/provider.go @@ -2,15 +2,13 @@ package provider import ( "context" - "crypto/sha256" - "encoding/hex" "fmt" "io" "net/http" "strconv" - "strings" dto "github.com/prometheus/client_model/go" + "github.com/rancher/k3k/pkg/controller" "github.com/virtual-kubelet/virtual-kubelet/node/api" "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1" "go.uber.org/zap" @@ -312,24 +310,6 @@ func (p *Provider) translateFrom(hostPod *corev1.Pod) *corev1.Pod { return virtualPod } -func (p *Provider) hostName(virtualNamespace string, virtualName string) string { - return safeConcatName(p.ClusterName, p.ClusterNamespace, virtualNamespace, virtualName) -} - -// safeConcatName concatenates the given strings and ensures the returned name is under 64 characters -// by cutting the string off at 57 characters and setting the last 6 with an encoded version of the concatenated string. 
-func safeConcatName(name ...string) string { - fullPath := strings.Join(name, "-") - if len(fullPath) < 64 { - return fullPath - } - digest := sha256.Sum256([]byte(fullPath)) - // since we cut the string in the middle, the last char may not be compatible with what is expected in k8s - // we are checking and if necessary removing the last char - c := fullPath[56] - if 'a' <= c && c <= 'z' || '0' <= c && c <= '9' { - return fullPath[0:57] + "-" + hex.EncodeToString(digest[0:])[0:5] - } - - return fullPath[0:56] + "-" + hex.EncodeToString(digest[0:])[0:6] +func (p *Provider) hostName(virtualNamespace, virtualName string) string { + return controller.SafeConcatName(p.ClusterName, p.ClusterNamespace, virtualNamespace, virtualName) } diff --git a/virtual-kubelet/pkg/provider/util.go b/k3k-kubelet/provider/util.go similarity index 100% rename from virtual-kubelet/pkg/provider/util.go rename to k3k-kubelet/provider/util.go diff --git a/main.go b/main.go index caade45..6b23ec0 100644 --- a/main.go +++ b/main.go @@ -26,10 +26,11 @@ const ( ) var ( - scheme = runtime.NewScheme() - clusterCIDR string - kubeconfig string - flags = []cli.Flag{ + scheme = runtime.NewScheme() + clusterCIDR string + sharedAgentImage string + kubeconfig string + flags = []cli.Flag{ cli.StringFlag{ Name: "kubeconfig", EnvVar: "KUBECONFIG", @@ -41,7 +42,15 @@ var ( EnvVar: "CLUSTER_CIDR", Usage: "Cluster CIDR to be added to the networkpolicy of the clustersets", Destination: &clusterCIDR, - }} + }, + cli.StringFlag{ + Name: "shared-agent-image", + EnvVar: "SHARED_AGENT_IMAGE", + Usage: "K3K Virtual Kubelet image", + Value: "rancher/k3k:k3k-kubelet-dev", + Destination: &sharedAgentImage, + }, + } ) func init() { @@ -76,8 +85,7 @@ func run(clx *cli.Context) error { if err != nil { return fmt.Errorf("Failed to create new controller runtime manager: %v", err) } - - if err := cluster.Add(ctx, mgr); err != nil { + if err := cluster.Add(ctx, mgr, sharedAgentImage); err != nil { return fmt.Errorf("Failed to add the new cluster controller: %v", err) } diff --git a/ops/build b/ops/build index 1a9d9ca..7bbf7cc 100755 --- a/ops/build +++ b/ops/build @@ -24,7 +24,7 @@ if [ "$CROSS" = "true" ] && [ "$ARCH" = "amd64" ]; then fi # build k3k-kubelet -CGO_ENABLED=0 go build -ldflags "$LINKFLAGS $OTHER_LINKFLAGS" -o bin/k3k-kubelet ./virtual-kubelet +CGO_ENABLED=0 go build -ldflags "$LINKFLAGS $OTHER_LINKFLAGS" -o bin/k3k-kubelet ./k3k-kubelet if [ "$CROSS" = "true" ] && [ "$ARCH" = "amd64" ]; then CGO_ENABLED=0 GOOS=linux GOARCH=s390x go build -ldflags "$LINKFLAGS $OTHER_LINKFLAGS" -o bin/k3k-kubelet-s390x CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags "$LINKFLAGS $OTHER_LINKFLAGS" -o bin/k3k-kubelet-arm64 diff --git a/pkg/apis/k3k.io/v1alpha1/types.go b/pkg/apis/k3k.io/v1alpha1/types.go index 665a977..e971ca3 100644 --- a/pkg/apis/k3k.io/v1alpha1/types.go +++ b/pkg/apis/k3k.io/v1alpha1/types.go @@ -52,6 +52,10 @@ type ClusterSpec struct { TLSSANs []string `json:"tlsSANs,omitempty"` // Addons is a list of secrets containing raw YAML which will be deployed in the virtual K3k cluster on startup. Addons []Addon `json:"addons,omitempty"` + // +kubebuilder:validation:XValidation:message="mode is immutable",rule="self == oldSelf" + // +kubebuilder:validation:XValidation:message="invalid value for mode",rule="self == \"virtual\" || self == \"shared\"" + // Mode is the cluster provisioning mode which can be either "virtual" or "shared".
Defaults to "shared" + Mode string `json:"mode"` // Persistence contains options controlling how the etcd data of the virtual cluster is persisted. By default, no data // persistence is guaranteed, so restart of a virtual cluster pod may result in data loss without this field. diff --git a/pkg/controller/cluster/agent/agent.go b/pkg/controller/cluster/agent/agent.go index 4b96f98..6d6ad6b 100644 --- a/pkg/controller/cluster/agent/agent.go +++ b/pkg/controller/cluster/agent/agent.go @@ -2,260 +2,27 @@ package agent import ( "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" - "github.com/rancher/k3k/pkg/controller/util" - apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/ptr" + "github.com/rancher/k3k/pkg/controller" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) -const agentName = "k3k-agent" +const ( + configName = "agent-config" +) -type Agent struct { - cluster *v1alpha1.Cluster +type Agent interface { + Name() string + Config() (ctrlruntimeclient.Object, error) + Resources() []ctrlruntimeclient.Object } -func New(cluster *v1alpha1.Cluster) *Agent { - return &Agent{ - cluster: cluster, +func New(cluster *v1alpha1.Cluster, serviceIP, sharedAgentImage string) Agent { + if cluster.Spec.Mode == VirtualNodeMode { + return NewVirtualAgent(cluster, serviceIP) } + return NewSharedAgent(cluster, serviceIP, sharedAgentImage) } -func (a *Agent) Deploy() *apps.Deployment { - image := util.K3SImage(a.cluster) - - const name = "k3k-agent" - selector := metav1.LabelSelector{ - MatchLabels: map[string]string{ - "cluster": a.cluster.Name, - "type": "agent", - }, - } - return &apps.Deployment{ - TypeMeta: metav1.TypeMeta{ - Kind: "Deployment", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: a.cluster.Name + "-" + name, - Namespace: util.ClusterNamespace(a.cluster), - Labels: selector.MatchLabels, - }, - Spec: apps.DeploymentSpec{ - Replicas: a.cluster.Spec.Agents, - Selector: &selector, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: selector.MatchLabels, - }, - Spec: a.podSpec(image, name, a.cluster.Spec.AgentArgs, false, &selector), - }, - }, - } -} - -func (a *Agent) StatefulAgent(cluster *v1alpha1.Cluster) *apps.StatefulSet { - image := util.K3SImage(cluster) - - selector := metav1.LabelSelector{ - MatchLabels: map[string]string{ - "cluster": cluster.Name, - "type": "agent", - }, - } - return &apps.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "Statefulset", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: cluster.Name + "-" + agentName, - Namespace: util.ClusterNamespace(cluster), - Labels: selector.MatchLabels, - }, - Spec: apps.StatefulSetSpec{ - ServiceName: cluster.Name + "-" + agentName + "-headless", - Replicas: cluster.Spec.Agents, - Selector: &selector, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{ - { - TypeMeta: metav1.TypeMeta{ - Kind: "PersistentVolumeClaim", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "varlibrancherk3s", - Namespace: util.ClusterNamespace(cluster), - }, - Spec: v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - StorageClassName: &cluster.Status.Persistence.StorageClassName, - Resources: v1.VolumeResourceRequirements{ - Requests: v1.ResourceList{ - "storage": resource.MustParse(cluster.Status.Persistence.StorageRequestSize), - }, - }, - }, - }, - { - TypeMeta: metav1.TypeMeta{ - Kind: 
"PersistentVolumeClaim", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "varlibkubelet", - Namespace: util.ClusterNamespace(cluster), - }, - Spec: v1.PersistentVolumeClaimSpec{ - Resources: v1.VolumeResourceRequirements{ - Requests: v1.ResourceList{ - "storage": resource.MustParse(cluster.Status.Persistence.StorageRequestSize), - }, - }, - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - StorageClassName: &cluster.Status.Persistence.StorageClassName, - }, - }, - }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: selector.MatchLabels, - }, - Spec: a.podSpec(image, agentName, cluster.Spec.AgentArgs, true, &selector), - }, - }, - } -} - -func (a *Agent) podSpec(image, name string, args []string, statefulSet bool, affinitySelector *metav1.LabelSelector) v1.PodSpec { - var limit v1.ResourceList - if a.cluster.Spec.Limit != nil && a.cluster.Spec.Limit.ServerLimit != nil { - limit = a.cluster.Spec.Limit.ServerLimit - } - args = append([]string{"agent", "--config", "/opt/rancher/k3s/config.yaml"}, args...) - podSpec := v1.PodSpec{ - NodeSelector: a.cluster.Spec.NodeSelector, - Affinity: &v1.Affinity{ - PodAntiAffinity: &v1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: affinitySelector, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - }, - Volumes: []v1.Volume{ - { - Name: "config", - VolumeSource: v1.VolumeSource{ - Secret: &v1.SecretVolumeSource{ - SecretName: util.AgentConfigName(a.cluster), - Items: []v1.KeyToPath{ - { - Key: "config.yaml", - Path: "config.yaml", - }, - }, - }, - }, - }, - { - Name: "run", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - }, - { - Name: "varrun", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - }, - { - Name: "varlibcni", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - }, - { - Name: "varlog", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - }, - }, - Containers: []v1.Container{ - { - Name: name, - Image: image, - SecurityContext: &v1.SecurityContext{ - Privileged: ptr.To(true), - }, - Args: args, - Command: []string{ - "/bin/k3s", - }, - Resources: v1.ResourceRequirements{ - Limits: limit, - }, - VolumeMounts: []v1.VolumeMount{ - { - Name: "config", - MountPath: "/opt/rancher/k3s/", - ReadOnly: false, - }, - { - Name: "run", - MountPath: "/run", - ReadOnly: false, - }, - { - Name: "varrun", - MountPath: "/var/run", - ReadOnly: false, - }, - { - Name: "varlibcni", - MountPath: "/var/lib/cni", - ReadOnly: false, - }, - { - Name: "varlibkubelet", - MountPath: "/var/lib/kubelet", - ReadOnly: false, - }, - { - Name: "varlibrancherk3s", - MountPath: "/var/lib/rancher/k3s", - ReadOnly: false, - }, - { - Name: "varlog", - MountPath: "/var/log", - ReadOnly: false, - }, - }, - }, - }, - } - - if !statefulSet { - podSpec.Volumes = append(podSpec.Volumes, v1.Volume{ - Name: "varlibkubelet", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - }, v1.Volume{ - - Name: "varlibrancherk3s", - VolumeSource: v1.VolumeSource{ - EmptyDir: &v1.EmptyDirVolumeSource{}, - }, - }, - ) - } - - return podSpec +func configSecretName(clusterName string) string { + return controller.SafeConcatNameWithPrefix(clusterName, configName) } diff --git a/pkg/controller/cluster/agent/service.go b/pkg/controller/cluster/agent/service.go deleted file mode 100644 index b4a1d8c..0000000 --- 
a/pkg/controller/cluster/agent/service.go +++ /dev/null @@ -1,30 +0,0 @@ -package agent - -import ( - "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" - "github.com/rancher/k3k/pkg/controller/util" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func (a *Agent) StatefulAgentService(cluster *v1alpha1.Cluster) *v1.Service { - return &v1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: cluster.Name + "-" + agentName + "-headless", - Namespace: util.ClusterNamespace(cluster), - }, - Spec: v1.ServiceSpec{ - Type: v1.ServiceTypeClusterIP, - ClusterIP: v1.ClusterIPNone, - Selector: map[string]string{ - "cluster": cluster.Name, - "role": "agent", - }, - Ports: []v1.ServicePort{}, - }, - } -} diff --git a/pkg/controller/cluster/agent/shared.go b/pkg/controller/cluster/agent/shared.go new file mode 100644 index 0000000..22393c4 --- /dev/null +++ b/pkg/controller/cluster/agent/shared.go @@ -0,0 +1,247 @@ +package agent + +import ( + "fmt" + + "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" + "github.com/rancher/k3k/pkg/controller" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + sharedKubeletConfigPath = "/opt/rancher/k3k/config.yaml" + sharedNodeAgentName = "kubelet" + SharedNodeMode = "shared" +) + +type SharedAgent struct { + cluster *v1alpha1.Cluster + serviceIP string + sharedAgentImage string +} + +func NewSharedAgent(cluster *v1alpha1.Cluster, serviceIP, sharedAgentImage string) Agent { + return &SharedAgent{ + cluster: cluster, + serviceIP: serviceIP, + sharedAgentImage: sharedAgentImage, + } +} + +func (s *SharedAgent) Config() (ctrlruntimeclient.Object, error) { + config := sharedAgentData(s.cluster) + + return &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: configSecretName(s.cluster.Name), + Namespace: s.cluster.Namespace, + }, + Data: map[string][]byte{ + "config.yaml": []byte(config), + }, + }, nil +} + +func sharedAgentData(cluster *v1alpha1.Cluster) string { + nodeName := cluster.Name + "-" + "k3k-kubelet" + return fmt.Sprintf(`clusterName: %s +clusterNamespace: %s +nodeName: %s +agentHostname: %s +token: %s`, + cluster.Name, cluster.Namespace, nodeName, nodeName, cluster.Spec.Token) +} + +func (s *SharedAgent) Resources() []ctrlruntimeclient.Object { + return []ctrlruntimeclient.Object{s.serviceAccount(), s.role(), s.roleBinding(), s.service(), s.deployment()} +} + +func (s *SharedAgent) deployment() *apps.Deployment { + selector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "cluster": s.cluster.Name, + "type": "agent", + "mode": "shared", + }, + } + name := s.Name() + return &apps.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: s.cluster.Namespace, + Labels: selector.MatchLabels, + }, + Spec: apps.DeploymentSpec{ + Selector: &selector, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: selector.MatchLabels, + }, + Spec: s.podSpec(s.sharedAgentImage, name, &selector), + }, + }, + } +} + +func (s *SharedAgent) podSpec(image, name string, affinitySelector *metav1.LabelSelector) v1.PodSpec { + args := []string{"--config", sharedKubeletConfigPath} + var limit v1.ResourceList + return v1.PodSpec{ + 
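+ // one k3k-kubelet replica, spread across host nodes by the anti-affinity below; it reads its config.yaml from the mounted agent-config secret rendered by sharedAgentData above (illustrative content: clusterName/clusterNamespace of the Cluster, nodeName and agentHostname of <cluster>-k3k-kubelet, and the cluster token)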
Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: affinitySelector, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + ServiceAccountName: s.Name(), + Volumes: []v1.Volume{ + { + Name: "config", + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: configSecretName(s.cluster.Name), + Items: []v1.KeyToPath{ + { + Key: "config.yaml", + Path: "config.yaml", + }, + }, + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: name, + Image: image, + ImagePullPolicy: v1.PullAlways, + Resources: v1.ResourceRequirements{ + Limits: limit, + }, + Args: args, + VolumeMounts: []v1.VolumeMount{ + { + Name: "config", + MountPath: "/opt/rancher/k3k/", + ReadOnly: false, + }, + }, + }, + }} +} + +func (s *SharedAgent) service() *v1.Service { + return &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: s.Name(), + Namespace: s.cluster.Namespace, + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "cluster": s.cluster.Name, + "type": "agent", + "mode": "shared", + }, + Ports: []v1.ServicePort{ + { + Name: "k3s-kubelet-port", + Protocol: v1.ProtocolTCP, + Port: 9443, + }, + }, + }, + } +} + +func (s *SharedAgent) serviceAccount() *v1.ServiceAccount { + return &v1.ServiceAccount{ + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceAccount", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: s.Name(), + Namespace: s.cluster.Namespace, + }, + } +} + +func (s *SharedAgent) role() *rbacv1.Role { + return &rbacv1.Role{ + TypeMeta: metav1.TypeMeta{ + Kind: "Role", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: s.Name(), + Namespace: s.cluster.Namespace, + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"*"}, + APIGroups: []string{""}, + Resources: []string{"pods"}, + }, + { + Verbs: []string{"get", "watch", "list"}, + APIGroups: []string{""}, + Resources: []string{"secrets", "services"}, + }, + { + Verbs: []string{"get", "watch", "list"}, + APIGroups: []string{"k3k.io"}, + Resources: []string{"clusters"}, + }, + }, + } +} + +func (s *SharedAgent) roleBinding() *rbacv1.RoleBinding { + return &rbacv1.RoleBinding{ + TypeMeta: metav1.TypeMeta{ + Kind: "RoleBinding", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: s.Name(), + Namespace: s.cluster.Namespace, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: s.Name(), + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: s.Name(), + Namespace: s.cluster.Namespace, + }, + }, + } +} + +func (s *SharedAgent) Name() string { + return controller.SafeConcatNameWithPrefix(s.cluster.Name, sharedNodeAgentName) +} diff --git a/pkg/controller/cluster/agent/virtual.go b/pkg/controller/cluster/agent/virtual.go new file mode 100644 index 0000000..65fb3eb --- /dev/null +++ b/pkg/controller/cluster/agent/virtual.go @@ -0,0 +1,221 @@ +package agent + +import ( + "fmt" + + "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" + "github.com/rancher/k3k/pkg/controller" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + VirtualNodeMode = "virtual" + virtualNodeAgentName = "kubelet" +) + +type VirtualAgent struct { + 
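+ // cluster is the Cluster resource this agent serves; serviceIP is the ClusterIP of the virtual cluster's k3s server service, used below as the server URL the agent joins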
cluster *v1alpha1.Cluster + serviceIP string + } + + func NewVirtualAgent(cluster *v1alpha1.Cluster, serviceIP string) Agent { + return &VirtualAgent{ + cluster: cluster, + serviceIP: serviceIP, + } + } + + func (v *VirtualAgent) Config() (ctrlruntimeclient.Object, error) { + config := virtualAgentData(v.serviceIP, v.cluster.Spec.Token) + + return &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: configSecretName(v.cluster.Name), + Namespace: v.cluster.Namespace, + }, + Data: map[string][]byte{ + "config.yaml": []byte(config), + }, + }, nil + } + + func (v *VirtualAgent) Resources() []ctrlruntimeclient.Object { + return []ctrlruntimeclient.Object{v.deployment()} + } + + func virtualAgentData(serviceIP, token string) string { + return fmt.Sprintf(`server: https://%s:6443 + token: %s + with-node-id: true`, serviceIP, token) + } + + func (v *VirtualAgent) deployment() *apps.Deployment { + image := controller.K3SImage(v.cluster) + + const name = "k3k-agent" + selector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + "cluster": v.cluster.Name, + "type": "agent", + "mode": "virtual", + }, + } + return &apps.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: v.Name(), + Namespace: v.cluster.Namespace, + Labels: selector.MatchLabels, + }, + Spec: apps.DeploymentSpec{ + Replicas: v.cluster.Spec.Agents, + Selector: &selector, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: selector.MatchLabels, + }, + Spec: v.podSpec(image, name, v.cluster.Spec.AgentArgs, &selector), + }, + }, + } + } + + func (v *VirtualAgent) podSpec(image, name string, args []string, affinitySelector *metav1.LabelSelector) v1.PodSpec { + var limit v1.ResourceList + args = append([]string{"agent", "--config", "/opt/rancher/k3s/config.yaml"}, args...)
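+ // for illustration, the container below therefore runs: /bin/k3s agent --config /opt/rancher/k3s/config.yaml [spec.agentArgs...]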
+ podSpec := v1.PodSpec{ + Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: affinitySelector, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "config", + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: configSecretName(v.cluster.Name), + Items: []v1.KeyToPath{ + { + Key: "config.yaml", + Path: "config.yaml", + }, + }, + }, + }, + }, + { + Name: "run", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "varrun", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "varlibcni", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "varlog", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "varlibkubelet", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + { + Name: "varlibrancherk3s", + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []v1.Container{ + { + Name: name, + Image: image, + ImagePullPolicy: v1.PullAlways, + SecurityContext: &v1.SecurityContext{ + Privileged: ptr.To(true), + }, + Args: args, + Command: []string{ + "/bin/k3s", + }, + Resources: v1.ResourceRequirements{ + Limits: limit, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "config", + MountPath: "/opt/rancher/k3s/", + ReadOnly: false, + }, + { + Name: "run", + MountPath: "/run", + ReadOnly: false, + }, + { + Name: "varrun", + MountPath: "/var/run", + ReadOnly: false, + }, + { + Name: "varlibcni", + MountPath: "/var/lib/cni", + ReadOnly: false, + }, + { + Name: "varlibkubelet", + MountPath: "/var/lib/kubelet", + ReadOnly: false, + }, + { + Name: "varlibrancherk3s", + MountPath: "/var/lib/rancher/k3s", + ReadOnly: false, + }, + { + Name: "varlog", + MountPath: "/var/log", + ReadOnly: false, + }, + }, + }, + }, + } + + return podSpec +} + +func (v *VirtualAgent) Name() string { + return controller.SafeConcatNameWithPrefix(v.cluster.Name, virtualNodeAgentName) +} diff --git a/pkg/controller/cluster/cluster.go b/pkg/controller/cluster/cluster.go index 6c4b03f..f1657c6 100644 --- a/pkg/controller/cluster/cluster.go +++ b/pkg/controller/cluster/cluster.go @@ -8,11 +8,10 @@ import ( "time" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" + k3kcontroller "github.com/rancher/k3k/pkg/controller" "github.com/rancher/k3k/pkg/controller/cluster/agent" - "github.com/rancher/k3k/pkg/controller/cluster/config" "github.com/rancher/k3k/pkg/controller/cluster/server" "github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap" - "github.com/rancher/k3k/pkg/controller/util" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,6 +27,7 @@ import ( ) const ( + namePrefix = "k3k" clusterController = "k3k-cluster-controller" clusterFinalizerName = "cluster.k3k.io/finalizer" etcdPodFinalizerName = "etcdpod.k3k.io/finalizer" @@ -42,16 +42,18 @@ const ( ) type ClusterReconciler struct { - Client ctrlruntimeclient.Client - Scheme *runtime.Scheme + Client ctrlruntimeclient.Client + Scheme *runtime.Scheme + SharedAgentImage string } // Add adds a new controller to the manager -func Add(ctx context.Context, mgr manager.Manager) error { +func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage string) error { // initialize a new 
Reconciler reconciler := ClusterReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + SharedAgentImage: sharedAgentImage, } return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Cluster{}). @@ -76,20 +78,20 @@ func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request if !controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) { controllerutil.AddFinalizer(&cluster, clusterFinalizerName) if err := c.Client.Update(ctx, &cluster); err != nil { - return reconcile.Result{}, util.LogAndReturnErr("failed to add cluster finalizer", err) + return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to add cluster finalizer", err) } } klog.Infof("enqueue cluster [%s]", cluster.Name) if err := c.createCluster(ctx, &cluster); err != nil { - return reconcile.Result{}, util.LogAndReturnErr("failed to create cluster", err) + return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to create cluster", err) } return reconcile.Result{}, nil } // remove finalizer from the server pods and update them. matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"}) - listOpts := &ctrlruntimeclient.ListOptions{Namespace: util.ClusterNamespace(&cluster)} + listOpts := &ctrlruntimeclient.ListOptions{Namespace: cluster.Namespace} matchingLabels.ApplyToList(listOpts) if err := c.Client.List(ctx, &podList, listOpts); err != nil { @@ -99,7 +101,7 @@ func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) { controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName) if err := c.Client.Update(ctx, &pod); err != nil { - return reconcile.Result{}, util.LogAndReturnErr("failed to remove etcd finalizer", err) + return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to remove etcd finalizer", err) } } } @@ -108,7 +110,7 @@ func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request // remove finalizer from the cluster and update it. 
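// note: the etcd pod finalizers were cleared above, so dropping the cluster finalizer here lets the API server finish deleting the Cluster object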
 		controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
 		if err := c.Client.Update(ctx, &cluster); err != nil {
-			return reconcile.Result{}, util.LogAndReturnErr("failed to remove cluster finalizer", err)
+			return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to remove cluster finalizer", err)
 		}
 	}
 	klog.Infof("deleting cluster [%s]", cluster.Name)
@@ -131,7 +133,7 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1
 		}
 	}
 	if err := c.Client.Update(ctx, cluster); err != nil {
-		return util.LogAndReturnErr("failed to update cluster with persistence type", err)
+		return k3kcontroller.LogAndReturnErr("failed to update cluster with persistence type", err)
 	}
 
 	cluster.Status.ClusterCIDR = cluster.Spec.ClusterCIDR
@@ -147,32 +149,32 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1
 	klog.Infof("creating cluster service")
 	serviceIP, err := c.createClusterService(ctx, cluster, s)
 	if err != nil {
-		return util.LogAndReturnErr("failed to create cluster service", err)
+		return k3kcontroller.LogAndReturnErr("failed to create cluster service", err)
 	}
 
-	if err := c.createClusterConfigs(ctx, cluster, serviceIP); err != nil {
-		return util.LogAndReturnErr("failed to create cluster configs", err)
+	if err := c.createClusterConfigs(ctx, cluster, s, serviceIP); err != nil {
+		return k3kcontroller.LogAndReturnErr("failed to create cluster configs", err)
 	}
 
 	// create statefulsets in case the user chose a persistence type other than ephemeral
 	if err := c.server(ctx, cluster, s); err != nil {
-		return util.LogAndReturnErr("failed to create servers", err)
+		return k3kcontroller.LogAndReturnErr("failed to create servers", err)
 	}
 
-	if err := c.agent(ctx, cluster); err != nil {
-		return util.LogAndReturnErr("failed to create agents", err)
+	if err := c.agent(ctx, cluster, serviceIP); err != nil {
+		return k3kcontroller.LogAndReturnErr("failed to create agents", err)
 	}
 
 	if cluster.Spec.Expose != nil {
 		if cluster.Spec.Expose.Ingress != nil {
 			serverIngress, err := s.Ingress(ctx, c.Client)
 			if err != nil {
-				return util.LogAndReturnErr("failed to create ingress object", err)
+				return k3kcontroller.LogAndReturnErr("failed to create ingress object", err)
 			}
 			if err := c.Client.Create(ctx, serverIngress); err != nil {
 				if !apierrors.IsAlreadyExists(err) {
-					return util.LogAndReturnErr("failed to create server ingress", err)
+					return k3kcontroller.LogAndReturnErr("failed to create server ingress", err)
 				}
 			}
 		}
@@ -180,21 +182,21 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1
 
 	bootstrapSecret, err := bootstrap.Generate(ctx, cluster, serviceIP)
 	if err != nil {
-		return util.LogAndReturnErr("failed to generate new kubeconfig", err)
+		return k3kcontroller.LogAndReturnErr("failed to generate new kubeconfig", err)
 	}
 
 	if err := c.Client.Create(ctx, bootstrapSecret); err != nil {
 		if !apierrors.IsAlreadyExists(err) {
-			return util.LogAndReturnErr("failed to create kubeconfig secret", err)
+			return k3kcontroller.LogAndReturnErr("failed to create kubeconfig secret", err)
 		}
 	}
 	return c.Client.Update(ctx, cluster)
 }
 
-func (c *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP string) error {
+func (c *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v1alpha1.Cluster, server *server.Server, serviceIP string) error {
 	// create init node config
-	initServerConfig, err := config.Server(cluster, true, serviceIP)
+	initServerConfig, err := server.Config(true, serviceIP)
 	if err != nil {
 		return err
 	}
@@ -210,7 +212,7 @@ func (c *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v
 	}
 
 	// create servers configuration
-	serverConfig, err := config.Server(cluster, false, serviceIP)
+	serverConfig, err := server.Config(false, serviceIP)
 	if err != nil {
 		return err
 	}
@@ -223,23 +225,12 @@ func (c *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v
 		}
 	}
 
-	// create agents configuration
-	agentsConfig := agentConfig(cluster, serviceIP)
-	if err := controllerutil.SetControllerReference(cluster, &agentsConfig, c.Scheme); err != nil {
-		return err
-	}
-	if err := c.Client.Create(ctx, &agentsConfig); err != nil {
-		if !apierrors.IsAlreadyExists(err) {
-			return err
-		}
-	}
-
 	return nil
 }
 
-func (c *ClusterReconciler) createClusterService(ctx context.Context, cluster *v1alpha1.Cluster, server *server.Server) (string, error) {
+func (c *ClusterReconciler) createClusterService(ctx context.Context, cluster *v1alpha1.Cluster, s *server.Server) (string, error) {
 	// create cluster service
-	clusterService := server.Service(cluster)
+	clusterService := s.Service(cluster)
 
 	if err := controllerutil.SetControllerReference(cluster, clusterService, c.Scheme); err != nil {
 		return "", err
@@ -253,8 +244,8 @@ func (c *ClusterReconciler) createClusterService(ctx context.Context, cluster *v
 
 	var service v1.Service
 	objKey := ctrlruntimeclient.ObjectKey{
-		Namespace: util.ClusterNamespace(cluster),
-		Name:      util.ServerSvcName(cluster),
+		Namespace: cluster.Namespace,
+		Name:      server.ServiceName(cluster.Name),
 	}
 	if err := c.Client.Get(ctx, objKey, &service); err != nil {
 		return "", err
@@ -265,7 +256,7 @@ func (c *ClusterReconciler) createClusterService(ctx context.Context, cluster *v
 
 func (c *ClusterReconciler) server(ctx context.Context, cluster *v1alpha1.Cluster, server *server.Server) error {
 	// create headless service for the statefulset
-	serverStatefulService := server.StatefulServerService(cluster)
+	serverStatefulService := server.StatefulServerService()
 	if err := controllerutil.SetControllerReference(cluster, serverStatefulService, c.Scheme); err != nil {
 		return err
 	}
@@ -274,7 +265,7 @@ func (c *ClusterReconciler) server(ctx context.Context, cluster *v1alpha1.Cluste
 			return err
 		}
 	}
-	ServerStatefulSet, err := server.StatefulServer(ctx, cluster)
+	ServerStatefulSet, err := server.StatefulServer(ctx)
 	if err != nil {
 		return err
 	}
@@ -290,41 +281,16 @@ func (c *ClusterReconciler) server(ctx context.Context, cluster *v1alpha1.Cluste
 	return nil
 }
 
-func (c *ClusterReconciler) agent(ctx context.Context, cluster *v1alpha1.Cluster) error {
-	agent := agent.New(cluster)
-
-	agentsDeployment := agent.Deploy()
-	if err := controllerutil.SetControllerReference(cluster, agentsDeployment, c.Scheme); err != nil {
+func (c *ClusterReconciler) agent(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP string) error {
+	agent := agent.New(cluster, serviceIP, c.SharedAgentImage)
+	agentsConfig, err := agent.Config()
+	if err != nil {
 		return err
 	}
+	agentResources := agent.Resources()
+	agentResources = append(agentResources, agentsConfig)
 
-	if err := c.ensure(ctx, agentsDeployment, false); err != nil {
-		return err
-	}
-	return nil
-}
-
-func agentConfig(cluster *v1alpha1.Cluster, serviceIP string) v1.Secret {
-	config := agentData(serviceIP, cluster.Spec.Token)
-
-	return v1.Secret{
-		TypeMeta: metav1.TypeMeta{
-			Kind:       "Secret",
-			APIVersion: "v1",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      util.AgentConfigName(cluster),
-			Namespace: util.ClusterNamespace(cluster),
-		},
-		Data: map[string][]byte{
-			"config.yaml": []byte(config),
-		},
-	}
-}
-
-func agentData(serviceIP, token string) string {
-	return fmt.Sprintf(`server: https://%s:6443
-token: %s`, serviceIP, token)
+	return c.ensureAll(ctx, cluster, agentResources)
 }
 
 func (c *ClusterReconciler) validate(cluster *v1alpha1.Cluster) error {
@@ -334,6 +300,18 @@ func (c *ClusterReconciler) validate(cluster *v1alpha1.Cluster) error {
 	return nil
 }
 
+func (c *ClusterReconciler) ensureAll(ctx context.Context, cluster *v1alpha1.Cluster, objs []ctrlruntimeclient.Object) error {
+	for _, obj := range objs {
+		if err := controllerutil.SetControllerReference(cluster, obj, c.Scheme); err != nil {
+			return err
+		}
+		if err := c.ensure(ctx, obj, false); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (c *ClusterReconciler) ensure(ctx context.Context, obj ctrlruntimeclient.Object, requiresRecreate bool) error {
 	exists := true
 	existingObject := obj.DeepCopyObject().(ctrlruntimeclient.Object)
diff --git a/pkg/controller/cluster/config/agent.go b/pkg/controller/cluster/config/agent.go
deleted file mode 100644
index 4f139de..0000000
--- a/pkg/controller/cluster/config/agent.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package config
-
-import (
-	"fmt"
-
-	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/util"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-)
-
-func Agent(cluster *v1alpha1.Cluster, serviceIP string) v1.Secret {
-	config := agentData(serviceIP, cluster.Spec.Token)
-
-	return v1.Secret{
-		TypeMeta: metav1.TypeMeta{
-			Kind:       "Secret",
-			APIVersion: "v1",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      util.AgentConfigName(cluster),
-			Namespace: util.ClusterNamespace(cluster),
-		},
-		Data: map[string][]byte{
-			"config.yaml": []byte(config),
-		},
-	}
-}
-
-func agentData(serviceIP, token string) string {
-	return fmt.Sprintf(`server: https://%s:6443
-token: %s
-with-node-id: true`, serviceIP, token)
-}
diff --git a/pkg/controller/cluster/pod.go b/pkg/controller/cluster/pod.go
index 77c9c7c..f087869 100644
--- a/pkg/controller/cluster/pod.go
+++ b/pkg/controller/cluster/pod.go
@@ -11,9 +11,10 @@ import (
 
 	certutil "github.com/rancher/dynamiclistener/cert"
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	k3kcontroller "github.com/rancher/k3k/pkg/controller"
+	"github.com/rancher/k3k/pkg/controller/cluster/server"
 	"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
 	"github.com/rancher/k3k/pkg/controller/kubeconfig"
-	"github.com/rancher/k3k/pkg/controller/util"
 	"github.com/sirupsen/logrus"
 	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -60,11 +61,13 @@ func AddPodController(ctx context.Context, mgr manager.Manager) error {
 }
 
 func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
-	s := strings.Split(req.Namespace, "-")
-	if len(s) <= 1 {
-		return reconcile.Result{}, util.LogAndReturnErr("failed to get cluster namespace", nil)
+	s := strings.Split(req.Name, "-")
+	if len(s) < 2 {
+		return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to get cluster name", nil)
+	}
+	if s[0] != "k3k" {
+		return reconcile.Result{}, nil
 	}
-	clusterName := s[1]
 	clusterName := s[1]
 
 	var cluster v1alpha1.Cluster
 	if err := p.Client.Get(ctx, types.NamespacedName{Name: clusterName}, &cluster); err != nil {
@@ -83,7 +86,7 @@ func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
 	for _, pod := range podList.Items {
 		klog.Infof("Handle etcd server pod [%s/%s]", pod.Namespace, pod.Name)
 		if err := p.handleServerPod(ctx, cluster, &pod); err != nil {
-			return reconcile.Result{}, util.LogAndReturnErr("failed to handle etcd pod", err)
+			return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to handle etcd pod", err)
 		}
 	}
 	return reconcile.Result{}, nil
@@ -116,7 +119,7 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl
 	// remove server from etcd
 	client, err := clientv3.New(clientv3.Config{
 		Endpoints: []string{
-			fmt.Sprintf("https://%s.%s:2379", util.ServerSvcName(&cluster), pod.Namespace),
+			fmt.Sprintf("https://%s.%s:2379", server.ServiceName(cluster.Name), pod.Namespace),
 		},
 		TLS: tlsConfig,
 	})
@@ -146,9 +149,9 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl
 func (p *PodReconciler) getETCDTLS(cluster *v1alpha1.Cluster) (*tls.Config, error) {
 	klog.Infof("generating etcd TLS client certificate for cluster [%s]", cluster.Name)
 	token := cluster.Spec.Token
-	endpoint := fmt.Sprintf("%s.%s", util.ServerSvcName(cluster), util.ClusterNamespace(cluster))
+	endpoint := fmt.Sprintf("%s.%s", server.ServiceName(cluster.Name), cluster.Namespace)
 	var b *bootstrap.ControlRuntimeBootstrap
-	if err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
+	if err := retry.OnError(k3kcontroller.Backoff, func(err error) bool {
 		return true
 	}, func() error {
 		var err error
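
For context on the pod controller above: handleServerPod only shows the etcd client being constructed; the actual member removal happens further down in the same function. A minimal sketch of that step under stated assumptions (removeEtcdMember and the match-by-name logic are illustrative, not part of this patch):

package cluster

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// removeEtcdMember looks up an etcd member by name and removes it from the
// embedded etcd cluster, roughly what handleServerPod does for a deleted
// server pod.
func removeEtcdMember(ctx context.Context, client *clientv3.Client, name string) error {
	members, err := client.MemberList(ctx)
	if err != nil {
		return err
	}
	for _, member := range members.Members {
		if member.Name == name {
			_, err = client.MemberRemove(ctx, member.ID)
			return err
		}
	}
	// the member is already gone; nothing to do
	return nil
}
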
diff --git a/pkg/controller/cluster/server/bootstrap/bootstrap.go b/pkg/controller/cluster/server/bootstrap/bootstrap.go
index a8372f4..9e94dc0 100644
--- a/pkg/controller/cluster/server/bootstrap/bootstrap.go
+++ b/pkg/controller/cluster/server/bootstrap/bootstrap.go
@@ -9,7 +9,7 @@ import (
 	"time"
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/util"
+	"github.com/rancher/k3k/pkg/controller"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/util/retry"
@@ -36,7 +36,7 @@ func Generate(ctx context.Context, cluster *v1alpha1.Cluster, ip string) (*v1.Se
 	token := cluster.Spec.Token
 
 	var bootstrap *ControlRuntimeBootstrap
-	if err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
+	if err := retry.OnError(controller.Backoff, func(err error) bool {
 		return true
 	}, func() error {
 		var err error
@@ -60,8 +60,8 @@ func Generate(ctx context.Context, cluster *v1alpha1.Cluster, ip string) (*v1.Se
 			APIVersion: "v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      cluster.Name + "-bootstrap",
-			Namespace: util.ClusterNamespace(cluster),
+			Name:      controller.SafeConcatNameWithPrefix(cluster.Name, "bootstrap"),
+			Namespace: cluster.Namespace,
 			OwnerReferences: []metav1.OwnerReference{
 				{
 					APIVersion: cluster.APIVersion,
diff --git a/pkg/controller/cluster/config/server.go b/pkg/controller/cluster/server/config.go
similarity index 62%
rename from pkg/controller/cluster/config/server.go
rename to pkg/controller/cluster/server/config.go
index 18993b8..b5203c4 100644
--- a/pkg/controller/cluster/config/server.go
+++ b/pkg/controller/cluster/server/config.go
@@ -1,31 +1,26 @@
-package config
+package server
 
 import (
 	"fmt"
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/util"
+	"github.com/rancher/k3k/pkg/controller"
+	"github.com/rancher/k3k/pkg/controller/cluster/agent"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-// Server returns the secret for the server's config. Note that this doesn't set the ownerRef on the secret
-// to tie it back to the cluster.
-func Server(cluster *v1alpha1.Cluster, init bool, serviceIP string) (*v1.Secret, error) {
-	name := util.ServerConfigName(cluster)
-	if init {
-		name = util.ServerInitConfigName(cluster)
-	}
-
-	cluster.Status.TLSSANs = append(cluster.Spec.TLSSANs,
+func (s *Server) Config(init bool, serviceIP string) (*v1.Secret, error) {
+	name := configSecretName(s.cluster.Name, init)
+	s.cluster.Status.TLSSANs = append(s.cluster.Spec.TLSSANs,
 		serviceIP,
-		util.ServerSvcName(cluster),
-		fmt.Sprintf("%s.%s", util.ServerSvcName(cluster), util.ClusterNamespace(cluster)),
+		ServiceName(s.cluster.Name),
+		fmt.Sprintf("%s.%s", ServiceName(s.cluster.Name), s.cluster.Namespace),
 	)
 
-	config := serverConfigData(serviceIP, cluster)
+	config := serverConfigData(serviceIP, s.cluster)
 	if init {
-		config = initConfigData(cluster)
+		config = initConfigData(s.cluster)
 	}
 	return &v1.Secret{
 		TypeMeta: metav1.TypeMeta{
@@ -34,7 +29,7 @@ func Server(cluster *v1alpha1.Cluster, init bool, serviceIP string) (*v1.Secret,
 		},
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
-			Namespace: util.ClusterNamespace(cluster),
+			Namespace: s.cluster.Namespace,
 		},
 		Data: map[string][]byte{
 			"config.yaml": []byte(config),
@@ -72,7 +67,17 @@ func serverOptions(cluster *v1alpha1.Cluster) string {
 			opts = opts + "- " + addr + "\n"
 		}
 	}
+	if cluster.Spec.Mode != agent.VirtualNodeMode {
+		opts = opts + "disable-agent: true\negress-selector-mode: disabled\n"
+	}
 	// TODO: Add extra args to the options
 
 	return opts
 }
+
+func configSecretName(clusterName string, init bool) string {
+	if !init {
+		return controller.SafeConcatNameWithPrefix(clusterName, configName)
+	}
+	return controller.SafeConcatNameWithPrefix(clusterName, initConfigName)
+}
diff --git a/pkg/controller/cluster/server/ingress.go b/pkg/controller/cluster/server/ingress.go
index 154cb5f..c29de26 100644
--- a/pkg/controller/cluster/server/ingress.go
+++ b/pkg/controller/cluster/server/ingress.go
@@ -3,7 +3,7 @@ package server
 import (
 	"context"
 
-	"github.com/rancher/k3k/pkg/controller/util"
+	"github.com/rancher/k3k/pkg/controller"
 	networkingv1 "k8s.io/api/networking/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -15,12 +15,13 @@ const (
 	nginxSSLPassthroughAnnotation  = "nginx.ingress.kubernetes.io/ssl-passthrough"
 	nginxBackendProtocolAnnotation = "nginx.ingress.kubernetes.io/backend-protocol"
 	nginxSSLRedirectAnnotation     = "nginx.ingress.kubernetes.io/ssl-redirect"
-	serverPort                     = 6443
-	etcdPort                       = 2379
+
+	serverPort = 6443
+	etcdPort   = 2379
 )
 
 func (s *Server) Ingress(ctx context.Context, client client.Client) (*networkingv1.Ingress, error) {
-	addresses, err := util.Addresses(ctx, client)
+	addresses, err := controller.Addresses(ctx, client)
 	if err != nil {
 		return nil, err
 	}
@@ -31,8 +32,8 @@ func (s *Server) Ingress(ctx context.Context, client client.Client) (*networking
 			APIVersion: "networking.k8s.io/v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      s.cluster.Name + "-server-ingress",
-			Namespace: util.ClusterNamespace(s.cluster),
+			Name:      controller.SafeConcatNameWithPrefix(s.cluster.Name, "ingress"),
+			Namespace: s.cluster.Namespace,
 		},
 		Spec: networkingv1.IngressSpec{
 			IngressClassName: &s.cluster.Spec.Expose.Ingress.IngressClassName,
@@ -59,7 +60,7 @@ func (s *Server) ingressRules(addresses []string) []networkingv1.IngressRule {
 					PathType: &pathTypePrefix,
 					Backend: networkingv1.IngressBackend{
 						Service: &networkingv1.IngressServiceBackend{
-							Name: util.ServerSvcName(s.cluster),
+							Name: ServiceName(s.cluster.Name),
 							Port: networkingv1.ServiceBackendPort{
 								Number: serverPort,
 							},
diff --git a/pkg/controller/cluster/server/server.go b/pkg/controller/cluster/server/server.go
index c8e382b..72470bd 100644
--- a/pkg/controller/cluster/server/server.go
+++ b/pkg/controller/cluster/server/server.go
@@ -5,7 +5,7 @@ import (
 	"strings"
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/util"
+	"github.com/rancher/k3k/pkg/controller"
 	apps "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -17,12 +17,12 @@ import (
 )
 
 const (
-	serverName         = "k3k-"
-	k3kSystemNamespace = serverName + "system"
-	initServerName     = serverName + "init-server"
-	initContainerName  = serverName + "server-check"
-	initContainerImage = "alpine/curl"
+	k3kSystemNamespace = "k3k-system"
+	serverName         = "server"
+	configName         = "server-config"
+	initConfigName     = "init-server-config"
+	ServerPort         = 6443
 
 	EphermalNodesType = "ephermal"
 	DynamicNodesType  = "dynamic"
 )
@@ -40,7 +40,7 @@ func New(cluster *v1alpha1.Cluster, client client.Client) *Server {
 	}
 }
 
-func (s *Server) podSpec(ctx context.Context, image, name string, persistent bool, affinitySelector *metav1.LabelSelector) v1.PodSpec {
+func (s *Server) podSpec(image, name string, persistent bool, affinitySelector *metav1.LabelSelector) v1.PodSpec {
 	var limit v1.ResourceList
 	if s.cluster.Spec.Limit != nil && s.cluster.Spec.Limit.ServerLimit != nil {
 		limit = s.cluster.Spec.Limit.ServerLimit
@@ -62,7 +62,7 @@ func (s *Server) podSpec(ctx context.Context, image, name string, persistent boo
 			Name: "initconfig",
 			VolumeSource: v1.VolumeSource{
 				Secret: &v1.SecretVolumeSource{
-					SecretName: util.ServerInitConfigName(s.cluster),
+					SecretName: configSecretName(s.cluster.Name, true),
 					Items: []v1.KeyToPath{
 						{
 							Key:  "config.yaml",
@@ -76,7 +76,7 @@ func (s *Server) podSpec(ctx context.Context, image, name string, persistent boo
 			Name: "config",
 			VolumeSource: v1.VolumeSource{
 				Secret: &v1.SecretVolumeSource{
-					SecretName: util.ServerConfigName(s.cluster),
+					SecretName: configSecretName(s.cluster.Name, false),
 					Items: []v1.KeyToPath{
 						{
 							Key:  "config.yaml",
@@ -220,18 +220,18 @@ func (s *Server) podSpec(ctx context.Context, image, name string, persistent boo
 	return podSpec
 }
 
-func (s *Server) StatefulServer(ctx context.Context, cluster *v1alpha1.Cluster) (*apps.StatefulSet, error) {
+func (s *Server) StatefulServer(ctx context.Context) (*apps.StatefulSet, error) {
 	var (
 		replicas   int32
 		pvClaims   []v1.PersistentVolumeClaim
 		persistent bool
 	)
-	image := util.K3SImage(cluster)
-	name := serverName + "server"
+	image := controller.K3SImage(s.cluster)
+	name := controller.SafeConcatNameWithPrefix(s.cluster.Name, serverName)
 
-	replicas = *cluster.Spec.Servers
+	replicas = *s.cluster.Spec.Servers
 
-	if cluster.Spec.Persistence != nil && cluster.Spec.Persistence.Type != EphermalNodesType {
+	if s.cluster.Spec.Persistence != nil && s.cluster.Spec.Persistence.Type != EphermalNodesType {
 		persistent = true
 		pvClaims = []v1.PersistentVolumeClaim{
 			{
@@ -241,14 +241,14 @@ func (s *Server) StatefulServer(ctx context.Context, cluster *v1alpha1.Cluster)
 				},
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      "varlibrancherk3s",
-					Namespace: util.ClusterNamespace(cluster),
+					Namespace: s.cluster.Namespace,
 				},
 				Spec: v1.PersistentVolumeClaimSpec{
 					AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
-					StorageClassName: &cluster.Spec.Persistence.StorageClassName,
+					StorageClassName: &s.cluster.Spec.Persistence.StorageClassName,
 					Resources: v1.VolumeResourceRequirements{
 						Requests: v1.ResourceList{
-							"storage": resource.MustParse(cluster.Spec.Persistence.StorageRequestSize),
+							"storage": resource.MustParse(s.cluster.Spec.Persistence.StorageRequestSize),
 						},
 					},
 				},
@@ -260,16 +260,16 @@ func (s *Server) StatefulServer(ctx context.Context, cluster *v1alpha1.Cluster)
 				},
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      "varlibkubelet",
-					Namespace: util.ClusterNamespace(cluster),
+					Namespace: s.cluster.Namespace,
 				},
 				Spec: v1.PersistentVolumeClaimSpec{
 					Resources: v1.VolumeResourceRequirements{
 						Requests: v1.ResourceList{
-							"storage": resource.MustParse(cluster.Spec.Persistence.StorageRequestSize),
+							"storage": resource.MustParse(s.cluster.Spec.Persistence.StorageRequestSize),
 						},
 					},
 					AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
-					StorageClassName: &cluster.Spec.Persistence.StorageClassName,
+					StorageClassName: &s.cluster.Spec.Persistence.StorageClassName,
 				},
 			},
 		}
@@ -301,7 +301,7 @@ func (s *Server) StatefulServer(ctx context.Context, cluster *v1alpha1.Cluster)
 				},
 				ObjectMeta: metav1.ObjectMeta{
 					Name:      addons.Name,
-					Namespace: util.ClusterNamespace(s.cluster),
+					Namespace: s.cluster.Namespace,
 				},
 				Data: make(map[string][]byte, len(addons.Data)),
 			}
@@ -335,12 +335,12 @@ func (s *Server) StatefulServer(ctx context.Context, cluster *v1alpha1.Cluster)
 
 	selector := metav1.LabelSelector{
 		MatchLabels: map[string]string{
-			"cluster": cluster.Name,
+			"cluster": s.cluster.Name,
 			"role":    "server",
 		},
 	}
 
-	podSpec := s.podSpec(ctx, image, name, persistent, &selector)
+	podSpec := s.podSpec(image, name, persistent, &selector)
 	podSpec.Volumes = append(podSpec.Volumes, volumes...)
 	podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, volumeMounts...)
@@ -350,13 +350,13 @@ func (s *Server) StatefulServer(ctx context.Context, cluster *v1alpha1.Cluster)
 			APIVersion: "apps/v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      cluster.Name + "-" + name,
-			Namespace: util.ClusterNamespace(cluster),
+			Name:      name,
+			Namespace: s.cluster.Namespace,
 			Labels:    selector.MatchLabels,
 		},
 		Spec: apps.StatefulSetSpec{
 			Replicas:             &replicas,
-			ServiceName:          cluster.Name + "-" + name + "-headless",
+			ServiceName:          headlessServiceName(s.cluster.Name),
 			Selector:             &selector,
 			VolumeClaimTemplates: pvClaims,
 			Template: v1.PodTemplateSpec{
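
The serverOptions change in server/config.go above is the heart of the shared mode: the k3s server gives up its built-in agent and egress tunnel. A minimal sketch of that branch in isolation (the function name and the literal "virtual" stand in for agent.VirtualNodeMode; both are assumptions, not part of this patch):

package server

// sharedModeOptions appends the flags that serverOptions adds for any
// cluster mode other than "virtual": the server pod runs no kubelet of
// its own (disable-agent) and skips the egress tunnel
// (egress-selector-mode: disabled), since the k3k-kubelet handles pods.
func sharedModeOptions(opts, mode string) string {
	if mode != "virtual" {
		opts += "disable-agent: true\negress-selector-mode: disabled\n"
	}
	return opts
}
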
diff --git a/pkg/controller/cluster/server/service.go b/pkg/controller/cluster/server/service.go
index 1ecbf6c..98a0560 100644
--- a/pkg/controller/cluster/server/service.go
+++ b/pkg/controller/cluster/server/service.go
@@ -2,7 +2,7 @@ package server
 
 import (
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/util"
+	"github.com/rancher/k3k/pkg/controller"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -23,8 +23,8 @@ func (s *Server) Service(cluster *v1alpha1.Cluster) *v1.Service {
 			APIVersion: "v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      util.ServerSvcName(cluster),
-			Namespace: util.ClusterNamespace(cluster),
+			Name:      ServiceName(s.cluster.Name),
+			Namespace: cluster.Namespace,
 		},
 		Spec: v1.ServiceSpec{
 			Type: serviceType,
@@ -48,22 +48,21 @@ func (s *Server) Service(cluster *v1alpha1.Cluster) *v1.Service {
 	}
 }
 
-func (s *Server) StatefulServerService(cluster *v1alpha1.Cluster) *v1.Service {
-	name := serverName
+func (s *Server) StatefulServerService() *v1.Service {
 	return &v1.Service{
 		TypeMeta: metav1.TypeMeta{
 			Kind:       "Service",
 			APIVersion: "v1",
 		},
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      cluster.Name + "-" + name + "headless",
-			Namespace: util.ClusterNamespace(cluster),
+			Name:      headlessServiceName(s.cluster.Name),
+			Namespace: s.cluster.Namespace,
 		},
 		Spec: v1.ServiceSpec{
 			Type:      v1.ServiceTypeClusterIP,
 			ClusterIP: v1.ClusterIPNone,
 			Selector: map[string]string{
-				"cluster": cluster.Name,
+				"cluster": s.cluster.Name,
 				"role":    "server",
 			},
 			Ports: []v1.ServicePort{
@@ -81,3 +80,11 @@ func (s *Server) StatefulServerService(cluster *v1alpha1.Cluster) *v1.Service {
 		},
 	}
 }
+
+func ServiceName(clusterName string) string {
+	return controller.SafeConcatNameWithPrefix(clusterName, "service")
+}
+
+func headlessServiceName(clusterName string) string {
+	return controller.SafeConcatNameWithPrefix(clusterName, "service", "headless")
+}
diff --git a/pkg/controller/clusterset/clusterset.go b/pkg/controller/clusterset/clusterset.go
index a1827b0..ea81e2a 100644
--- a/pkg/controller/clusterset/clusterset.go
+++ b/pkg/controller/clusterset/clusterset.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	k3kcontroller "github.com/rancher/k3k/pkg/controller"
 	v1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -20,7 +21,6 @@ import (
 
 const (
 	clusterSetController    = "k3k-clusterset-controller"
-	networkPolicyName       = "k3k-cluster-netpol"
 	allTrafficCIDR          = "0.0.0.0/0"
 	maxConcurrentReconciles = 1
 )
@@ -108,7 +108,7 @@ func netpol(ctx context.Context, clusterCIDR string, clusterSet *v1alpha1.Cluste
 	}
 	return &networkingv1.NetworkPolicy{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      networkPolicyName,
+			Name:      k3kcontroller.SafeConcatNameWithPrefix(clusterSet.Name),
 			Namespace: clusterSet.Namespace,
 		},
 		TypeMeta: metav1.TypeMeta{
diff --git a/pkg/controller/clusterset/node.go b/pkg/controller/clusterset/node.go
index 9cd23d7..fe2a7c9 100644
--- a/pkg/controller/clusterset/node.go
+++ b/pkg/controller/clusterset/node.go
@@ -4,7 +4,7 @@ import (
 	"context"
 
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/util"
+	k3kcontroller "github.com/rancher/k3k/pkg/controller"
 	v1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -45,7 +45,7 @@ func AddNodeController(ctx context.Context, mgr manager.Manager) error {
 func (n *NodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
 	var clusterSetList v1alpha1.ClusterSetList
 	if err := n.Client.List(ctx, &clusterSetList); err != nil {
-		return reconcile.Result{}, util.LogAndReturnErr("failed to list clusterSets", err)
+		return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to list clusterSets", err)
 	}
 
 	if len(clusterSetList.Items) <= 0 {
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
new file mode 100644
index 0000000..29a6d26
--- /dev/null
+++ b/pkg/controller/controller.go
@@ -0,0 +1,96 @@
+package controller
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"strings"
+	"time"
+
+	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/klog"
+	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const (
+	namePrefix      = "k3k"
+	k3SImageName    = "rancher/k3s"
+	AdminCommonName = "system:admin"
+)
+
+// Backoff is the backoff used while waiting for cluster creation
+var Backoff = wait.Backoff{
+	Steps:    5,
+	Duration: 5 * time.Second,
+	Factor:   2,
+	Jitter:   0.1,
+}
+
+func K3SImage(cluster *v1alpha1.Cluster) string {
+	return k3SImageName + ":" + cluster.Spec.Version
+}
+
+func LogAndReturnErr(errString string, err error) error {
+	klog.Errorf("%s: %v", errString, err)
+	return err
+}
+
+func nodeAddress(node *v1.Node) string {
+	var externalIP string
+	var internalIP string
+
+	for _, ip := range node.Status.Addresses {
+		if ip.Type == "ExternalIP" && ip.Address != "" {
+			externalIP = ip.Address
+			break
+		}
+		if ip.Type == "InternalIP" && ip.Address != "" {
+			internalIP = ip.Address
+		}
+	}
+	if externalIP != "" {
+		return externalIP
+	}
+
+	return internalIP
+}
+
+// Addresses returns the external address of every node; for nodes without
+// an external address, the internal address is returned instead.
+func Addresses(ctx context.Context, client ctrlruntimeclient.Client) ([]string, error) {
+	var nodeList v1.NodeList
+	if err := client.List(ctx, &nodeList); err != nil {
+		return nil, err
+	}
+
+	addresses := make([]string, 0, len(nodeList.Items))
+	for _, node := range nodeList.Items {
+		addresses = append(addresses, nodeAddress(&node))
+	}
+
+	return addresses, nil
+}
+
+// SafeConcatNameWithPrefix runs SafeConcatName with the extra "k3k" name prefix.
+func SafeConcatNameWithPrefix(name ...string) string {
+	return SafeConcatName(append([]string{namePrefix}, name...)...)
+}
+
+// SafeConcatName concatenates the given strings and ensures the returned name is under 64 characters
+// by cutting the string off at 57 characters and setting the last 6 with an encoded version of the concatenated string.
+func SafeConcatName(name ...string) string {
+	fullPath := strings.Join(name, "-")
+	if len(fullPath) < 64 {
+		return fullPath
+	}
+	digest := sha256.Sum256([]byte(fullPath))
+	// since we cut the string in the middle, the last char may not be compatible with what is expected in k8s
+	// we are checking and if necessary removing the last char
+	c := fullPath[56]
+	if 'a' <= c && c <= 'z' || '0' <= c && c <= '9' {
+		return fullPath[0:57] + "-" + hex.EncodeToString(digest[0:])[0:5]
+	}
+
+	return fullPath[0:56] + "-" + hex.EncodeToString(digest[0:])[0:6]
+}
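
The naming helpers above replace the ad-hoc fmt.Sprintf names from the deleted util package. A usage sketch, with outputs hand-derived from the code above (the digest suffix is described rather than computed):

package main

import (
	"fmt"
	"strings"

	"github.com/rancher/k3k/pkg/controller"
)

func main() {
	// short names are concatenated under the "k3k" prefix unchanged
	fmt.Println(controller.SafeConcatNameWithPrefix("mycluster", "bootstrap"))
	// prints: k3k-mycluster-bootstrap

	// names of 64 characters or more are cut at 57 and given a short
	// sha256-derived suffix, keeping the result within the 63-character
	// limit for Kubernetes object names
	fmt.Println(controller.SafeConcatName(strings.Repeat("a", 70)))
	// prints 57 a's, then "-", then the first 5 hex digits of the digest
}
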
diff --git a/pkg/controller/kubeconfig/kubeconfig.go b/pkg/controller/kubeconfig/kubeconfig.go
index 2b8de59..787c797 100644
--- a/pkg/controller/kubeconfig/kubeconfig.go
+++ b/pkg/controller/kubeconfig/kubeconfig.go
@@ -10,8 +10,9 @@ import (
 
 	certutil "github.com/rancher/dynamiclistener/cert"
 	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
+	"github.com/rancher/k3k/pkg/controller"
+	"github.com/rancher/k3k/pkg/controller/cluster/server"
 	"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
-	"github.com/rancher/k3k/pkg/controller/util"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/clientcmd"
@@ -28,8 +29,8 @@ type KubeConfig struct {
 
 func (k *KubeConfig) Extract(ctx context.Context, client client.Client, cluster *v1alpha1.Cluster, hostServerIP string) ([]byte, error) {
 	nn := types.NamespacedName{
-		Name:      cluster.Name + "-bootstrap",
-		Namespace: util.ClusterNamespace(cluster),
+		Name:      controller.SafeConcatNameWithPrefix(cluster.Name, "bootstrap"),
+		Namespace: cluster.Namespace,
 	}
 
 	var bootstrapSecret v1.Secret
@@ -57,8 +58,8 @@ func (k *KubeConfig) Extract(ctx context.Context, client client.Client, cluster
 	}
 	// get the server service to extract the right IP
 	nn = types.NamespacedName{
-		Name:      util.ServerSvcName(cluster),
-		Namespace: util.ClusterNamespace(cluster),
+		Name:      server.ServiceName(cluster.Name),
+		Namespace: cluster.Namespace,
 	}
 
 	var k3kService v1.Service
@@ -66,7 +67,7 @@ func (k *KubeConfig) Extract(ctx context.Context, client client.Client, cluster
 		return nil, err
 	}
 
-	url := fmt.Sprintf("https://%s:%d", k3kService.Spec.ClusterIP, util.ServerPort)
+	url := fmt.Sprintf("https://%s:%d", k3kService.Spec.ClusterIP, server.ServerPort)
 	if k3kService.Spec.Type == v1.ServiceTypeNodePort {
 		nodePort := k3kService.Spec.Ports[0].NodePort
 		url = fmt.Sprintf("https://%s:%d", hostServerIP, nodePort)
diff --git a/pkg/controller/util/util.go b/pkg/controller/util/util.go
deleted file mode 100644
index 9352310..0000000
--- a/pkg/controller/util/util.go
+++ /dev/null
@@ -1,86 +0,0 @@
-package util
-
-import (
-	"context"
-	"fmt"
-
-	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/klog"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-)
-
-const (
-	namespacePrefix = "k3k-"
-	k3SImageName    = "rancher/k3s"
-	AdminCommonName = "system:admin"
-	ServerPort      = 6443
-)
-
-const (
-	K3kSystemNamespace = namespacePrefix + "system"
-)
-
-func ClusterNamespace(cluster *v1alpha1.Cluster) string {
-	return cluster.Namespace
-}
-
-func ServerSvcName(cluster *v1alpha1.Cluster) string {
-	return fmt.Sprintf("k3k-%s-service", cluster.Name)
-}
-
-func ServerConfigName(cluster *v1alpha1.Cluster) string {
-	return fmt.Sprintf("k3k-%s-server-config", cluster.Name)
-}
-
-func ServerInitConfigName(cluster *v1alpha1.Cluster) string {
-	return fmt.Sprintf("k3k-init-%s-server-config", cluster.Name)
-}
-
-func AgentConfigName(cluster *v1alpha1.Cluster) string {
-	return fmt.Sprintf("k3k-%s-agent-config", cluster.Name)
-}
-
-func K3SImage(cluster *v1alpha1.Cluster) string {
-	return k3SImageName + ":" + cluster.Spec.Version
-}
-
-func LogAndReturnErr(errString string, err error) error {
-	klog.Errorf("%s: %v", errString, err)
-	return err
-}
-
-func nodeAddress(node *v1.Node) string {
-	var externalIP string
-	var internalIP string
-
-	for _, ip := range node.Status.Addresses {
-		if ip.Type == "ExternalIP" && ip.Address != "" {
-			externalIP = ip.Address
-			break
-		}
-		if ip.Type == "InternalIP" && ip.Address != "" {
-			internalIP = ip.Address
-		}
-	}
-	if externalIP != "" {
-		return externalIP
-	}
-
-	return internalIP
-}
-
-// return all the nodes external addresses, if not found then return internal addresses
-func Addresses(ctx context.Context, client client.Client) ([]string, error) {
-	var nodeList v1.NodeList
-	if err := client.List(ctx, &nodeList); err != nil {
-		return nil, err
-	}
-
-	var addresses []string
-	for _, node := range nodeList.Items {
-		addresses = append(addresses, nodeAddress(&node))
-	}
-
-	return addresses, nil
-}
diff --git a/virtual-kubelet/main.go b/virtual-kubelet/main.go
deleted file mode 100644
index 22ae471..0000000
--- a/virtual-kubelet/main.go
+++ /dev/null
@@ -1,214 +0,0 @@
-package main
-
-import (
-	"context"
-	"crypto/tls"
-	"crypto/x509"
-	"encoding/json"
-	"fmt"
-	"net"
-	"net/http"
-	"os"
-	"time"
-
-	certutil "github.com/rancher/dynamiclistener/cert"
-	"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
-	"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
-	"github.com/rancher/k3k/pkg/controller/kubeconfig"
-	"github.com/rancher/k3k/pkg/controller/util"
-	"github.com/rancher/k3k/virtual-kubelet/pkg/provider"
-	"github.com/virtual-kubelet/virtual-kubelet/log"
-	"github.com/virtual-kubelet/virtual-kubelet/node"
-	"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
-	"go.uber.org/zap"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/clientcmd"
-)
-
-const (
-	clusterNameEnv      = "CLUSTER_NAME"
-	clusterNamespaceEnv = "CLUSTER_NAMESPACE"
-	hostKubeconfigEnv   = "HOST_KUBECONFIG"
-	virtKubeconfigEnv   = "VIRT_KUBECONFIG"
-	podIPEnv            = "VIRT_POD_IP"
-	srvPort             = 9443
-	nodeName            = "virtual-node"
-)
-
-func main() {
-	name, ok := os.LookupEnv(clusterNameEnv)
-	if !ok {
-		fmt.Printf("env var %s is required but was not provided \n", clusterNameEnv)
-		os.Exit(-1)
-	}
-	namespace, ok := os.LookupEnv(clusterNamespaceEnv)
-	if !ok {
-		fmt.Printf("env var %s is required but was not provided \n", clusterNamespaceEnv)
-		os.Exit(-1)
-	}
-	hostKubeconfigPath, ok := os.LookupEnv(hostKubeconfigEnv)
-	if !ok {
-		fmt.Printf("env var %s is required but was not provided \n", hostKubeconfigEnv)
-		os.Exit(-1)
-	}
-	virtKubeconfigPath, ok := os.LookupEnv(virtKubeconfigEnv)
-	if !ok {
-		fmt.Printf("env var %s is required but was not provided \n", hostKubeconfigPath)
-		os.Exit(-1)
-	}
-	podIP, ok := os.LookupEnv(podIPEnv)
-	if !ok {
-		fmt.Printf("env var %s is required but was not provided \n", podIPEnv)
-		os.Exit(-1)
-	}
-	hostConfig, err := clientcmd.BuildConfigFromFlags("", hostKubeconfigPath)
-	if err != nil {
-		fmt.Printf("unable to load host kubeconfig at path %s, %s \n", hostKubeconfigPath, err)
-		os.Exit(-1)
-	}
-	virtConfig, err := clientcmd.BuildConfigFromFlags("", virtKubeconfigPath)
-	if err != nil {
-		fmt.Printf("unable to load virtual kubeconfig at path %s, %s \n", virtKubeconfigPath, err)
-		os.Exit(-1)
-	}
-	virtClientset, err := kubernetes.NewForConfig(virtConfig)
-	if err != nil {
-		fmt.Printf("unable to load virtual kubeconfig into kubernetes interface %s \n", err)
-		os.Exit(-1)
-	}
-	node, err := nodeutil.NewNode("virtual-node", func(pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
-		utilProvider, err := provider.New(*hostConfig, namespace, name)
-		if err != nil {
-			return nil, nil, fmt.Errorf("unable to make nodeutil provider %w", err)
-		}
-		nodeProvider := provider.Node{}
-		provider.ConfigureNode(pc.Node, podIP, srvPort)
-		return utilProvider, &nodeProvider, nil
-	},
-		nodeutil.WithClient(virtClientset),
-		func(c *nodeutil.NodeConfig) error {
-			c.HTTPListenAddr = fmt.Sprintf(":%d", srvPort)
-			// set up the routes
-			mux := http.NewServeMux()
-			err := nodeutil.AttachProviderRoutes(mux)(c)
-			if err != nil {
-				return fmt.Errorf("unable to attach routes: %w", err)
-			}
-			c.Handler = mux
-
-			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
-			defer cancel()
-			tlsConfig, err := loadTLSConfig(ctx, hostConfig, name, namespace, nodeName, podIP)
-			if err != nil {
-				return fmt.Errorf("unable to get tls config: %w", err)
-			}
-			c.TLSConfig = tlsConfig
-			return nil
-		},
-	)
-	if err != nil {
-		fmt.Printf("unable to start kubelet: %s \n", err.Error())
-		os.Exit(-1)
-	}
-	// run the node async so that we can wait for it to be ready in another call
-	go func() {
-		ctx := context.Background()
-		logger, err := zap.NewProduction()
-		if err != nil {
-			fmt.Printf("unable to create logger: %s", err.Error())
-			os.Exit(-1)
-		}
-		wrapped := LogWrapper{
-			*logger.Sugar(),
-		}
-		ctx = log.WithLogger(ctx, &wrapped)
-		err = node.Run(ctx)
-		if err != nil {
-			fmt.Printf("node errored when running: %s \n", err.Error())
-			os.Exit(-1)
-		}
-	}()
-	if err := node.WaitReady(context.Background(), time.Minute*1); err != nil {
-		fmt.Printf("node was not ready within timeout of 1 minute: %s \n", err.Error())
-		os.Exit(-1)
-	}
-	<-node.Done()
-	if err := node.Err(); err != nil {
-		fmt.Printf("node stopped with an error: %s \n", err.Error())
-		os.Exit(-1)
-	}
-	fmt.Printf("node exited without an error")
-}
-
-type LogWrapper struct {
-	zap.SugaredLogger
-}
-
-func (l *LogWrapper) WithError(err error) log.Logger {
-	return l
-}
-
-func (l *LogWrapper) WithField(string, interface{}) log.Logger {
-	return l
-}
-func (l *LogWrapper) WithFields(field log.Fields) log.Logger {
-	return l
-}
-
-func loadTLSConfig(ctx context.Context, hostConfig *rest.Config, clusterName, clusterNamespace, nodeName, ipStr string) (*tls.Config, error) {
-	dynamic, err := dynamic.NewForConfig(hostConfig)
-	if err != nil {
-		return nil, fmt.Errorf("unable to get clientset for kubeconfig: %w", err)
-	}
-	clusterGVR := schema.GroupVersionResource{
-		Group:    "k3k.io",
-		Version:  "v1alpha1",
-		Resource: "clusters",
-	}
-	dynCluster, err := dynamic.Resource(clusterGVR).Namespace(clusterNamespace).Get(ctx, clusterName, metav1.GetOptions{})
-	if err != nil {
-		return nil, fmt.Errorf("unable to get cluster: %w", err)
-	}
-	var cluster v1alpha1.Cluster
-	bytes, err := json.Marshal(dynCluster)
-	if err != nil {
-		return nil, fmt.Errorf("unable to marshall cluster: %w", err)
-	}
-	err = json.Unmarshal(bytes, &cluster)
-	if err != nil {
-		return nil, fmt.Errorf("unable to unmarshall cluster: %w", err)
-	}
-
-	endpoint := fmt.Sprintf("%s.%s", util.ServerSvcName(&cluster), util.ClusterNamespace(&cluster))
-	b, err := bootstrap.DecodedBootstrap(cluster.Spec.Token, endpoint)
-	if err != nil {
-		return nil, fmt.Errorf("unable to decode bootstrap: %w", err)
-	}
-	altNames := certutil.AltNames{
-		IPs: []net.IP{net.ParseIP(ipStr)},
-	}
-	cert, key, err := kubeconfig.CreateClientCertKey(nodeName, nil, &altNames, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, 0, b.ServerCA.Content, b.ServerCAKey.Content)
-	if err != nil {
-		return nil, fmt.Errorf("unable to get cert and key: %w", err)
-	}
-	clientCert, err := tls.X509KeyPair(cert, key)
-	if err != nil {
-		return nil, fmt.Errorf("unable to get key pair: %w", err)
-	}
-	// create rootCA CertPool
-	certs, err := certutil.ParseCertsPEM([]byte(b.ServerCA.Content))
-	if err != nil {
-		return nil, fmt.Errorf("unable to create certs: %w", err)
-	}
-	pool := x509.NewCertPool()
-	pool.AddCert(certs[0])
-
-	return &tls.Config{
-		RootCAs:      pool,
-		Certificates: []tls.Certificate{clientCert},
-	}, nil
-}
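
One pattern worth noting across the hunks above: bootstrap.Generate and PodReconciler.getETCDTLS now share the exported controller.Backoff instead of retry.DefaultBackoff, which gives bootstrap fetches roughly five attempts spread over a few minutes while a fresh server pod comes up. A hedged sketch of the call shape (fetchWithRetry is an illustrative name, not part of the patch):

package main

import (
	"k8s.io/client-go/util/retry"

	"github.com/rancher/k3k/pkg/controller"
)

// fetchWithRetry retries fn on every error using the shared backoff
// (5 steps, 5s initial delay, factor 2, 10% jitter), the same way
// bootstrap.Generate and getETCDTLS use it.
func fetchWithRetry(fn func() error) error {
	return retry.OnError(controller.Backoff, func(err error) bool {
		return true // every error is considered retriable here
	}, fn)
}

func main() {
	_ = fetchWithRetry(func() error { return nil })
}
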