mirror of
https://github.com/rancher/k3k.git
synced 2026-02-14 18:10:01 +00:00
467 lines
13 KiB
Go
467 lines
13 KiB
Go
package cmds
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/cobra"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/util/retry"
|
|
"k8s.io/utils/ptr"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
|
|
|
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
|
"github.com/rancher/k3k/pkg/controller"
|
|
k3kcluster "github.com/rancher/k3k/pkg/controller/cluster"
|
|
"github.com/rancher/k3k/pkg/controller/kubeconfig"
|
|
)
|
|
|
|
type CreateConfig struct {
|
|
token string
|
|
clusterCIDR string
|
|
serviceCIDR string
|
|
servers int
|
|
agents int
|
|
serverArgs []string
|
|
agentArgs []string
|
|
serverEnvs []string
|
|
agentEnvs []string
|
|
labels []string
|
|
annotations []string
|
|
persistenceType string
|
|
storageClassName string
|
|
storageRequestSize string
|
|
version string
|
|
mode string
|
|
kubeconfigServerHost string
|
|
policy string
|
|
mirrorHostNodes bool
|
|
customCertsPath string
|
|
timeout time.Duration
|
|
}
|
|
|
|
func NewClusterCreateCmd(appCtx *AppContext) *cobra.Command {
|
|
createConfig := &CreateConfig{}
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "create",
|
|
Short: "Create a new cluster.",
|
|
Example: "k3kcli cluster create [command options] NAME",
|
|
PreRunE: func(cmd *cobra.Command, args []string) error {
|
|
return validateCreateConfig(createConfig)
|
|
},
|
|
RunE: createAction(appCtx, createConfig),
|
|
Args: cobra.ExactArgs(1),
|
|
}
|
|
|
|
CobraFlagNamespace(appCtx, cmd.Flags())
|
|
createFlags(cmd, createConfig)
|
|
|
|
return cmd
|
|
}
|
|
|
|
func createAction(appCtx *AppContext, config *CreateConfig) func(cmd *cobra.Command, args []string) error {
|
|
return func(cmd *cobra.Command, args []string) error {
|
|
ctx := context.Background()
|
|
client := appCtx.Client
|
|
name := args[0]
|
|
|
|
if name == k3kcluster.ClusterInvalidName {
|
|
return errors.New("invalid cluster name")
|
|
}
|
|
|
|
if config.mode == string(v1beta1.SharedClusterMode) && config.agents != 0 {
|
|
return errors.New("invalid flag, --agents flag is only allowed in virtual mode")
|
|
}
|
|
|
|
namespace := appCtx.Namespace(name)
|
|
|
|
if err := createNamespace(ctx, client, namespace, config.policy); err != nil {
|
|
return err
|
|
}
|
|
|
|
if strings.Contains(config.version, "+") {
|
|
orig := config.version
|
|
config.version = strings.ReplaceAll(config.version, "+", "-")
|
|
logrus.Warnf("Invalid K3s docker reference version: '%s'. Using '%s' instead", orig, config.version)
|
|
}
|
|
|
|
if config.token != "" {
|
|
logrus.Info("Creating cluster token secret")
|
|
|
|
obj := k3kcluster.TokenSecretObj(config.token, name, namespace)
|
|
|
|
if err := client.Create(ctx, &obj); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if config.customCertsPath != "" {
|
|
if err := CreateCustomCertsSecrets(ctx, name, namespace, config.customCertsPath, client); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
logrus.Infof("Creating cluster '%s' in namespace '%s'", name, namespace)
|
|
|
|
cluster, err := newCluster(name, namespace, config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cluster.Spec.Expose = &v1beta1.ExposeConfig{
|
|
NodePort: &v1beta1.NodePortConfig{},
|
|
}
|
|
|
|
// add Host IP address as an extra TLS-SAN to expose the k3k cluster
|
|
url, err := url.Parse(appCtx.RestConfig.Host)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
host := strings.Split(url.Host, ":")
|
|
if config.kubeconfigServerHost != "" {
|
|
host = []string{config.kubeconfigServerHost}
|
|
}
|
|
|
|
cluster.Spec.TLSSANs = []string{host[0]}
|
|
|
|
if err := client.Create(ctx, cluster); err != nil {
|
|
if apierrors.IsAlreadyExists(err) {
|
|
logrus.Infof("Cluster '%s' already exists", name)
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := waitForClusterReconciled(ctx, client, cluster, config.timeout); err != nil {
|
|
return fmt.Errorf("failed to wait for cluster to be reconciled: %w", err)
|
|
}
|
|
|
|
clusterDetails, err := getClusterDetails(cluster)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get cluster details: %w", err)
|
|
}
|
|
|
|
logrus.Info(clusterDetails)
|
|
|
|
logrus.Infof("Waiting for cluster to be available..")
|
|
|
|
if err := waitForClusterReady(ctx, client, cluster, config.timeout); err != nil {
|
|
return fmt.Errorf("failed to wait for cluster to become ready (status: %s): %w", cluster.Status.Phase, err)
|
|
}
|
|
|
|
logrus.Infof("Extracting Kubeconfig for '%s' cluster", name)
|
|
|
|
// retry every 5s for at most 2m, or 25 times
|
|
availableBackoff := wait.Backoff{
|
|
Duration: 5 * time.Second,
|
|
Cap: 2 * time.Minute,
|
|
Steps: 25,
|
|
}
|
|
|
|
cfg := kubeconfig.New()
|
|
|
|
var kubeconfig *clientcmdapi.Config
|
|
|
|
if err := retry.OnError(availableBackoff, apierrors.IsNotFound, func() error {
|
|
kubeconfig, err = cfg.Generate(ctx, client, cluster, host[0], 0)
|
|
return err
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return writeKubeconfigFile(cluster, kubeconfig, "")
|
|
}
|
|
}
|
|
|
|
func newCluster(name, namespace string, config *CreateConfig) (*v1beta1.Cluster, error) {
|
|
var storageRequestSize *resource.Quantity
|
|
if config.storageRequestSize != "" {
|
|
parsed, err := resource.ParseQuantity(config.storageRequestSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
storageRequestSize = ptr.To(parsed)
|
|
}
|
|
|
|
cluster := &v1beta1.Cluster{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: namespace,
|
|
Labels: parseKeyValuePairs(config.labels, "label"),
|
|
Annotations: parseKeyValuePairs(config.annotations, "annotation"),
|
|
},
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Cluster",
|
|
APIVersion: "k3k.io/v1beta1",
|
|
},
|
|
Spec: v1beta1.ClusterSpec{
|
|
Servers: ptr.To(int32(config.servers)),
|
|
Agents: ptr.To(int32(config.agents)),
|
|
ClusterCIDR: config.clusterCIDR,
|
|
ServiceCIDR: config.serviceCIDR,
|
|
ServerArgs: config.serverArgs,
|
|
AgentArgs: config.agentArgs,
|
|
ServerEnvs: env(config.serverEnvs),
|
|
AgentEnvs: env(config.agentEnvs),
|
|
Version: config.version,
|
|
Mode: v1beta1.ClusterMode(config.mode),
|
|
Persistence: v1beta1.PersistenceConfig{
|
|
Type: v1beta1.PersistenceMode(config.persistenceType),
|
|
StorageClassName: ptr.To(config.storageClassName),
|
|
StorageRequestSize: storageRequestSize,
|
|
},
|
|
MirrorHostNodes: config.mirrorHostNodes,
|
|
},
|
|
}
|
|
if config.storageClassName == "" {
|
|
cluster.Spec.Persistence.StorageClassName = nil
|
|
}
|
|
|
|
if config.token != "" {
|
|
cluster.Spec.TokenSecretRef = &corev1.SecretReference{
|
|
Name: k3kcluster.TokenSecretName(name),
|
|
Namespace: namespace,
|
|
}
|
|
}
|
|
|
|
if config.customCertsPath != "" {
|
|
cluster.Spec.CustomCAs = &v1beta1.CustomCAs{
|
|
Enabled: true,
|
|
Sources: v1beta1.CredentialSources{
|
|
ClientCA: v1beta1.CredentialSource{
|
|
SecretName: controller.SafeConcatNameWithPrefix(cluster.Name, "client-ca"),
|
|
},
|
|
ServerCA: v1beta1.CredentialSource{
|
|
SecretName: controller.SafeConcatNameWithPrefix(cluster.Name, "server-ca"),
|
|
},
|
|
ETCDServerCA: v1beta1.CredentialSource{
|
|
SecretName: controller.SafeConcatNameWithPrefix(cluster.Name, "etcd-server-ca"),
|
|
},
|
|
ETCDPeerCA: v1beta1.CredentialSource{
|
|
SecretName: controller.SafeConcatNameWithPrefix(cluster.Name, "etcd-peer-ca"),
|
|
},
|
|
RequestHeaderCA: v1beta1.CredentialSource{
|
|
SecretName: controller.SafeConcatNameWithPrefix(cluster.Name, "request-header-ca"),
|
|
},
|
|
ServiceAccountToken: v1beta1.CredentialSource{
|
|
SecretName: controller.SafeConcatNameWithPrefix(cluster.Name, "service-account-token"),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
return cluster, nil
|
|
}
|
|
|
|
func env(envSlice []string) []corev1.EnvVar {
|
|
var envVars []corev1.EnvVar
|
|
|
|
for _, env := range envSlice {
|
|
keyValue := strings.Split(env, "=")
|
|
if len(keyValue) != 2 {
|
|
logrus.Fatalf("incorrect value for environment variable %s", env)
|
|
}
|
|
|
|
envVars = append(envVars, corev1.EnvVar{
|
|
Name: keyValue[0],
|
|
Value: keyValue[1],
|
|
})
|
|
}
|
|
|
|
return envVars
|
|
}
|
|
|
|
func waitForClusterReconciled(ctx context.Context, k8sClient client.Client, cluster *v1beta1.Cluster, timeout time.Duration) error {
|
|
return wait.PollUntilContextTimeout(ctx, time.Second, timeout, false, func(ctx context.Context) (bool, error) {
|
|
key := client.ObjectKeyFromObject(cluster)
|
|
if err := k8sClient.Get(ctx, key, cluster); err != nil {
|
|
return false, fmt.Errorf("failed to get resource: %w", err)
|
|
}
|
|
|
|
return cluster.Status.HostVersion != "", nil
|
|
})
|
|
}
|
|
|
|
func waitForClusterReady(ctx context.Context, k8sClient client.Client, cluster *v1beta1.Cluster, timeout time.Duration) error {
|
|
interval := 5 * time.Second
|
|
|
|
return wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
|
|
key := client.ObjectKeyFromObject(cluster)
|
|
if err := k8sClient.Get(ctx, key, cluster); err != nil {
|
|
return false, fmt.Errorf("failed to get resource: %w", err)
|
|
}
|
|
|
|
// If resource ready -> stop polling
|
|
if cluster.Status.Phase == v1beta1.ClusterReady {
|
|
return true, nil
|
|
}
|
|
|
|
// If resource failed -> stop polling with an error
|
|
if cluster.Status.Phase == v1beta1.ClusterFailed {
|
|
return true, fmt.Errorf("cluster creation failed: %s", cluster.Status.Phase)
|
|
}
|
|
|
|
// Condition not met, continue polling.
|
|
return false, nil
|
|
})
|
|
}
|
|
|
|
func CreateCustomCertsSecrets(ctx context.Context, name, namespace, customCertsPath string, k8sclient client.Client) error {
|
|
customCAsMap := map[string]string{
|
|
"etcd-peer-ca": "/etcd/peer-ca",
|
|
"etcd-server-ca": "/etcd/server-ca",
|
|
"server-ca": "/server-ca",
|
|
"client-ca": "/client-ca",
|
|
"request-header-ca": "/request-header-ca",
|
|
"service-account-token": "/service",
|
|
}
|
|
|
|
for certName, fileName := range customCAsMap {
|
|
var (
|
|
certFilePath, keyFilePath string
|
|
cert, key []byte
|
|
err error
|
|
)
|
|
|
|
if certName != "service-account-token" {
|
|
certFilePath = customCertsPath + fileName + ".crt"
|
|
|
|
cert, err = os.ReadFile(certFilePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
keyFilePath = customCertsPath + fileName + ".key"
|
|
|
|
key, err = os.ReadFile(keyFilePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
certSecret := caCertSecret(certName, name, namespace, cert, key)
|
|
|
|
if err := k8sclient.Create(ctx, certSecret); err != nil {
|
|
return client.IgnoreAlreadyExists(err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func caCertSecret(certName, clusterName, clusterNamespace string, cert, key []byte) *corev1.Secret {
|
|
return &corev1.Secret{
|
|
TypeMeta: metav1.TypeMeta{
|
|
Kind: "Secret",
|
|
APIVersion: "v1",
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: controller.SafeConcatNameWithPrefix(clusterName, certName),
|
|
Namespace: clusterNamespace,
|
|
},
|
|
Type: corev1.SecretTypeTLS,
|
|
Data: map[string][]byte{
|
|
corev1.TLSCertKey: cert,
|
|
corev1.TLSPrivateKeyKey: key,
|
|
},
|
|
}
|
|
}
|
|
|
|
func parseKeyValuePairs(pairs []string, pairType string) map[string]string {
|
|
resultMap := make(map[string]string)
|
|
|
|
for _, p := range pairs {
|
|
var k, v string
|
|
|
|
keyValue := strings.SplitN(p, "=", 2)
|
|
|
|
k = keyValue[0]
|
|
if len(keyValue) == 2 {
|
|
v = keyValue[1]
|
|
}
|
|
|
|
resultMap[k] = v
|
|
|
|
logrus.Debugf("Adding '%s=%s' %s to Cluster", k, v, pairType)
|
|
}
|
|
|
|
return resultMap
|
|
}
|
|
|
|
const clusterDetailsTemplate = `Cluster details:
|
|
Mode: {{ .Mode }}
|
|
Servers: {{ .Servers }}{{ if .Agents }}
|
|
Agents: {{ .Agents }}{{ end }}
|
|
Version: {{ if .Version }}{{ .Version }}{{ else }}{{ .HostVersion }}{{ end }} (Host: {{ .HostVersion }})
|
|
Persistence:
|
|
Type: {{.Persistence.Type}}{{ if .Persistence.StorageClassName }}
|
|
StorageClass: {{ .Persistence.StorageClassName }}{{ end }}{{ if .Persistence.StorageRequestSize }}
|
|
Size: {{ .Persistence.StorageRequestSize }}{{ end }}{{ if .Labels }}
|
|
Labels: {{ range $key, $value := .Labels }}
|
|
{{$key}}: {{$value}}{{ end }}{{ end }}{{ if .Annotations }}
|
|
Annotations: {{ range $key, $value := .Annotations }}
|
|
{{$key}}: {{$value}}{{ end }}{{ end }}`
|
|
|
|
func getClusterDetails(cluster *v1beta1.Cluster) (string, error) {
|
|
type templateData struct {
|
|
Mode v1beta1.ClusterMode
|
|
Servers int32
|
|
Agents int32
|
|
Version string
|
|
HostVersion string
|
|
Persistence struct {
|
|
Type v1beta1.PersistenceMode
|
|
StorageClassName string
|
|
StorageRequestSize string
|
|
}
|
|
Labels map[string]string
|
|
Annotations map[string]string
|
|
}
|
|
|
|
data := templateData{
|
|
Mode: cluster.Spec.Mode,
|
|
Servers: ptr.Deref(cluster.Spec.Servers, 0),
|
|
Agents: ptr.Deref(cluster.Spec.Agents, 0),
|
|
Version: cluster.Spec.Version,
|
|
HostVersion: cluster.Status.HostVersion,
|
|
Annotations: cluster.Annotations,
|
|
Labels: cluster.Labels,
|
|
}
|
|
|
|
data.Persistence.Type = cluster.Spec.Persistence.Type
|
|
data.Persistence.StorageClassName = ptr.Deref(cluster.Spec.Persistence.StorageClassName, "")
|
|
|
|
if srs := cluster.Spec.Persistence.StorageRequestSize; srs != nil {
|
|
data.Persistence.StorageRequestSize = srs.String()
|
|
}
|
|
|
|
tmpl, err := template.New("clusterDetails").Parse(clusterDetailsTemplate)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
if err = tmpl.Execute(&buf, data); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return buf.String(), nil
|
|
}
|