mirror of
https://github.com/rancher/k3k.git
synced 2026-03-03 02:00:38 +00:00
Merge pull request #19 from galal-hussein/issue-9
Initial Allocator Impl
This commit is contained in:
38
crd/cidrallocationpool.yaml
Normal file
38
crd/cidrallocationpool.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
apiVersion: apiextensions.k8s.io/v1
|
||||
kind: CustomResourceDefinition
|
||||
metadata:
|
||||
name: cidrallocationpools.k3k.io
|
||||
spec:
|
||||
group: k3k.io
|
||||
versions:
|
||||
- name: v1alpha1
|
||||
served: true
|
||||
storage: true
|
||||
schema:
|
||||
openAPIV3Schema:
|
||||
type: object
|
||||
properties:
|
||||
spec:
|
||||
type: object
|
||||
properties:
|
||||
defaultClusterCIDR:
|
||||
type: string
|
||||
status:
|
||||
type: object
|
||||
properties:
|
||||
pool:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
clusterName:
|
||||
type: string
|
||||
issued:
|
||||
type: integer
|
||||
ipNet:
|
||||
type: string
|
||||
scope: Cluster
|
||||
names:
|
||||
plural: cidrallocationpools
|
||||
singular: cidrallocationpool
|
||||
kind: CIDRAllocationPool
|
||||
@@ -8,7 +8,7 @@ CODEGEN_GIT_PKG=https://github.com/kubernetes/code-generator.git
|
||||
git clone --depth 1 ${CODEGEN_GIT_PKG} || true
|
||||
|
||||
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
|
||||
CODEGEN_PKG=../code-generator
|
||||
CODEGEN_PKG=./code-generator
|
||||
|
||||
"${CODEGEN_PKG}/generate-groups.sh" \
|
||||
"deepcopy" \
|
||||
|
||||
7
main.go
7
main.go
@@ -29,6 +29,8 @@ func main() {
|
||||
ctrlconfig.RegisterFlags(nil)
|
||||
flag.Parse()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
kubeconfig := flag.Lookup("kubeconfig").Value.String()
|
||||
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
|
||||
if err != nil {
|
||||
@@ -41,11 +43,12 @@ func main() {
|
||||
if err != nil {
|
||||
klog.Fatalf("Failed to create new controller runtime manager: %v", err)
|
||||
}
|
||||
if err := cluster.Add(mgr); err != nil {
|
||||
|
||||
if err := cluster.Add(ctx, mgr); err != nil {
|
||||
klog.Fatalf("Failed to add the new controller: %v", err)
|
||||
}
|
||||
|
||||
if err := mgr.Start(context.Background()); err != nil {
|
||||
if err := mgr.Start(ctx); err != nil {
|
||||
klog.Fatalf("Failed to start the manager: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,3 +59,36 @@ type ClusterStatus struct {
|
||||
ServiceCIDR string `json:"serviceCIDR,omitempty"`
|
||||
ClusterDNS string `json:"clusterDNS,omitempty"`
|
||||
}
|
||||
|
||||
type Allocation struct {
|
||||
ClusterName string `json:"clusterName"`
|
||||
Issued int64 `json:"issued"`
|
||||
IPNet string `json:"ipNet"`
|
||||
}
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
type CIDRAllocationPool struct {
|
||||
metav1.ObjectMeta `json:"metadata,omitempty"`
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
|
||||
Spec CIDRAllocationPoolSpec `json:"spec"`
|
||||
Status CIDRAllocationPoolStatus `json:"status"`
|
||||
}
|
||||
|
||||
type CIDRAllocationPoolSpec struct {
|
||||
DefaultClusterCIDR string `json:"defaultClusterCIDR"`
|
||||
}
|
||||
|
||||
type CIDRAllocationPoolStatus struct {
|
||||
Pool []Allocation `json:"pool"`
|
||||
}
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
type CIDRAllocationPoolList struct {
|
||||
metav1.ListMeta `json:"metadata,omitempty"`
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
|
||||
Items []CIDRAllocationPool `json:"items"`
|
||||
}
|
||||
|
||||
@@ -9,6 +9,120 @@ import (
|
||||
runtime "k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Allocation) DeepCopyInto(out *Allocation) {
|
||||
*out = *in
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Allocation.
|
||||
func (in *Allocation) DeepCopy() *Allocation {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(Allocation)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *CIDRAllocationPool) DeepCopyInto(out *CIDRAllocationPool) {
|
||||
*out = *in
|
||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
||||
out.TypeMeta = in.TypeMeta
|
||||
out.Spec = in.Spec
|
||||
in.Status.DeepCopyInto(&out.Status)
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CIDRAllocationPool.
|
||||
func (in *CIDRAllocationPool) DeepCopy() *CIDRAllocationPool {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(CIDRAllocationPool)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
|
||||
func (in *CIDRAllocationPool) DeepCopyObject() runtime.Object {
|
||||
if c := in.DeepCopy(); c != nil {
|
||||
return c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *CIDRAllocationPoolList) DeepCopyInto(out *CIDRAllocationPoolList) {
|
||||
*out = *in
|
||||
in.ListMeta.DeepCopyInto(&out.ListMeta)
|
||||
out.TypeMeta = in.TypeMeta
|
||||
if in.Items != nil {
|
||||
in, out := &in.Items, &out.Items
|
||||
*out = make([]CIDRAllocationPool, len(*in))
|
||||
for i := range *in {
|
||||
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CIDRAllocationPoolList.
|
||||
func (in *CIDRAllocationPoolList) DeepCopy() *CIDRAllocationPoolList {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(CIDRAllocationPoolList)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
|
||||
func (in *CIDRAllocationPoolList) DeepCopyObject() runtime.Object {
|
||||
if c := in.DeepCopy(); c != nil {
|
||||
return c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *CIDRAllocationPoolSpec) DeepCopyInto(out *CIDRAllocationPoolSpec) {
|
||||
*out = *in
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CIDRAllocationPoolSpec.
|
||||
func (in *CIDRAllocationPoolSpec) DeepCopy() *CIDRAllocationPoolSpec {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(CIDRAllocationPoolSpec)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *CIDRAllocationPoolStatus) DeepCopyInto(out *CIDRAllocationPoolStatus) {
|
||||
*out = *in
|
||||
if in.Pool != nil {
|
||||
in, out := &in.Pool, &out.Pool
|
||||
*out = make([]Allocation, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CIDRAllocationPoolStatus.
|
||||
func (in *CIDRAllocationPoolStatus) DeepCopy() *CIDRAllocationPoolStatus {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(CIDRAllocationPoolStatus)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Cluster) DeepCopyInto(out *Cluster) {
|
||||
*out = *in
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
func Agent(cluster *v1alpha1.Cluster) *apps.Deployment {
|
||||
image := util.K3SImage(cluster)
|
||||
|
||||
name := "k3k-agent"
|
||||
const name = "k3k-agent"
|
||||
|
||||
return &apps.Deployment{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
@@ -47,6 +47,7 @@ func Agent(cluster *v1alpha1.Cluster) *apps.Deployment {
|
||||
|
||||
func agentPodSpec(image, name string, args []string) v1.PodSpec {
|
||||
privileged := true
|
||||
|
||||
return v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
|
||||
120
pkg/controller/cluster/cidr_allocation.go
Normal file
120
pkg/controller/cluster/cidr_allocation.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/galal-hussein/k3k/pkg/apis/k3k.io/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
const (
|
||||
cidrAllocationClusterPoolName = "k3k-cluster-cidr-allocation-pool"
|
||||
cidrAllocationServicePoolName = "k3k-service-cidr-allocation-pool"
|
||||
|
||||
defaultClusterCIDR = "10.44.0.0/16"
|
||||
defaultClusterServiceCIDR = "10.45.0.0/16"
|
||||
)
|
||||
|
||||
// determineOctet dertermines the octet for the
|
||||
// given mask bits of a subnet.
|
||||
func determineOctet(mb int) uint8 {
|
||||
switch {
|
||||
case mb <= 8:
|
||||
return 1
|
||||
case mb >= 8 && mb <= 16:
|
||||
return 2
|
||||
case mb >= 8 && mb <= 24:
|
||||
return 3
|
||||
case mb >= 8 && mb <= 32:
|
||||
return 4
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// generateSubnets generates all subnets for the given CIDR.
|
||||
func generateSubnets(cidr string) ([]string, error) {
|
||||
_, ipNet, err := net.ParseCIDR(cidr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
usedBits, _ := ipNet.Mask.Size()
|
||||
|
||||
octet := determineOctet(usedBits)
|
||||
|
||||
ip := ipNet.IP.To4()
|
||||
octetVal := ip[octet-1]
|
||||
|
||||
var subnets []string
|
||||
|
||||
for i := octetVal; i < 254; i++ {
|
||||
octetVal++
|
||||
ip[octet-1] = octetVal
|
||||
subnets = append(subnets, fmt.Sprintf("%s/%d", ip, usedBits))
|
||||
}
|
||||
|
||||
return subnets, nil
|
||||
}
|
||||
|
||||
// nextCIDR retrieves the next available CIDR address from the given pool.
|
||||
func (c *ClusterReconciler) nextCIDR(ctx context.Context, cidrAllocationPoolName, clusterName string) (*net.IPNet, error) {
|
||||
var cidrPool v1alpha1.CIDRAllocationPool
|
||||
|
||||
nn := types.NamespacedName{
|
||||
Name: cidrAllocationPoolName,
|
||||
}
|
||||
if err := c.Client.Get(ctx, nn, &cidrPool); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ipNet *net.IPNet
|
||||
|
||||
for i := 0; i < len(cidrPool.Status.Pool); i++ {
|
||||
if cidrPool.Status.Pool[i].ClusterName == "" && cidrPool.Status.Pool[i].Issued == 0 {
|
||||
cidrPool.Status.Pool[i].ClusterName = clusterName
|
||||
cidrPool.Status.Pool[i].Issued = time.Now().Unix()
|
||||
|
||||
_, ipn, err := net.ParseCIDR(cidrPool.Status.Pool[i].IPNet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := c.Client.Status().Update(ctx, &cidrPool); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ipNet = ipn
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return ipNet, nil
|
||||
}
|
||||
|
||||
// releaseCIDR updates the given CIDR pool by marking the address as available.
|
||||
func (c *ClusterReconciler) releaseCIDR(ctx context.Context, cidrAllocationPoolName, clusterName string) error {
|
||||
var cidrPool v1alpha1.CIDRAllocationPool
|
||||
|
||||
nn := types.NamespacedName{
|
||||
Name: cidrAllocationPoolName,
|
||||
}
|
||||
if err := c.Client.Get(ctx, nn, &cidrPool); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < len(cidrPool.Status.Pool); i++ {
|
||||
if cidrPool.Status.Pool[i].ClusterName == clusterName {
|
||||
cidrPool.Status.Pool[i].ClusterName = ""
|
||||
cidrPool.Status.Pool[i].Issued = 0
|
||||
}
|
||||
|
||||
if err := c.Client.Status().Update(ctx, &cidrPool); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
func AgentConfig(cluster *v1alpha1.Cluster, serviceIP string) v1.Secret {
|
||||
config := agentConfigData(serviceIP, cluster.Spec.Token)
|
||||
|
||||
return v1.Secret{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "Secret",
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/galal-hussein/k3k/pkg/apis/k3k.io/v1alpha1"
|
||||
"github.com/galal-hussein/k3k/pkg/controller/util"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
@@ -35,32 +33,29 @@ func ServerConfig(cluster *v1alpha1.Cluster, init bool, serviceIP string) (*v1.S
|
||||
}
|
||||
|
||||
func serverConfigData(serviceIP string, cluster *v1alpha1.Cluster) string {
|
||||
opts := serverOptions(cluster)
|
||||
return fmt.Sprintf(`cluster-init: true
|
||||
server: https://%s:6443
|
||||
%s`, serviceIP, opts)
|
||||
return "cluster-init: true\nserver: https://" + serviceIP + ":6443" + serverOptions(cluster)
|
||||
}
|
||||
|
||||
func initConfigData(cluster *v1alpha1.Cluster) string {
|
||||
opts := serverOptions(cluster)
|
||||
return fmt.Sprintf(`cluster-init: true
|
||||
%s`, opts)
|
||||
return "cluster-init: true\n" + serverOptions(cluster)
|
||||
}
|
||||
|
||||
func serverOptions(cluster *v1alpha1.Cluster) string {
|
||||
opts := ""
|
||||
var opts string
|
||||
|
||||
// TODO: generate token if not found
|
||||
if cluster.Spec.Token != "" {
|
||||
opts = fmt.Sprintf("token: %s\n", cluster.Spec.Token)
|
||||
opts = "token: " + cluster.Spec.Token + "\n"
|
||||
}
|
||||
if cluster.Spec.ClusterCIDR != "" {
|
||||
opts = fmt.Sprintf("%scluster-cidr: %s\n", opts, cluster.Spec.ClusterCIDR)
|
||||
opts = opts + "cluster-cidr: " + cluster.Spec.ClusterCIDR + "\n"
|
||||
}
|
||||
if cluster.Spec.ServiceCIDR != "" {
|
||||
opts = fmt.Sprintf("%sservice-cidr: %s\n", opts, cluster.Spec.ServiceCIDR)
|
||||
opts = opts + "service-cidr: " + cluster.Spec.ServiceCIDR + "\n"
|
||||
}
|
||||
if cluster.Spec.ClusterDNS != "" {
|
||||
opts = fmt.Sprintf("%scluster-dns: %s\n", opts, cluster.Spec.ClusterDNS)
|
||||
opts = opts + "cluster-dns: " + cluster.Spec.ClusterDNS + "\n"
|
||||
}
|
||||
|
||||
return opts
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/galal-hussein/k3k/pkg/apis/k3k.io/v1alpha1"
|
||||
"github.com/galal-hussein/k3k/pkg/controller/cluster/agent"
|
||||
@@ -24,8 +23,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ClusterController = "k3k-cluster-controller"
|
||||
ClusterFinalizerName = "cluster.k3k.io/finalizer"
|
||||
clusterController = "k3k-cluster-controller"
|
||||
clusterFinalizerName = "cluster.k3k.io/finalizer"
|
||||
)
|
||||
|
||||
type ClusterReconciler struct {
|
||||
@@ -34,94 +33,182 @@ type ClusterReconciler struct {
|
||||
}
|
||||
|
||||
// Add adds a new controller to the manager
|
||||
func Add(mgr manager.Manager) error {
|
||||
func Add(ctx context.Context, mgr manager.Manager) error {
|
||||
// initialize a new Reconciler
|
||||
reconciler := ClusterReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Scheme: mgr.GetScheme(),
|
||||
}
|
||||
|
||||
// create a new controller and add it to the manager
|
||||
//this can be replaced by the new builder functionality in controller-runtime
|
||||
controller, err := controller.New(ClusterController, mgr, controller.Options{
|
||||
Reconciler: &reconciler,
|
||||
MaxConcurrentReconciles: 1,
|
||||
})
|
||||
|
||||
clusterSubnets, err := generateSubnets(defaultClusterCIDR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := controller.Watch(&source.Kind{Type: &v1alpha1.Cluster{}},
|
||||
&handler.EnqueueRequestForObject{}); err != nil {
|
||||
var clusterSubnetAllocations []v1alpha1.Allocation
|
||||
for _, cs := range clusterSubnets {
|
||||
clusterSubnetAllocations = append(clusterSubnetAllocations, v1alpha1.Allocation{
|
||||
IPNet: cs,
|
||||
})
|
||||
}
|
||||
|
||||
cidrClusterPool := v1alpha1.CIDRAllocationPool{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: cidrAllocationClusterPoolName,
|
||||
},
|
||||
Spec: v1alpha1.CIDRAllocationPoolSpec{
|
||||
DefaultClusterCIDR: defaultClusterCIDR,
|
||||
},
|
||||
Status: v1alpha1.CIDRAllocationPoolStatus{
|
||||
Pool: clusterSubnetAllocations,
|
||||
},
|
||||
}
|
||||
if err := reconciler.Client.Create(ctx, &cidrClusterPool); err != nil {
|
||||
if !apierrors.IsConflict(err) {
|
||||
// return nil since the resource has
|
||||
// already been created
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
clusterServiceSubnets, err := generateSubnets(defaultClusterServiceCIDR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var clusterServiceSubnetAllocations []v1alpha1.Allocation
|
||||
for _, ss := range clusterServiceSubnets {
|
||||
clusterServiceSubnetAllocations = append(clusterServiceSubnetAllocations, v1alpha1.Allocation{
|
||||
IPNet: ss,
|
||||
})
|
||||
}
|
||||
|
||||
cidrServicePool := v1alpha1.CIDRAllocationPool{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: cidrAllocationServicePoolName,
|
||||
},
|
||||
Spec: v1alpha1.CIDRAllocationPoolSpec{
|
||||
DefaultClusterCIDR: defaultClusterCIDR,
|
||||
},
|
||||
Status: v1alpha1.CIDRAllocationPoolStatus{
|
||||
Pool: clusterServiceSubnetAllocations,
|
||||
},
|
||||
}
|
||||
if err := reconciler.Client.Create(ctx, &cidrServicePool); err != nil {
|
||||
if !apierrors.IsConflict(err) {
|
||||
// return nil since the resource has
|
||||
// already been created
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// create a new controller and add it to the manager
|
||||
//this can be replaced by the new builder functionality in controller-runtime
|
||||
controller, err := controller.New(clusterController, mgr, controller.Options{
|
||||
Reconciler: &reconciler,
|
||||
MaxConcurrentReconciles: 1,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return controller.Watch(&source.Kind{Type: &v1alpha1.Cluster{}}, &handler.EnqueueRequestForObject{})
|
||||
}
|
||||
|
||||
func (r *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
|
||||
cluster := &v1alpha1.Cluster{}
|
||||
func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
|
||||
var cluster v1alpha1.Cluster
|
||||
|
||||
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
|
||||
if err := c.Client.Get(ctx, req.NamespacedName, &cluster); err != nil {
|
||||
return reconcile.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
if cluster.DeletionTimestamp.IsZero() {
|
||||
if !controllerutil.ContainsFinalizer(cluster, ClusterFinalizerName) {
|
||||
controllerutil.AddFinalizer(cluster, ClusterFinalizerName)
|
||||
if err := r.Client.Update(ctx, cluster); err != nil {
|
||||
if !controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
|
||||
controllerutil.AddFinalizer(&cluster, clusterFinalizerName)
|
||||
if err := c.Client.Update(ctx, &cluster); err != nil {
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// we create a namespace for each new cluster
|
||||
ns := &v1.Namespace{}
|
||||
if err := r.Client.Get(ctx, client.ObjectKey{Name: util.ClusterNamespace(cluster)}, ns); err != nil {
|
||||
var ns v1.Namespace
|
||||
objKey := client.ObjectKey{
|
||||
Name: util.ClusterNamespace(&cluster),
|
||||
}
|
||||
if err := c.Client.Get(ctx, objKey, &ns); err != nil {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
return reconcile.Result{},
|
||||
util.WrapErr(fmt.Sprintf("failed to get cluster namespace %s", util.ClusterNamespace(cluster)), err)
|
||||
return reconcile.Result{}, util.WrapErr("failed to get cluster namespace "+util.ClusterNamespace(&cluster), err)
|
||||
}
|
||||
}
|
||||
|
||||
klog.Infof("enqueue cluster [%s]", cluster.Name)
|
||||
return reconcile.Result{}, r.createCluster(ctx, cluster)
|
||||
|
||||
return reconcile.Result{}, c.createCluster(ctx, &cluster)
|
||||
}
|
||||
if controllerutil.ContainsFinalizer(cluster, ClusterFinalizerName) {
|
||||
|
||||
if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) {
|
||||
// TODO: handle CIDR deletion
|
||||
if err := c.releaseCIDR(ctx, cluster.Status.ClusterCIDR, cluster.Name); err != nil {
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
|
||||
// remove our finalizer from the list and update it.
|
||||
controllerutil.RemoveFinalizer(cluster, ClusterFinalizerName)
|
||||
if err := r.Client.Update(ctx, cluster); err != nil {
|
||||
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName)
|
||||
if err := c.Client.Update(ctx, &cluster); err != nil {
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
}
|
||||
klog.Infof("deleting cluster [%s]", cluster.Name)
|
||||
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1.Cluster) error {
|
||||
func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1.Cluster) error {
|
||||
// create a new namespace for the cluster
|
||||
if err := r.createNamespace(ctx, cluster); err != nil {
|
||||
if err := c.createNamespace(ctx, cluster); err != nil {
|
||||
return util.WrapErr("failed to create ns", err)
|
||||
}
|
||||
|
||||
serviceIP, err := r.createClusterService(ctx, cluster)
|
||||
if cluster.Spec.ClusterCIDR == "" && cluster.Status.ClusterCIDR == "" {
|
||||
clusterCIDR, err := c.nextCIDR(ctx, cidrAllocationClusterPoolName, cluster.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cluster.Status.ClusterCIDR = clusterCIDR.String()
|
||||
}
|
||||
|
||||
if cluster.Spec.ServiceCIDR == "" && cluster.Status.ServiceCIDR == "" {
|
||||
serviceCIDR, err := c.nextCIDR(ctx, cidrAllocationServicePoolName, cluster.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cluster.Status.ServiceCIDR = serviceCIDR.String()
|
||||
}
|
||||
|
||||
serviceIP, err := c.createClusterService(ctx, cluster)
|
||||
if err != nil {
|
||||
return util.WrapErr("failed to create cluster service", err)
|
||||
}
|
||||
|
||||
if err := r.createClusterConfigs(ctx, cluster, serviceIP); err != nil {
|
||||
if err := c.createClusterConfigs(ctx, cluster, serviceIP); err != nil {
|
||||
return util.WrapErr("failed to create cluster configs", err)
|
||||
}
|
||||
|
||||
if err := r.createDeployments(ctx, cluster); err != nil {
|
||||
if err := c.createDeployments(ctx, cluster); err != nil {
|
||||
return util.WrapErr("failed to create servers and agents deployment", err)
|
||||
}
|
||||
|
||||
if cluster.Spec.Expose.Ingress.Enabled {
|
||||
serverIngress, err := server.Ingress(ctx, cluster, r.Client)
|
||||
serverIngress, err := server.Ingress(ctx, cluster, c.Client)
|
||||
if err != nil {
|
||||
return util.WrapErr("failed to create ingress object", err)
|
||||
}
|
||||
if err := r.Client.Create(ctx, serverIngress); err != nil {
|
||||
|
||||
if err := c.Client.Create(ctx, serverIngress); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return util.WrapErr("failed to create server ingress", err)
|
||||
}
|
||||
@@ -132,25 +219,28 @@ func (r *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1
|
||||
if err != nil {
|
||||
return util.WrapErr("failed to generate new kubeconfig", err)
|
||||
}
|
||||
if err := r.Client.Create(ctx, kubeconfigSecret); err != nil {
|
||||
|
||||
if err := c.Client.Create(ctx, kubeconfigSecret); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return util.WrapErr("failed to create kubeconfig secret", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
return c.Client.Update(ctx, cluster)
|
||||
}
|
||||
|
||||
func (r *ClusterReconciler) createNamespace(ctx context.Context, cluster *v1alpha1.Cluster) error {
|
||||
func (c *ClusterReconciler) createNamespace(ctx context.Context, cluster *v1alpha1.Cluster) error {
|
||||
// create a new namespace for the cluster
|
||||
namespace := &v1.Namespace{
|
||||
namespace := v1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: util.ClusterNamespace(cluster),
|
||||
},
|
||||
}
|
||||
if err := controllerutil.SetControllerReference(cluster, namespace, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, &namespace, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Client.Create(ctx, namespace); err != nil {
|
||||
|
||||
if err := c.Client.Create(ctx, &namespace); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return util.WrapErr("failed to create ns", err)
|
||||
}
|
||||
@@ -159,18 +249,18 @@ func (r *ClusterReconciler) createNamespace(ctx context.Context, cluster *v1alph
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP string) error {
|
||||
func (c *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v1alpha1.Cluster, serviceIP string) error {
|
||||
// create init node config
|
||||
initServerConfig, err := config.ServerConfig(cluster, true, serviceIP)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := controllerutil.SetControllerReference(cluster, initServerConfig, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, initServerConfig, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.Client.Create(ctx, initServerConfig); err != nil {
|
||||
if err := c.Client.Create(ctx, initServerConfig); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
@@ -181,10 +271,10 @@ func (r *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := controllerutil.SetControllerReference(cluster, serverConfig, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, serverConfig, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Client.Create(ctx, serverConfig); err != nil {
|
||||
if err := c.Client.Create(ctx, serverConfig); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
@@ -192,52 +282,54 @@ func (r *ClusterReconciler) createClusterConfigs(ctx context.Context, cluster *v
|
||||
|
||||
// create agents configuration
|
||||
agentsConfig := config.AgentConfig(cluster, serviceIP)
|
||||
if err := controllerutil.SetControllerReference(cluster, &agentsConfig, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, &agentsConfig, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.Client.Create(ctx, &agentsConfig); err != nil {
|
||||
if err := c.Client.Create(ctx, &agentsConfig); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClusterReconciler) createClusterService(ctx context.Context, cluster *v1alpha1.Cluster) (string, error) {
|
||||
func (c *ClusterReconciler) createClusterService(ctx context.Context, cluster *v1alpha1.Cluster) (string, error) {
|
||||
// create cluster service
|
||||
clusterService := server.Service(cluster)
|
||||
|
||||
if err := controllerutil.SetControllerReference(cluster, clusterService, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, clusterService, c.Scheme); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := r.Client.Create(ctx, clusterService); err != nil {
|
||||
if err := c.Client.Create(ctx, clusterService); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
service := v1.Service{}
|
||||
if err := r.Client.Get(ctx,
|
||||
client.ObjectKey{
|
||||
Namespace: util.ClusterNamespace(cluster),
|
||||
Name: "k3k-server-service"},
|
||||
&service); err != nil {
|
||||
var service v1.Service
|
||||
|
||||
objKey := client.ObjectKey{
|
||||
Namespace: util.ClusterNamespace(cluster),
|
||||
Name: "k3k-server-service",
|
||||
}
|
||||
if err := c.Client.Get(ctx, objKey, &service); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return service.Spec.ClusterIP, nil
|
||||
}
|
||||
|
||||
func (r *ClusterReconciler) createDeployments(ctx context.Context, cluster *v1alpha1.Cluster) error {
|
||||
func (c *ClusterReconciler) createDeployments(ctx context.Context, cluster *v1alpha1.Cluster) error {
|
||||
// create deployment for the init server
|
||||
// the init deployment must have only 1 replica
|
||||
initServerDeployment := server.Server(cluster, true)
|
||||
|
||||
if err := controllerutil.SetControllerReference(cluster, initServerDeployment, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, initServerDeployment, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.Client.Create(ctx, initServerDeployment); err != nil {
|
||||
if err := c.Client.Create(ctx, initServerDeployment); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
@@ -246,22 +338,22 @@ func (r *ClusterReconciler) createDeployments(ctx context.Context, cluster *v1al
|
||||
// create deployment for the rest of the servers
|
||||
serversDeployment := server.Server(cluster, false)
|
||||
|
||||
if err := controllerutil.SetControllerReference(cluster, serversDeployment, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, serversDeployment, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.Client.Create(ctx, serversDeployment); err != nil {
|
||||
if err := c.Client.Create(ctx, serversDeployment); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
agentsDeployment := agent.Agent(cluster)
|
||||
if err := controllerutil.SetControllerReference(cluster, agentsDeployment, r.Scheme); err != nil {
|
||||
if err := controllerutil.SetControllerReference(cluster, agentsDeployment, c.Scheme); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.Client.Create(ctx, agentsDeployment); err != nil {
|
||||
if err := c.Client.Create(ctx, agentsDeployment); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@@ -48,15 +47,14 @@ type content struct {
|
||||
func GenerateNewKubeConfig(ctx context.Context, cluster *v1alpha1.Cluster, ip string) (*v1.Secret, error) {
|
||||
token := cluster.Spec.Token
|
||||
|
||||
bootstrap := &controlRuntimeBootstrap{}
|
||||
err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
|
||||
var bootstrap *controlRuntimeBootstrap
|
||||
if err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
|
||||
return true
|
||||
}, func() error {
|
||||
var err error
|
||||
bootstrap, err = requestBootstrap(token, ip)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -65,10 +63,8 @@ func GenerateNewKubeConfig(ctx context.Context, cluster *v1alpha1.Cluster, ip st
|
||||
}
|
||||
|
||||
adminCert, adminKey, err := createClientCertKey(
|
||||
adminCommonName,
|
||||
[]string{user.SystemPrivilegedGroup},
|
||||
nil,
|
||||
[]x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
|
||||
adminCommonName, []string{user.SystemPrivilegedGroup},
|
||||
nil, []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
|
||||
bootstrap.ClientCA.Content,
|
||||
bootstrap.ClientCAKey.Content)
|
||||
if err != nil {
|
||||
@@ -109,7 +105,7 @@ func requestBootstrap(token, serverIP string) (*controlRuntimeBootstrap, error)
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -120,28 +116,16 @@ func requestBootstrap(token, serverIP string) (*controlRuntimeBootstrap, error)
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
|
||||
runtimeBootstrap := controlRuntimeBootstrap{}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(body, &runtimeBootstrap); err != nil {
|
||||
var runtimeBootstrap controlRuntimeBootstrap
|
||||
if err := json.NewDecoder(resp.Body).Decode(&runtimeBootstrap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &runtimeBootstrap, nil
|
||||
}
|
||||
|
||||
func createClientCertKey(
|
||||
commonName string,
|
||||
organization []string,
|
||||
altNames *certutil.AltNames,
|
||||
extKeyUsage []x509.ExtKeyUsage,
|
||||
caCert,
|
||||
caKey string) ([]byte, []byte, error) {
|
||||
|
||||
func createClientCertKey(commonName string, organization []string, altNames *certutil.AltNames, extKeyUsage []x509.ExtKeyUsage, caCert, caKey string) ([]byte, []byte, error) {
|
||||
caKeyPEM, err := certutil.ParsePrivateKeyPEM([]byte(caKey))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@@ -152,12 +136,12 @@ func createClientCertKey(
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
keyBytes, err := generateKey()
|
||||
b, err := generateKey()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
key, err := certutil.ParsePrivateKeyPEM(keyBytes)
|
||||
key, err := certutil.ParsePrivateKeyPEM(b)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -175,7 +159,7 @@ func createClientCertKey(
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return append(certutil.EncodeCertPEM(cert), certutil.EncodeCertPEM(caCertPEM[0])...), keyBytes, nil
|
||||
return append(certutil.EncodeCertPEM(cert), certutil.EncodeCertPEM(caCertPEM[0])...), b, nil
|
||||
}
|
||||
|
||||
func generateKey() (data []byte, err error) {
|
||||
@@ -183,6 +167,7 @@ func generateKey() (data []byte, err error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error generating key: %v", err)
|
||||
}
|
||||
|
||||
return generatedData, nil
|
||||
}
|
||||
|
||||
@@ -210,6 +195,7 @@ func kubeconfig(url string, serverCA, clientCert, clientKey []byte) ([]byte, err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return kubeconfig, nil
|
||||
}
|
||||
|
||||
@@ -225,23 +211,27 @@ func decodeBootstrap(bootstrap *controlRuntimeBootstrap) error {
|
||||
return err
|
||||
}
|
||||
bootstrap.ClientCA.Content = string(decoded)
|
||||
|
||||
//client-ca-key
|
||||
decoded, err = base64.StdEncoding.DecodeString(bootstrap.ClientCAKey.Content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrap.ClientCAKey.Content = string(decoded)
|
||||
|
||||
//server-ca
|
||||
decoded, err = base64.StdEncoding.DecodeString(bootstrap.ServerCA.Content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrap.ServerCA.Content = string(decoded)
|
||||
|
||||
//server-ca-key
|
||||
decoded, err = base64.StdEncoding.DecodeString(bootstrap.ServerCAKey.Content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrap.ServerCAKey.Content = string(decoded)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ func Server(cluster *v1alpha1.Cluster, init bool) *apps.Deployment {
|
||||
if init {
|
||||
replicas = 1
|
||||
}
|
||||
|
||||
return &apps.Deployment{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "Deployment",
|
||||
@@ -58,6 +59,7 @@ func Server(cluster *v1alpha1.Cluster, init bool) *apps.Deployment {
|
||||
|
||||
func serverPodSpec(image, name string, args []string) v1.PodSpec {
|
||||
privileged := true
|
||||
|
||||
return v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
|
||||
@@ -10,16 +10,16 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
NamespacePrefix = "k3k-"
|
||||
K3SImageName = "rancher/k3s"
|
||||
namespacePrefix = "k3k-"
|
||||
k3SImageName = "rancher/k3s"
|
||||
)
|
||||
|
||||
func ClusterNamespace(cluster *v1alpha1.Cluster) string {
|
||||
return NamespacePrefix + cluster.Name
|
||||
return namespacePrefix + cluster.Name
|
||||
}
|
||||
|
||||
func K3SImage(cluster *v1alpha1.Cluster) string {
|
||||
return K3SImageName + ":" + cluster.Spec.Version
|
||||
return k3SImageName + ":" + cluster.Spec.Version
|
||||
}
|
||||
|
||||
func WrapErr(errString string, err error) error {
|
||||
@@ -27,24 +27,10 @@ func WrapErr(errString string, err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// return all the nodes external addresses, if not found then return internal addresses
|
||||
func Addresses(ctx context.Context, client client.Client) ([]string, error) {
|
||||
addresses := []string{}
|
||||
nodeList := v1.NodeList{}
|
||||
if err := client.List(ctx, &nodeList); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func nodeAddress(node *v1.Node) string {
|
||||
var externalIP string
|
||||
var internalIP string
|
||||
|
||||
for _, node := range nodeList.Items {
|
||||
addresses = append(addresses, getNodeAddress(&node))
|
||||
}
|
||||
|
||||
return addresses, nil
|
||||
}
|
||||
|
||||
func getNodeAddress(node *v1.Node) string {
|
||||
externalIP := ""
|
||||
internalIP := ""
|
||||
for _, ip := range node.Status.Addresses {
|
||||
if ip.Type == "ExternalIP" && ip.Address != "" {
|
||||
externalIP = ip.Address
|
||||
@@ -59,3 +45,19 @@ func getNodeAddress(node *v1.Node) string {
|
||||
|
||||
return internalIP
|
||||
}
|
||||
|
||||
// return all the nodes external addresses, if not found then return internal addresses
|
||||
func Addresses(ctx context.Context, client client.Client) ([]string, error) {
|
||||
var nodeList v1.NodeList
|
||||
if err := client.List(ctx, &nodeList); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addresses := make([]string, len(nodeList.Items))
|
||||
|
||||
for _, node := range nodeList.Items {
|
||||
addresses = append(addresses, nodeAddress(&node))
|
||||
}
|
||||
|
||||
return addresses, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user