Mirror host nodes (#389)

* mirror host nodes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* add mirror host nodes feature

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add controllername to secrets/configmap syncer

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* golint

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* build docs

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* setting controller namespace env

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix typo

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add a controller_namespace env to the test

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix tests

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix tests

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Add mirrorHostNodes spec to conformance tests

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* Fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* change the ptr int to int

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* fix map key name

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

---------

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Hussein Galal
2025-07-08 14:48:24 +03:00
committed by GitHub
parent 57263bd10e
commit fcc875ab85
25 changed files with 552 additions and 75 deletions

View File

@@ -0,0 +1,257 @@
package agent
import (
"context"
"fmt"
"os"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
ctrl "sigs.k8s.io/controller-runtime"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
const (
kubeletPortRangeConfigMapName = "k3k-kubelet-port-range"
webhookPortRangeConfigMapName = "k3k-webhook-port-range"
)
type PortAllocator struct {
KubeletCM *v1.ConfigMap
WebhookCM *v1.ConfigMap
}
func NewPortAllocator(ctx context.Context, client ctrlruntimeclient.Client, kubeletPortRange, webhookPortRange string) (*PortAllocator, error) {
log := ctrl.LoggerFrom(ctx)
log.Info("starting port allocator")
var kubeletPortRangeCM, webhookPortRangeCM v1.ConfigMap
portRangeConfigMapNamespace := os.Getenv("CONTROLLER_NAMESPACE")
if portRangeConfigMapNamespace == "" {
return nil, fmt.Errorf("failed to find k3k controller namespace")
}
kubeletPortRangeCM.Name = kubeletPortRangeConfigMapName
kubeletPortRangeCM.Namespace = portRangeConfigMapNamespace
webhookPortRangeCM.Name = webhookPortRangeConfigMapName
webhookPortRangeCM.Namespace = portRangeConfigMapNamespace
return &PortAllocator{
KubeletCM: &kubeletPortRangeCM,
WebhookCM: &webhookPortRangeCM,
}, nil
}
func (a *PortAllocator) InitPortAllocatorConfig(ctx context.Context, client ctrlruntimeclient.Client, kubeletPortRange, webhookPortRange string) manager.Runnable {
return manager.RunnableFunc(func(ctx context.Context) error {
if err := a.getOrCreate(ctx, client, a.KubeletCM, kubeletPortRange); err != nil {
return err
}
if err := a.getOrCreate(ctx, client, a.WebhookCM, webhookPortRange); err != nil {
return err
}
return nil
})
}
func (a *PortAllocator) cm(name, namespace, portRange string) *v1.ConfigMap {
return &v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Data: map[string]string{
"range": portRange,
"allocatedPorts": "",
},
BinaryData: map[string][]byte{
"snapshotData": []byte(""),
},
}
}
func (a *PortAllocator) getOrCreate(ctx context.Context, client ctrlruntimeclient.Client, configmap *v1.ConfigMap, portRange string) error {
nn := types.NamespacedName{
Name: configmap.Name,
Namespace: configmap.Namespace,
}
if err := client.Get(ctx, nn, configmap); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
// creating the configMap for the first time
configmap = a.cm(configmap.Name, configmap.Namespace, portRange)
if err := client.Create(ctx, configmap); err != nil {
return fmt.Errorf("failed to create port range configmap: %w", err)
}
}
return nil
}
func (a *PortAllocator) AllocateWebhookPort(ctx context.Context, cfg *Config) (int, error) {
return a.allocatePort(ctx, cfg, a.WebhookCM)
}
func (a *PortAllocator) DeallocateWebhookPort(ctx context.Context, client ctrlruntimeclient.Client, clusterName, clusterNamespace string, webhookPort int) error {
return a.deallocatePort(ctx, client, clusterName, clusterNamespace, a.WebhookCM, webhookPort)
}
func (a *PortAllocator) AllocateKubeletPort(ctx context.Context, cfg *Config) (int, error) {
return a.allocatePort(ctx, cfg, a.KubeletCM)
}
func (a *PortAllocator) DeallocateKubeletPort(ctx context.Context, client ctrlruntimeclient.Client, clusterName, clusterNamespace string, kubeletPort int) error {
return a.deallocatePort(ctx, client, clusterName, clusterNamespace, a.KubeletCM, kubeletPort)
}
// allocatePort will assign port to the cluster from a port Range configured for k3k
func (a *PortAllocator) allocatePort(ctx context.Context, cfg *Config, configMap *v1.ConfigMap) (int, error) {
portRange, ok := configMap.Data["range"]
if !ok {
return 0, fmt.Errorf("port range is not initialized")
}
// get configMap first to avoid conflicts
if err := a.getOrCreate(ctx, cfg.client, configMap, portRange); err != nil {
return 0, err
}
clusterNamespaceName := cfg.cluster.Namespace + "/" + cfg.cluster.Name
portsMap, err := parsePortMap(configMap.Data["allocatedPorts"])
if err != nil {
return 0, err
}
if _, ok := portsMap[clusterNamespaceName]; ok {
return portsMap[clusterNamespaceName], nil
}
// allocate a new port and save the snapshot
snapshot := core.RangeAllocation{
Range: configMap.Data["range"],
Data: configMap.BinaryData["snapshotData"],
}
pa, err := portallocator.NewFromSnapshot(&snapshot)
if err != nil {
return 0, err
}
next, err := pa.AllocateNext()
if err != nil {
return 0, err
}
portsMap[clusterNamespaceName] = next
if err := saveSnapshot(pa, &snapshot, configMap, portsMap); err != nil {
return 0, err
}
if err := cfg.client.Update(ctx, configMap); err != nil {
return 0, err
}
return next, nil
}
// deallocatePort will remove the port used by the cluster from the port range
func (a *PortAllocator) deallocatePort(ctx context.Context, client ctrlruntimeclient.Client, clusterName, clusterNamespace string, configMap *v1.ConfigMap, port int) error {
portRange, ok := configMap.Data["range"]
if !ok {
return fmt.Errorf("port range is not initialized")
}
if err := a.getOrCreate(ctx, client, configMap, portRange); err != nil {
return err
}
clusterNamespaceName := clusterNamespace + "/" + clusterName
portsMap, err := parsePortMap(configMap.Data["allocatedPorts"])
if err != nil {
return err
}
// check if the cluster already exists in the configMap
if usedPort, ok := portsMap[clusterNamespaceName]; ok {
if usedPort != port {
return fmt.Errorf("port %d does not match used port %d for the cluster", port, usedPort)
}
snapshot := core.RangeAllocation{
Range: configMap.Data["range"],
Data: configMap.BinaryData["snapshotData"],
}
pa, err := portallocator.NewFromSnapshot(&snapshot)
if err != nil {
return err
}
if err := pa.Release(port); err != nil {
return err
}
delete(portsMap, clusterNamespaceName)
if err := saveSnapshot(pa, &snapshot, configMap, portsMap); err != nil {
return err
}
}
return client.Update(ctx, configMap)
}
// parsePortMap will convert ConfigMap Data to a portMap of string keys and values of ints
func parsePortMap(portMapData string) (map[string]int, error) {
portMap := make(map[string]int)
if err := yaml.Unmarshal([]byte(portMapData), &portMap); err != nil {
return nil, fmt.Errorf("failed to parse allocatedPorts: %w", err)
}
return portMap, nil
}
// serializePortMap will convert a portMap of string keys and values of ints to ConfigMap Data
func serializePortMap(m map[string]int) (string, error) {
out, err := yaml.Marshal(m)
if err != nil {
return "", fmt.Errorf("failed to serialize allocatedPorts: %w", err)
}
return string(out), nil
}
func saveSnapshot(portAllocator *portallocator.PortAllocator, snapshot *core.RangeAllocation, configMap *v1.ConfigMap, portsMap map[string]int) error {
// save the new snapshot
if err := portAllocator.Snapshot(snapshot); err != nil {
return err
}
// update the configmap with the new portsMap and the new snapshot
configMap.BinaryData["snapshotData"] = snapshot.Data
configMap.Data["range"] = snapshot.Range
allocatedPortsData, err := serializePortMap(portsMap)
if err != nil {
return err
}
configMap.Data["allocatedPorts"] = allocatedPortsData
return nil
}

View File

@@ -34,15 +34,19 @@ type SharedAgent struct {
image string
imagePullPolicy string
token string
kubeletPort int
webhookPort int
}
func NewSharedAgent(config *Config, serviceIP, image, imagePullPolicy, token string) *SharedAgent {
func NewSharedAgent(config *Config, serviceIP, image, imagePullPolicy, token string, kubeletPort, webhookPort int) *SharedAgent {
return &SharedAgent{
Config: config,
serviceIP: serviceIP,
image: image,
imagePullPolicy: imagePullPolicy,
token: token,
kubeletPort: kubeletPort,
webhookPort: webhookPort,
}
}
@@ -72,7 +76,7 @@ func (s *SharedAgent) ensureObject(ctx context.Context, obj ctrlruntimeclient.Ob
}
func (s *SharedAgent) config(ctx context.Context) error {
config := sharedAgentData(s.cluster, s.Name(), s.token, s.serviceIP)
config := sharedAgentData(s.cluster, s.Name(), s.token, s.serviceIP, s.kubeletPort, s.webhookPort)
configSecret := &v1.Secret{
TypeMeta: metav1.TypeMeta{
@@ -91,7 +95,7 @@ func (s *SharedAgent) config(ctx context.Context) error {
return s.ensureObject(ctx, configSecret)
}
func sharedAgentData(cluster *v1alpha1.Cluster, serviceName, token, ip string) string {
func sharedAgentData(cluster *v1alpha1.Cluster, serviceName, token, ip string, kubeletPort, webhookPort int) string {
version := cluster.Spec.Version
if cluster.Spec.Version == "" {
version = cluster.Status.HostVersion
@@ -101,9 +105,12 @@ func sharedAgentData(cluster *v1alpha1.Cluster, serviceName, token, ip string) s
clusterNamespace: %s
serverIP: %s
serviceName: %s
token: %s
version: %s`,
cluster.Name, cluster.Namespace, ip, serviceName, token, version)
token: %v
mirrorHostNodes: %t
version: %s
webhookPort: %d
kubeletPort: %d`,
cluster.Name, cluster.Namespace, ip, serviceName, token, cluster.Spec.MirrorHostNodes, version, webhookPort, kubeletPort)
}
func (s *SharedAgent) daemonset(ctx context.Context) error {
@@ -140,7 +147,17 @@ func (s *SharedAgent) daemonset(ctx context.Context) error {
}
func (s *SharedAgent) podSpec() v1.PodSpec {
hostNetwork := false
dnsPolicy := v1.DNSClusterFirst
if s.cluster.Spec.MirrorHostNodes {
hostNetwork = true
dnsPolicy = v1.DNSClusterFirstWithHostNet
}
return v1.PodSpec{
HostNetwork: hostNetwork,
DNSPolicy: dnsPolicy,
ServiceAccountName: s.Name(),
NodeSelector: s.cluster.Spec.NodeSelector,
Volumes: []v1.Volume{
@@ -203,6 +220,15 @@ func (s *SharedAgent) podSpec() v1.PodSpec {
},
},
},
{
Name: "POD_IP",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.podIP",
},
},
},
}, s.cluster.Spec.AgentEnvs...),
VolumeMounts: []v1.VolumeMount{
{
@@ -217,10 +243,15 @@ func (s *SharedAgent) podSpec() v1.PodSpec {
},
},
Ports: []v1.ContainerPort{
{
Name: "kubelet-port",
Protocol: v1.ProtocolTCP,
ContainerPort: int32(s.kubeletPort),
},
{
Name: "webhook-port",
Protocol: v1.ProtocolTCP,
ContainerPort: 9443,
ContainerPort: int32(s.webhookPort),
},
},
},
@@ -249,13 +280,13 @@ func (s *SharedAgent) service(ctx context.Context) error {
{
Name: "k3s-kubelet-port",
Protocol: v1.ProtocolTCP,
Port: 10250,
Port: int32(s.kubeletPort),
},
{
Name: "webhook-server",
Protocol: v1.ProtocolTCP,
Port: 9443,
TargetPort: intstr.FromInt32(9443),
Port: int32(s.webhookPort),
TargetPort: intstr.FromInt32(int32(s.webhookPort)),
},
},
},

View File

@@ -14,6 +14,8 @@ func Test_sharedAgentData(t *testing.T) {
cluster *v1alpha1.Cluster
serviceName string
ip string
kubeletPort int
webhookPort int
token string
}
@@ -34,6 +36,8 @@ func Test_sharedAgentData(t *testing.T) {
Version: "v1.2.3",
},
},
kubeletPort: 10250,
webhookPort: 9443,
ip: "10.0.0.21",
serviceName: "service-name",
token: "dnjklsdjnksd892389238",
@@ -45,6 +49,9 @@ func Test_sharedAgentData(t *testing.T) {
"serviceName": "service-name",
"token": "dnjklsdjnksd892389238",
"version": "v1.2.3",
"mirrorHostNodes": "false",
"kubeletPort": "10250",
"webhookPort": "9443",
},
},
{
@@ -63,6 +70,8 @@ func Test_sharedAgentData(t *testing.T) {
},
},
ip: "10.0.0.21",
kubeletPort: 10250,
webhookPort: 9443,
serviceName: "service-name",
token: "dnjklsdjnksd892389238",
},
@@ -73,6 +82,9 @@ func Test_sharedAgentData(t *testing.T) {
"serviceName": "service-name",
"token": "dnjklsdjnksd892389238",
"version": "v1.2.3",
"mirrorHostNodes": "false",
"kubeletPort": "10250",
"webhookPort": "9443",
},
},
{
@@ -87,6 +99,8 @@ func Test_sharedAgentData(t *testing.T) {
HostVersion: "v1.3.3",
},
},
kubeletPort: 10250,
webhookPort: 9443,
ip: "10.0.0.21",
serviceName: "service-name",
token: "dnjklsdjnksd892389238",
@@ -98,13 +112,16 @@ func Test_sharedAgentData(t *testing.T) {
"serviceName": "service-name",
"token": "dnjklsdjnksd892389238",
"version": "v1.3.3",
"mirrorHostNodes": "false",
"kubeletPort": "10250",
"webhookPort": "9443",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := sharedAgentData(tt.args.cluster, tt.args.serviceName, tt.args.token, tt.args.ip)
config := sharedAgentData(tt.args.cluster, tt.args.serviceName, tt.args.token, tt.args.ip, tt.args.kubeletPort, tt.args.webhookPort)
data := make(map[string]string)
err := yaml.Unmarshal([]byte(config), data)