mirror of
https://github.com/rancher/k3k.git
synced 2026-03-17 09:00:35 +00:00
Compare commits
7 Commits
syncer-con
...
copilot/ge
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
983847c733 | ||
|
|
9b354e3245 | ||
|
|
fcb05793b1 | ||
|
|
83b4415f02 | ||
|
|
cd72bcbc15 | ||
|
|
9836f8376d | ||
|
|
dba054786e |
18
.gitconfig
Normal file
18
.gitconfig
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
[core]
|
||||||
|
autocrlf = input
|
||||||
|
whitespace = trailing-space,space-before-tab
|
||||||
|
|
||||||
|
[pull]
|
||||||
|
rebase = true
|
||||||
|
|
||||||
|
[push]
|
||||||
|
default = current
|
||||||
|
|
||||||
|
[rebase]
|
||||||
|
autoStash = true
|
||||||
|
|
||||||
|
[alias]
|
||||||
|
st = status
|
||||||
|
co = checkout
|
||||||
|
br = branch
|
||||||
|
lg = log --oneline --graph --decorate --all
|
||||||
@@ -935,6 +935,29 @@ spec:
|
|||||||
- Terminating
|
- Terminating
|
||||||
- Unknown
|
- Unknown
|
||||||
type: string
|
type: string
|
||||||
|
policy:
|
||||||
|
description: |-
|
||||||
|
policy represents the status of the policy applied to this cluster.
|
||||||
|
This field is set by the VirtualClusterPolicy controller.
|
||||||
|
properties:
|
||||||
|
name:
|
||||||
|
description: name is the name of the VirtualClusterPolicy currently
|
||||||
|
applied to this cluster.
|
||||||
|
minLength: 1
|
||||||
|
type: string
|
||||||
|
nodeSelector:
|
||||||
|
additionalProperties:
|
||||||
|
type: string
|
||||||
|
description: nodeSelector is a node selector enforced by the active
|
||||||
|
VirtualClusterPolicy.
|
||||||
|
type: object
|
||||||
|
priorityClass:
|
||||||
|
description: priorityClass is the priority class enforced by the
|
||||||
|
active VirtualClusterPolicy.
|
||||||
|
type: string
|
||||||
|
required:
|
||||||
|
- name
|
||||||
|
type: object
|
||||||
policyName:
|
policyName:
|
||||||
description: PolicyName specifies the virtual cluster policy name
|
description: PolicyName specifies the virtual cluster policy name
|
||||||
bound to the virtual cluster.
|
bound to the virtual cluster.
|
||||||
|
|||||||
@@ -41,6 +41,29 @@ _Appears In:_
|
|||||||
|===
|
|===
|
||||||
|
|
||||||
|
|
||||||
|
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-appliedpolicy"]
|
||||||
|
=== AppliedPolicy
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
AppliedPolicy defines the observed state of an applied policy.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
_Appears In:_
|
||||||
|
|
||||||
|
* xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-clusterstatus[$$ClusterStatus$$]
|
||||||
|
|
||||||
|
[cols="25a,55a,10a,10a", options="header"]
|
||||||
|
|===
|
||||||
|
| Field | Description | Default | Validation
|
||||||
|
| *`name`* __string__ | name is the name of the VirtualClusterPolicy currently applied to this cluster. + | | MinLength: 1 +
|
||||||
|
|
||||||
|
| *`priorityClass`* __string__ | priorityClass is the priority class enforced by the active VirtualClusterPolicy. + | |
|
||||||
|
| *`nodeSelector`* __object (keys:string, values:string)__ | nodeSelector is a node selector enforced by the active VirtualClusterPolicy. + | |
|
||||||
|
|===
|
||||||
|
|
||||||
|
|
||||||
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-cluster"]
|
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-cluster"]
|
||||||
=== Cluster
|
=== Cluster
|
||||||
|
|
||||||
|
|||||||
@@ -32,6 +32,24 @@ _Appears in:_
|
|||||||
| `secretRef` _string_ | SecretRef is the name of the Secret. | | |
|
| `secretRef` _string_ | SecretRef is the name of the Secret. | | |
|
||||||
|
|
||||||
|
|
||||||
|
#### AppliedPolicy
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
AppliedPolicy defines the observed state of an applied policy.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
_Appears in:_
|
||||||
|
- [ClusterStatus](#clusterstatus)
|
||||||
|
|
||||||
|
| Field | Description | Default | Validation |
|
||||||
|
| --- | --- | --- | --- |
|
||||||
|
| `name` _string_ | name is the name of the VirtualClusterPolicy currently applied to this cluster. | | MinLength: 1 <br /> |
|
||||||
|
| `priorityClass` _string_ | priorityClass is the priority class enforced by the active VirtualClusterPolicy. | | |
|
||||||
|
| `nodeSelector` _object (keys:string, values:string)_ | nodeSelector is a node selector enforced by the active VirtualClusterPolicy. | | |
|
||||||
|
|
||||||
|
|
||||||
#### Cluster
|
#### Cluster
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -276,7 +276,7 @@ func (k *kubelet) newProviderFunc(cfg config) nodeutil.NewProviderFunc {
|
|||||||
cfg.AgentHostname,
|
cfg.AgentHostname,
|
||||||
k.port,
|
k.port,
|
||||||
k.agentIP,
|
k.agentIP,
|
||||||
utilProvider.HostClient,
|
k.hostMgr,
|
||||||
utilProvider.VirtualClient,
|
utilProvider.VirtualClient,
|
||||||
k.virtualCluster,
|
k.virtualCluster,
|
||||||
cfg.Version,
|
cfg.Version,
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/go-logr/logr"
|
"github.com/go-logr/logr"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
@@ -13,13 +14,13 @@ import (
|
|||||||
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ConfigureNode(logger logr.Logger, node *corev1.Node, hostname string, servicePort int, ip string, hostClient client.Client, virtualClient client.Client, virtualCluster v1beta1.Cluster, version string, mirrorHostNodes bool) {
|
func ConfigureNode(logger logr.Logger, node *corev1.Node, hostname string, servicePort int, ip string, hostMgr manager.Manager, virtualClient client.Client, virtualCluster v1beta1.Cluster, version string, mirrorHostNodes bool) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
if mirrorHostNodes {
|
if mirrorHostNodes {
|
||||||
var hostNode corev1.Node
|
var hostNode corev1.Node
|
||||||
if err := hostClient.Get(ctx, types.NamespacedName{Name: node.Name}, &hostNode); err != nil {
|
if err := hostMgr.GetAPIReader().Get(ctx, types.NamespacedName{Name: node.Name}, &hostNode); err != nil {
|
||||||
logger.Error(err, "error getting host node for mirroring", err)
|
logger.Error(err, "error getting host node for mirroring")
|
||||||
}
|
}
|
||||||
|
|
||||||
node.Spec = *hostNode.Spec.DeepCopy()
|
node.Spec = *hostNode.Spec.DeepCopy()
|
||||||
@@ -48,7 +49,7 @@ func ConfigureNode(logger logr.Logger, node *corev1.Node, hostname string, servi
|
|||||||
// configure versions
|
// configure versions
|
||||||
node.Status.NodeInfo.KubeletVersion = version
|
node.Status.NodeInfo.KubeletVersion = version
|
||||||
|
|
||||||
startNodeCapacityUpdater(ctx, logger, hostClient, virtualClient, virtualCluster, node.Name)
|
startNodeCapacityUpdater(ctx, logger, hostMgr.GetClient(), virtualClient, virtualCluster, node.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package provider
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"maps"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -83,11 +84,30 @@ func updateNodeCapacity(ctx context.Context, logger logr.Logger, hostClient clie
|
|||||||
|
|
||||||
mergedResourceLists := mergeResourceLists(resourceLists...)
|
mergedResourceLists := mergeResourceLists(resourceLists...)
|
||||||
|
|
||||||
m, err := distributeQuotas(ctx, logger, virtualClient, mergedResourceLists)
|
var virtualNodeList, hostNodeList corev1.NodeList
|
||||||
if err != nil {
|
|
||||||
logger.Error(err, "error distributing policy quota")
|
if err := virtualClient.List(ctx, &virtualNodeList); err != nil {
|
||||||
|
logger.Error(err, "error listing virtual nodes for stable capacity distribution")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtResourceMap := make(map[string]corev1.ResourceList)
|
||||||
|
for _, vNode := range virtualNodeList.Items {
|
||||||
|
virtResourceMap[vNode.Name] = corev1.ResourceList{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := hostClient.List(ctx, &hostNodeList); err != nil {
|
||||||
|
logger.Error(err, "error listing host nodes for stable capacity distribution")
|
||||||
|
}
|
||||||
|
|
||||||
|
hostResourceMap := make(map[string]corev1.ResourceList)
|
||||||
|
|
||||||
|
for _, hNode := range hostNodeList.Items {
|
||||||
|
if _, ok := virtResourceMap[hNode.Name]; ok {
|
||||||
|
hostResourceMap[hNode.Name] = hNode.Status.Allocatable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m := distributeQuotas(hostResourceMap, virtResourceMap, mergedResourceLists)
|
||||||
allocatable = m[virtualNodeName]
|
allocatable = m[virtualNodeName]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,76 +145,99 @@ func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceLis
|
|||||||
return merged
|
return merged
|
||||||
}
|
}
|
||||||
|
|
||||||
// distributeQuotas divides the total resource quotas evenly among all active virtual nodes.
|
// distributeQuotas divides the total resource quotas among all active virtual nodes,
|
||||||
// This ensures that each virtual node reports a fair share of the available resources,
|
// capped by each node's actual host capacity. This ensures that each virtual node
|
||||||
// preventing the scheduler from overloading a single node.
|
// reports a fair share of the available resources without exceeding what its
|
||||||
|
// underlying host node can provide.
|
||||||
//
|
//
|
||||||
// The algorithm iterates over each resource, divides it as evenly as possible among the
|
// For each resource type the algorithm uses a multi-pass redistribution loop:
|
||||||
// sorted virtual nodes, and distributes any remainder to the first few nodes to ensure
|
// 1. Divide the remaining quota evenly among eligible nodes (sorted by name for
|
||||||
// all resources are allocated. Sorting the nodes by name guarantees a deterministic
|
// determinism), assigning any integer remainder to the first nodes alphabetically.
|
||||||
// distribution.
|
// 2. Cap each node's share at its host allocatable capacity.
|
||||||
func distributeQuotas(ctx context.Context, logger logr.Logger, virtualClient client.Client, quotas corev1.ResourceList) (map[string]corev1.ResourceList, error) {
|
// 3. Remove nodes that have reached their host capacity.
|
||||||
// List all virtual nodes to distribute the quota stably.
|
// 4. If there is still unallocated quota (because some nodes were capped below their
|
||||||
var virtualNodeList corev1.NodeList
|
// even share), repeat from step 1 with the remaining quota and remaining nodes.
|
||||||
if err := virtualClient.List(ctx, &virtualNodeList); err != nil {
|
//
|
||||||
logger.Error(err, "error listing virtual nodes for stable capacity distribution, falling back to full quota")
|
// The loop terminates when the quota is fully distributed or no eligible nodes remain.
|
||||||
return nil, err
|
func distributeQuotas(hostResourceMap, virtResourceMap map[string]corev1.ResourceList, quotas corev1.ResourceList) map[string]corev1.ResourceList {
|
||||||
}
|
resourceMap := make(map[string]corev1.ResourceList, len(virtResourceMap))
|
||||||
|
maps.Copy(resourceMap, virtResourceMap)
|
||||||
// If there are no virtual nodes, there's nothing to distribute.
|
|
||||||
numNodes := int64(len(virtualNodeList.Items))
|
|
||||||
if numNodes == 0 {
|
|
||||||
logger.Info("error listing virtual nodes for stable capacity distribution, falling back to full quota")
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sort nodes by name for a deterministic distribution of resources.
|
|
||||||
sort.Slice(virtualNodeList.Items, func(i, j int) bool {
|
|
||||||
return virtualNodeList.Items[i].Name < virtualNodeList.Items[j].Name
|
|
||||||
})
|
|
||||||
|
|
||||||
// Initialize the resource map for each virtual node.
|
|
||||||
resourceMap := make(map[string]corev1.ResourceList)
|
|
||||||
for _, virtualNode := range virtualNodeList.Items {
|
|
||||||
resourceMap[virtualNode.Name] = corev1.ResourceList{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Distribute each resource type from the policy's hard quota
|
// Distribute each resource type from the policy's hard quota
|
||||||
for resourceName, totalQuantity := range quotas {
|
for resourceName, totalQuantity := range quotas {
|
||||||
// Use MilliValue for precise division, especially for resources like CPU,
|
_, useMilli := milliScaleResources[resourceName]
|
||||||
// which are often expressed in milli-units. Otherwise, use the standard Value().
|
|
||||||
var totalValue int64
|
// eligible nodes for each distribution cycle
|
||||||
if _, found := milliScaleResources[resourceName]; found {
|
var eligibleNodes []string
|
||||||
totalValue = totalQuantity.MilliValue()
|
|
||||||
} else {
|
hostCap := make(map[string]int64)
|
||||||
totalValue = totalQuantity.Value()
|
|
||||||
|
// Populate the host nodes capacity map and the initial effective nodes
|
||||||
|
for vn := range virtResourceMap {
|
||||||
|
hostNodeResources := hostResourceMap[vn]
|
||||||
|
if hostNodeResources == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceQuantity, found := hostNodeResources[resourceName]
|
||||||
|
if !found {
|
||||||
|
// skip the node if the resource does not exist on the host node
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hostCap[vn] = resourceQuantity.Value()
|
||||||
|
if useMilli {
|
||||||
|
hostCap[vn] = resourceQuantity.MilliValue()
|
||||||
|
}
|
||||||
|
|
||||||
|
eligibleNodes = append(eligibleNodes, vn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the base quantity of the resource to be allocated per node.
|
sort.Strings(eligibleNodes)
|
||||||
// and the remainder that needs to be distributed among the nodes.
|
|
||||||
//
|
|
||||||
// For example, if totalValue is 2000 (e.g., 2 CPU) and there are 3 nodes:
|
|
||||||
// - quantityPerNode would be 666 (2000 / 3)
|
|
||||||
// - remainder would be 2 (2000 % 3)
|
|
||||||
// The first two nodes would get 667 (666 + 1), and the last one would get 666.
|
|
||||||
quantityPerNode := totalValue / numNodes
|
|
||||||
remainder := totalValue % numNodes
|
|
||||||
|
|
||||||
// Iterate through the sorted virtual nodes to distribute the resource.
|
totalValue := totalQuantity.Value()
|
||||||
for _, virtualNode := range virtualNodeList.Items {
|
if useMilli {
|
||||||
nodeQuantity := quantityPerNode
|
totalValue = totalQuantity.MilliValue()
|
||||||
if remainder > 0 {
|
}
|
||||||
nodeQuantity++
|
|
||||||
remainder--
|
// Start of the distribution cycle, each cycle will distribute the quota resource
|
||||||
|
// evenly between nodes, each node can not exceed the corresponding host node capacity
|
||||||
|
for totalValue > 0 && len(eligibleNodes) > 0 {
|
||||||
|
nodeNum := int64(len(eligibleNodes))
|
||||||
|
quantityPerNode := totalValue / nodeNum
|
||||||
|
remainder := totalValue % nodeNum
|
||||||
|
|
||||||
|
remainingNodes := []string{}
|
||||||
|
|
||||||
|
for _, virtualNodeName := range eligibleNodes {
|
||||||
|
nodeQuantity := quantityPerNode
|
||||||
|
if remainder > 0 {
|
||||||
|
nodeQuantity++
|
||||||
|
remainder--
|
||||||
|
}
|
||||||
|
// We cap the quantity to the hostNode capacity
|
||||||
|
nodeQuantity = min(nodeQuantity, hostCap[virtualNodeName])
|
||||||
|
|
||||||
|
if nodeQuantity > 0 {
|
||||||
|
existing := resourceMap[virtualNodeName][resourceName]
|
||||||
|
if useMilli {
|
||||||
|
resourceMap[virtualNodeName][resourceName] = *resource.NewMilliQuantity(existing.MilliValue()+nodeQuantity, totalQuantity.Format)
|
||||||
|
} else {
|
||||||
|
resourceMap[virtualNodeName][resourceName] = *resource.NewQuantity(existing.Value()+nodeQuantity, totalQuantity.Format)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
totalValue -= nodeQuantity
|
||||||
|
hostCap[virtualNodeName] -= nodeQuantity
|
||||||
|
|
||||||
|
if hostCap[virtualNodeName] > 0 {
|
||||||
|
remainingNodes = append(remainingNodes, virtualNodeName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, found := milliScaleResources[resourceName]; found {
|
eligibleNodes = remainingNodes
|
||||||
resourceMap[virtualNode.Name][resourceName] = *resource.NewMilliQuantity(nodeQuantity, totalQuantity.Format)
|
|
||||||
} else {
|
|
||||||
resourceMap[virtualNode.Name][resourceName] = *resource.NewQuantity(nodeQuantity, totalQuantity.Format)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return resourceMap, nil
|
return resourceMap
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,19 +1,13 @@
|
|||||||
package provider
|
package provider
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-logr/zapr"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"go.uber.org/zap"
|
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_distributeQuotas(t *testing.T) {
|
func Test_distributeQuotas(t *testing.T) {
|
||||||
@@ -21,39 +15,56 @@ func Test_distributeQuotas(t *testing.T) {
|
|||||||
err := corev1.AddToScheme(scheme)
|
err := corev1.AddToScheme(scheme)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
|
// Large allocatable so capping doesn't interfere with basic distribution tests.
|
||||||
node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2"}}
|
largeAllocatable := corev1.ResourceList{
|
||||||
node3 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-3"}}
|
corev1.ResourceCPU: resource.MustParse("100"),
|
||||||
|
corev1.ResourceMemory: resource.MustParse("100Gi"),
|
||||||
|
corev1.ResourcePods: resource.MustParse("1000"),
|
||||||
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
virtualNodes []client.Object
|
virtResourceMap map[string]corev1.ResourceList
|
||||||
quotas corev1.ResourceList
|
hostResourceMap map[string]corev1.ResourceList
|
||||||
want map[string]corev1.ResourceList
|
quotas corev1.ResourceList
|
||||||
wantErr bool
|
want map[string]corev1.ResourceList
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no virtual nodes",
|
name: "no virtual nodes",
|
||||||
virtualNodes: []client.Object{},
|
virtResourceMap: map[string]corev1.ResourceList{},
|
||||||
quotas: corev1.ResourceList{
|
quotas: corev1.ResourceList{
|
||||||
corev1.ResourceCPU: resource.MustParse("2"),
|
corev1.ResourceCPU: resource.MustParse("2"),
|
||||||
},
|
},
|
||||||
want: map[string]corev1.ResourceList{},
|
want: map[string]corev1.ResourceList{},
|
||||||
wantErr: false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "no quotas",
|
name: "no quotas",
|
||||||
virtualNodes: []client.Object{node1, node2},
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
quotas: corev1.ResourceList{},
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": largeAllocatable,
|
||||||
|
"node-2": largeAllocatable,
|
||||||
|
},
|
||||||
|
quotas: corev1.ResourceList{},
|
||||||
want: map[string]corev1.ResourceList{
|
want: map[string]corev1.ResourceList{
|
||||||
"node-1": {},
|
"node-1": {},
|
||||||
"node-2": {},
|
"node-2": {},
|
||||||
},
|
},
|
||||||
wantErr: false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "even distribution of cpu and memory",
|
name: "fewer virtual nodes than host nodes",
|
||||||
virtualNodes: []client.Object{node1, node2},
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": largeAllocatable,
|
||||||
|
"node-2": largeAllocatable,
|
||||||
|
"node-3": largeAllocatable,
|
||||||
|
"node-4": largeAllocatable,
|
||||||
|
},
|
||||||
quotas: corev1.ResourceList{
|
quotas: corev1.ResourceList{
|
||||||
corev1.ResourceCPU: resource.MustParse("2"),
|
corev1.ResourceCPU: resource.MustParse("2"),
|
||||||
corev1.ResourceMemory: resource.MustParse("4Gi"),
|
corev1.ResourceMemory: resource.MustParse("4Gi"),
|
||||||
@@ -68,65 +79,203 @@ func Test_distributeQuotas(t *testing.T) {
|
|||||||
corev1.ResourceMemory: resource.MustParse("2Gi"),
|
corev1.ResourceMemory: resource.MustParse("2Gi"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantErr: false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "uneven distribution with remainder",
|
name: "even distribution of cpu and memory",
|
||||||
virtualNodes: []client.Object{node1, node2, node3},
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": largeAllocatable,
|
||||||
|
"node-2": largeAllocatable,
|
||||||
|
},
|
||||||
quotas: corev1.ResourceList{
|
quotas: corev1.ResourceList{
|
||||||
corev1.ResourceCPU: resource.MustParse("2"), // 2000m / 3 = 666m with 2m remainder
|
corev1.ResourceCPU: resource.MustParse("2"),
|
||||||
|
corev1.ResourceMemory: resource.MustParse("4Gi"),
|
||||||
|
},
|
||||||
|
want: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("1"),
|
||||||
|
corev1.ResourceMemory: resource.MustParse("2Gi"),
|
||||||
|
},
|
||||||
|
"node-2": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("1"),
|
||||||
|
corev1.ResourceMemory: resource.MustParse("2Gi"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "uneven distribution with remainder",
|
||||||
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
"node-3": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": largeAllocatable,
|
||||||
|
"node-2": largeAllocatable,
|
||||||
|
"node-3": largeAllocatable,
|
||||||
|
},
|
||||||
|
quotas: corev1.ResourceList{
|
||||||
|
corev1.ResourceCPU: resource.MustParse("2"),
|
||||||
},
|
},
|
||||||
want: map[string]corev1.ResourceList{
|
want: map[string]corev1.ResourceList{
|
||||||
"node-1": {corev1.ResourceCPU: resource.MustParse("667m")},
|
"node-1": {corev1.ResourceCPU: resource.MustParse("667m")},
|
||||||
"node-2": {corev1.ResourceCPU: resource.MustParse("667m")},
|
"node-2": {corev1.ResourceCPU: resource.MustParse("667m")},
|
||||||
"node-3": {corev1.ResourceCPU: resource.MustParse("666m")},
|
"node-3": {corev1.ResourceCPU: resource.MustParse("666m")},
|
||||||
},
|
},
|
||||||
wantErr: false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "distribution of number resources",
|
name: "distribution of number resources",
|
||||||
virtualNodes: []client.Object{node1, node2, node3},
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
"node-3": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": largeAllocatable,
|
||||||
|
"node-2": largeAllocatable,
|
||||||
|
"node-3": largeAllocatable,
|
||||||
|
},
|
||||||
quotas: corev1.ResourceList{
|
quotas: corev1.ResourceList{
|
||||||
corev1.ResourceCPU: resource.MustParse("2"),
|
corev1.ResourceCPU: resource.MustParse("2"),
|
||||||
corev1.ResourcePods: resource.MustParse("11"),
|
corev1.ResourcePods: resource.MustParse("11"),
|
||||||
corev1.ResourceSecrets: resource.MustParse("9"),
|
|
||||||
"custom": resource.MustParse("8"),
|
|
||||||
},
|
},
|
||||||
want: map[string]corev1.ResourceList{
|
want: map[string]corev1.ResourceList{
|
||||||
"node-1": {
|
"node-1": {
|
||||||
corev1.ResourceCPU: resource.MustParse("667m"),
|
corev1.ResourceCPU: resource.MustParse("667m"),
|
||||||
corev1.ResourcePods: resource.MustParse("4"),
|
corev1.ResourcePods: resource.MustParse("4"),
|
||||||
corev1.ResourceSecrets: resource.MustParse("3"),
|
|
||||||
"custom": resource.MustParse("3"),
|
|
||||||
},
|
},
|
||||||
"node-2": {
|
"node-2": {
|
||||||
corev1.ResourceCPU: resource.MustParse("667m"),
|
corev1.ResourceCPU: resource.MustParse("667m"),
|
||||||
corev1.ResourcePods: resource.MustParse("4"),
|
corev1.ResourcePods: resource.MustParse("4"),
|
||||||
corev1.ResourceSecrets: resource.MustParse("3"),
|
|
||||||
"custom": resource.MustParse("3"),
|
|
||||||
},
|
},
|
||||||
"node-3": {
|
"node-3": {
|
||||||
corev1.ResourceCPU: resource.MustParse("666m"),
|
corev1.ResourceCPU: resource.MustParse("666m"),
|
||||||
corev1.ResourcePods: resource.MustParse("3"),
|
corev1.ResourcePods: resource.MustParse("3"),
|
||||||
corev1.ResourceSecrets: resource.MustParse("3"),
|
|
||||||
"custom": resource.MustParse("2"),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantErr: false,
|
},
|
||||||
|
{
|
||||||
|
name: "extended resource distributed only to nodes that have it",
|
||||||
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
"node-3": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("100"),
|
||||||
|
"nvidia.com/gpu": resource.MustParse("2"),
|
||||||
|
},
|
||||||
|
"node-2": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("100"),
|
||||||
|
},
|
||||||
|
"node-3": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("100"),
|
||||||
|
"nvidia.com/gpu": resource.MustParse("4"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
quotas: corev1.ResourceList{
|
||||||
|
corev1.ResourceCPU: resource.MustParse("3"),
|
||||||
|
"nvidia.com/gpu": resource.MustParse("4"),
|
||||||
|
},
|
||||||
|
want: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("1"),
|
||||||
|
"nvidia.com/gpu": resource.MustParse("2"),
|
||||||
|
},
|
||||||
|
"node-2": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("1"),
|
||||||
|
},
|
||||||
|
"node-3": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("1"),
|
||||||
|
"nvidia.com/gpu": resource.MustParse("2"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "capping at host capacity with redistribution",
|
||||||
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("8"),
|
||||||
|
},
|
||||||
|
"node-2": {
|
||||||
|
corev1.ResourceCPU: resource.MustParse("2"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
quotas: corev1.ResourceList{
|
||||||
|
corev1.ResourceCPU: resource.MustParse("6"),
|
||||||
|
},
|
||||||
|
// Even split would be 3 each, but node-2 only has 2 CPU.
|
||||||
|
// node-2 gets capped at 2, the remaining 1 goes to node-1.
|
||||||
|
want: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {corev1.ResourceCPU: resource.MustParse("4")},
|
||||||
|
"node-2": {corev1.ResourceCPU: resource.MustParse("2")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "gpu capping with uneven host capacity",
|
||||||
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {
|
||||||
|
"nvidia.com/gpu": resource.MustParse("6"),
|
||||||
|
},
|
||||||
|
"node-2": {
|
||||||
|
"nvidia.com/gpu": resource.MustParse("1"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
quotas: corev1.ResourceList{
|
||||||
|
"nvidia.com/gpu": resource.MustParse("4"),
|
||||||
|
},
|
||||||
|
// Even split would be 2 each, but node-2 only has 1 GPU.
|
||||||
|
// node-2 gets capped at 1, the remaining 1 goes to node-1.
|
||||||
|
want: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {"nvidia.com/gpu": resource.MustParse("3")},
|
||||||
|
"node-2": {"nvidia.com/gpu": resource.MustParse("1")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "quota exceeds total host capacity",
|
||||||
|
virtResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {},
|
||||||
|
"node-2": {},
|
||||||
|
"node-3": {},
|
||||||
|
},
|
||||||
|
hostResourceMap: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {
|
||||||
|
"nvidia.com/gpu": resource.MustParse("2"),
|
||||||
|
},
|
||||||
|
"node-2": {
|
||||||
|
"nvidia.com/gpu": resource.MustParse("1"),
|
||||||
|
},
|
||||||
|
"node-3": {
|
||||||
|
"nvidia.com/gpu": resource.MustParse("1"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
quotas: corev1.ResourceList{
|
||||||
|
"nvidia.com/gpu": resource.MustParse("10"),
|
||||||
|
},
|
||||||
|
// Total host capacity is 4, quota is 10. Each node gets its full capacity.
|
||||||
|
want: map[string]corev1.ResourceList{
|
||||||
|
"node-1": {"nvidia.com/gpu": resource.MustParse("2")},
|
||||||
|
"node-2": {"nvidia.com/gpu": resource.MustParse("1")},
|
||||||
|
"node-3": {"nvidia.com/gpu": resource.MustParse("1")},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.virtualNodes...).Build()
|
got := distributeQuotas(tt.hostResourceMap, tt.virtResourceMap, tt.quotas)
|
||||||
logger := zapr.NewLogger(zap.NewNop())
|
|
||||||
|
|
||||||
got, gotErr := distributeQuotas(context.Background(), logger, fakeClient, tt.quotas)
|
|
||||||
if tt.wantErr {
|
|
||||||
assert.Error(t, gotErr)
|
|
||||||
} else {
|
|
||||||
assert.NoError(t, gotErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, len(tt.want), len(got), "Number of nodes in result should match")
|
assert.Equal(t, len(tt.want), len(got), "Number of nodes in result should match")
|
||||||
|
|
||||||
|
|||||||
@@ -401,28 +401,45 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
|
|||||||
// Schedule the host pod in the same host node of the virtual kubelet
|
// Schedule the host pod in the same host node of the virtual kubelet
|
||||||
hostPod.Spec.NodeName = p.agentHostname
|
hostPod.Spec.NodeName = p.agentHostname
|
||||||
|
|
||||||
|
// The pod's own nodeSelector is ignored.
|
||||||
|
// The final selector is determined by the cluster spec, but overridden by a policy if present.
|
||||||
hostPod.Spec.NodeSelector = cluster.Spec.NodeSelector
|
hostPod.Spec.NodeSelector = cluster.Spec.NodeSelector
|
||||||
|
if cluster.Status.Policy != nil && len(cluster.Status.Policy.NodeSelector) > 0 {
|
||||||
|
hostPod.Spec.NodeSelector = cluster.Status.Policy.NodeSelector
|
||||||
|
}
|
||||||
|
|
||||||
// setting the hostname for the pod if its not set
|
// setting the hostname for the pod if its not set
|
||||||
if virtualPod.Spec.Hostname == "" {
|
if virtualPod.Spec.Hostname == "" {
|
||||||
hostPod.Spec.Hostname = k3kcontroller.SafeConcatName(virtualPod.Name)
|
hostPod.Spec.Hostname = k3kcontroller.SafeConcatName(virtualPod.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the priorityClass for the virtual cluster is set then override the provided value
|
// When a PriorityClass is set we will use the translated one in the HostCluster.
|
||||||
|
// If the Cluster or a Policy defines a PriorityClass of the host we are going to use that one.
|
||||||
// Note: the core-dns and local-path-provisioner pod are scheduled by k3s with the
|
// Note: the core-dns and local-path-provisioner pod are scheduled by k3s with the
|
||||||
// 'system-cluster-critical' and 'system-node-critical' default priority classes.
|
// 'system-cluster-critical' and 'system-node-critical' default priority classes.
|
||||||
if !strings.HasPrefix(hostPod.Spec.PriorityClassName, "system-") {
|
//
|
||||||
if hostPod.Spec.PriorityClassName != "" {
|
// TODO: we probably need to define a custom "intermediate" k3k-system-* priority
|
||||||
tPriorityClassName := p.Translator.TranslateName("", hostPod.Spec.PriorityClassName)
|
if strings.HasPrefix(virtualPod.Spec.PriorityClassName, "system-") {
|
||||||
hostPod.Spec.PriorityClassName = tPriorityClassName
|
hostPod.Spec.PriorityClassName = virtualPod.Spec.PriorityClassName
|
||||||
|
} else {
|
||||||
|
enforcedPriorityClassName := cluster.Spec.PriorityClass
|
||||||
|
if cluster.Status.Policy != nil && cluster.Status.Policy.PriorityClass != nil {
|
||||||
|
enforcedPriorityClassName = *cluster.Status.Policy.PriorityClass
|
||||||
}
|
}
|
||||||
|
|
||||||
if cluster.Spec.PriorityClass != "" {
|
if enforcedPriorityClassName != "" {
|
||||||
hostPod.Spec.PriorityClassName = cluster.Spec.PriorityClass
|
hostPod.Spec.PriorityClassName = enforcedPriorityClassName
|
||||||
|
} else if virtualPod.Spec.PriorityClassName != "" {
|
||||||
|
hostPod.Spec.PriorityClassName = p.Translator.TranslateName("", virtualPod.Spec.PriorityClassName)
|
||||||
hostPod.Spec.Priority = nil
|
hostPod.Spec.Priority = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if the priority class is set we need to remove the priority
|
||||||
|
if hostPod.Spec.PriorityClassName != "" {
|
||||||
|
hostPod.Spec.Priority = nil
|
||||||
|
}
|
||||||
|
|
||||||
p.configurePodEnvs(hostPod, &virtualPod)
|
p.configurePodEnvs(hostPod, &virtualPod)
|
||||||
|
|
||||||
// fieldpath annotations
|
// fieldpath annotations
|
||||||
|
|||||||
@@ -538,6 +538,12 @@ type ClusterStatus struct {
|
|||||||
// +optional
|
// +optional
|
||||||
PolicyName string `json:"policyName,omitempty"`
|
PolicyName string `json:"policyName,omitempty"`
|
||||||
|
|
||||||
|
// policy represents the status of the policy applied to this cluster.
|
||||||
|
// This field is set by the VirtualClusterPolicy controller.
|
||||||
|
//
|
||||||
|
// +optional
|
||||||
|
Policy *AppliedPolicy `json:"policy,omitempty"`
|
||||||
|
|
||||||
// KubeletPort specefies the port used by k3k-kubelet in shared mode.
|
// KubeletPort specefies the port used by k3k-kubelet in shared mode.
|
||||||
//
|
//
|
||||||
// +optional
|
// +optional
|
||||||
@@ -561,6 +567,25 @@ type ClusterStatus struct {
|
|||||||
Phase ClusterPhase `json:"phase,omitempty"`
|
Phase ClusterPhase `json:"phase,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AppliedPolicy defines the observed state of an applied policy.
|
||||||
|
type AppliedPolicy struct {
|
||||||
|
// name is the name of the VirtualClusterPolicy currently applied to this cluster.
|
||||||
|
//
|
||||||
|
// +kubebuilder:validation:MinLength:=1
|
||||||
|
// +required
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
|
||||||
|
// priorityClass is the priority class enforced by the active VirtualClusterPolicy.
|
||||||
|
//
|
||||||
|
// +optional
|
||||||
|
PriorityClass *string `json:"priorityClass,omitempty"`
|
||||||
|
|
||||||
|
// nodeSelector is a node selector enforced by the active VirtualClusterPolicy.
|
||||||
|
//
|
||||||
|
// +optional
|
||||||
|
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
// ClusterPhase is a high-level summary of the cluster's current lifecycle state.
|
// ClusterPhase is a high-level summary of the cluster's current lifecycle state.
|
||||||
type ClusterPhase string
|
type ClusterPhase string
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,33 @@ func (in *Addon) DeepCopy() *Addon {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
|
func (in *AppliedPolicy) DeepCopyInto(out *AppliedPolicy) {
|
||||||
|
*out = *in
|
||||||
|
if in.PriorityClass != nil {
|
||||||
|
in, out := &in.PriorityClass, &out.PriorityClass
|
||||||
|
*out = new(string)
|
||||||
|
**out = **in
|
||||||
|
}
|
||||||
|
if in.NodeSelector != nil {
|
||||||
|
in, out := &in.NodeSelector, &out.NodeSelector
|
||||||
|
*out = make(map[string]string, len(*in))
|
||||||
|
for key, val := range *in {
|
||||||
|
(*out)[key] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AppliedPolicy.
|
||||||
|
func (in *AppliedPolicy) DeepCopy() *AppliedPolicy {
|
||||||
|
if in == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := new(AppliedPolicy)
|
||||||
|
in.DeepCopyInto(out)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||||
func (in *Cluster) DeepCopyInto(out *Cluster) {
|
func (in *Cluster) DeepCopyInto(out *Cluster) {
|
||||||
*out = *in
|
*out = *in
|
||||||
@@ -200,6 +227,11 @@ func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
|
|||||||
*out = make([]string, len(*in))
|
*out = make([]string, len(*in))
|
||||||
copy(*out, *in)
|
copy(*out, *in)
|
||||||
}
|
}
|
||||||
|
if in.Policy != nil {
|
||||||
|
in, out := &in.Policy, &out.Policy
|
||||||
|
*out = new(AppliedPolicy)
|
||||||
|
(*in).DeepCopyInto(*out)
|
||||||
|
}
|
||||||
if in.Conditions != nil {
|
if in.Conditions != nil {
|
||||||
in, out := &in.Conditions, &out.Conditions
|
in, out := &in.Conditions, &out.Conditions
|
||||||
*out = make([]metav1.Condition, len(*in))
|
*out = make([]metav1.Condition, len(*in))
|
||||||
|
|||||||
@@ -2,13 +2,17 @@ package policy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/selection"
|
"k8s.io/apimachinery/pkg/selection"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
networkingv1 "k8s.io/api/networking/v1"
|
networkingv1 "k8s.io/api/networking/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
ctrl "sigs.k8s.io/controller-runtime"
|
ctrl "sigs.k8s.io/controller-runtime"
|
||||||
|
|
||||||
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
||||||
@@ -52,15 +56,36 @@ func (c *VirtualClusterPolicyReconciler) cleanupNamespaces(ctx context.Context)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ns := range namespaces.Items {
|
for _, ns := range namespaces.Items {
|
||||||
selector := labels.NewSelector()
|
currentPolicyName := ns.Labels[PolicyNameLabelKey]
|
||||||
|
|
||||||
if req, err := labels.NewRequirement(ManagedByLabelKey, selection.Equals, []string{VirtualPolicyControllerName}); err == nil {
|
// This will match all the resources managed by the K3k Policy controller
|
||||||
selector = selector.Add(*req)
|
// that have the app.kubernetes.io/managed-by=k3k-policy-controller label
|
||||||
}
|
selector := labels.SelectorFromSet(labels.Set{
|
||||||
|
ManagedByLabelKey: VirtualPolicyControllerName,
|
||||||
|
})
|
||||||
|
|
||||||
// if the namespace is bound to a policy -> cleanup resources of other policies
|
// If the namespace is not bound to any policy, or if the policy it was bound to no longer exists,
|
||||||
if ns.Labels[PolicyNameLabelKey] != "" {
|
// we need to clear policy-related fields on its Cluster objects.
|
||||||
requirement, err := labels.NewRequirement(PolicyNameLabelKey, selection.NotEquals, []string{ns.Labels[PolicyNameLabelKey]})
|
if currentPolicyName == "" {
|
||||||
|
if err := c.clearPolicyFieldsForClustersInNamespace(ctx, ns.Name); err != nil {
|
||||||
|
log.Error(err, "error clearing policy fields for clusters in unbound namespace", "namespace", ns.Name)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var policy v1beta1.VirtualClusterPolicy
|
||||||
|
if err := c.Client.Get(ctx, types.NamespacedName{Name: currentPolicyName}, &policy); err != nil {
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
if err := c.clearPolicyFieldsForClustersInNamespace(ctx, ns.Name); err != nil {
|
||||||
|
log.Error(err, "error clearing policy fields for clusters in namespace with non-existent policy", "namespace", ns.Name, "policy", currentPolicyName)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Error(err, "error getting policy for namespace", "namespace", ns.Name, "policy", currentPolicyName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the namespace is bound to a policy -> cleanup resources of other policies
|
||||||
|
requirement, err := labels.NewRequirement(
|
||||||
|
PolicyNameLabelKey, selection.NotEquals, []string{currentPolicyName},
|
||||||
|
)
|
||||||
|
|
||||||
// log the error but continue cleaning up the other namespaces
|
// log the error but continue cleaning up the other namespaces
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -90,3 +115,30 @@ func (c *VirtualClusterPolicyReconciler) cleanupNamespaces(ctx context.Context)
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clearPolicyFieldsForClustersInNamespace sets the policy status on Cluster objects in the given namespace to nil.
|
||||||
|
func (c *VirtualClusterPolicyReconciler) clearPolicyFieldsForClustersInNamespace(ctx context.Context, namespace string) error {
|
||||||
|
log := ctrl.LoggerFrom(ctx)
|
||||||
|
|
||||||
|
var clusters v1beta1.ClusterList
|
||||||
|
if err := c.Client.List(ctx, &clusters, client.InNamespace(namespace)); err != nil {
|
||||||
|
return fmt.Errorf("failed listing clusters in namespace %s: %w", namespace, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var updateErrs []error
|
||||||
|
|
||||||
|
for i := range clusters.Items {
|
||||||
|
cluster := clusters.Items[i]
|
||||||
|
if cluster.Status.Policy != nil {
|
||||||
|
log.V(1).Info("Clearing policy status for Cluster", "cluster", cluster.Name, "namespace", namespace)
|
||||||
|
cluster.Status.Policy = nil
|
||||||
|
|
||||||
|
if updateErr := c.Client.Status().Update(ctx, &cluster); updateErr != nil {
|
||||||
|
updateErr = fmt.Errorf("failed updating Status for Cluster %s: %w", cluster.Name, updateErr)
|
||||||
|
updateErrs = append(updateErrs, updateErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Join(updateErrs...)
|
||||||
|
}
|
||||||
|
|||||||
@@ -470,16 +470,21 @@ func (c *VirtualClusterPolicyReconciler) reconcileClusters(ctx context.Context,
|
|||||||
var clusterUpdateErrs []error
|
var clusterUpdateErrs []error
|
||||||
|
|
||||||
for _, cluster := range clusters.Items {
|
for _, cluster := range clusters.Items {
|
||||||
orig := cluster.DeepCopy()
|
origStatus := cluster.Status.DeepCopy()
|
||||||
|
|
||||||
cluster.Spec.PriorityClass = policy.Spec.DefaultPriorityClass
|
cluster.Status.Policy = &v1beta1.AppliedPolicy{
|
||||||
cluster.Spec.NodeSelector = policy.Spec.DefaultNodeSelector
|
Name: policy.Name,
|
||||||
|
PriorityClass: &policy.Spec.DefaultPriorityClass,
|
||||||
|
NodeSelector: policy.Spec.DefaultNodeSelector,
|
||||||
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(orig, cluster) {
|
if !reflect.DeepEqual(origStatus, &cluster.Status) {
|
||||||
log.V(1).Info("Updating Cluster", "cluster", cluster.Name, "namespace", namespace.Name)
|
log.V(1).Info("Updating Cluster", "cluster", cluster.Name, "namespace", namespace.Name)
|
||||||
|
|
||||||
// continue updating also the other clusters even if an error occurred
|
// continue updating also the other clusters even if an error occurred
|
||||||
clusterUpdateErrs = append(clusterUpdateErrs, c.Client.Update(ctx, &cluster))
|
if err := c.Client.Status().Update(ctx, &cluster); err != nil {
|
||||||
|
clusterUpdateErrs = append(clusterUpdateErrs, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package policy_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"reflect"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
@@ -307,7 +306,7 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
|
|||||||
Expect(ns.Labels).Should(HaveKeyWithValue("pod-security.kubernetes.io/enforce-version", "latest"))
|
Expect(ns.Labels).Should(HaveKeyWithValue("pod-security.kubernetes.io/enforce-version", "latest"))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should update Cluster's PriorityClass", func() {
|
It("updates the Cluster's policy status with the DefaultPriorityClass", func() {
|
||||||
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
|
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
|
||||||
DefaultPriorityClass: "foobar",
|
DefaultPriorityClass: "foobar",
|
||||||
})
|
})
|
||||||
@@ -329,19 +328,22 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
|
|||||||
err := k8sClient.Create(ctx, cluster)
|
err := k8sClient.Create(ctx, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
// wait a bit
|
Eventually(func(g Gomega) {
|
||||||
Eventually(func() bool {
|
|
||||||
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
||||||
err = k8sClient.Get(ctx, key, cluster)
|
err = k8sClient.Get(ctx, key, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
g.Expect(err).To(Not(HaveOccurred()))
|
||||||
return cluster.Spec.PriorityClass == policy.Spec.DefaultPriorityClass
|
|
||||||
|
g.Expect(cluster.Spec.PriorityClass).To(BeEmpty())
|
||||||
|
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
|
||||||
|
g.Expect(cluster.Status.Policy.PriorityClass).To(Not(BeNil()))
|
||||||
|
g.Expect(*cluster.Status.Policy.PriorityClass).To(Equal(policy.Spec.DefaultPriorityClass))
|
||||||
}).
|
}).
|
||||||
WithTimeout(time.Second * 10).
|
WithTimeout(time.Second * 10).
|
||||||
WithPolling(time.Second).
|
WithPolling(time.Second).
|
||||||
Should(BeTrue())
|
Should(Succeed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should update Cluster's NodeSelector", func() {
|
It("updates the Cluster's policy status with the DefaultNodeSelector", func() {
|
||||||
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
|
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
|
||||||
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
|
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
|
||||||
})
|
})
|
||||||
@@ -366,18 +368,21 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
|
|||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
// wait a bit
|
// wait a bit
|
||||||
Eventually(func() bool {
|
Eventually(func(g Gomega) {
|
||||||
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
||||||
err = k8sClient.Get(ctx, key, cluster)
|
err = k8sClient.Get(ctx, key, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
return reflect.DeepEqual(cluster.Spec.NodeSelector, policy.Spec.DefaultNodeSelector)
|
|
||||||
|
g.Expect(cluster.Spec.NodeSelector).To(BeEmpty())
|
||||||
|
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
|
||||||
|
g.Expect(cluster.Status.Policy.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
|
||||||
}).
|
}).
|
||||||
WithTimeout(time.Second * 10).
|
WithTimeout(time.Second * 10).
|
||||||
WithPolling(time.Second).
|
WithPolling(time.Second).
|
||||||
Should(BeTrue())
|
Should(Succeed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should update the nodeSelector if changed", func() {
|
It("updates the Cluster's policy status when the VCP nodeSelector changes", func() {
|
||||||
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
|
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
|
||||||
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
|
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
|
||||||
})
|
})
|
||||||
@@ -399,43 +404,56 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
|
|||||||
err := k8sClient.Create(ctx, cluster)
|
err := k8sClient.Create(ctx, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
Expect(cluster.Spec.NodeSelector).To(Equal(policy.Spec.DefaultNodeSelector))
|
// Cluster Spec should not change, VCP NodeSelector should be present in the Status
|
||||||
|
Eventually(func(g Gomega) {
|
||||||
|
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
||||||
|
err = k8sClient.Get(ctx, key, cluster)
|
||||||
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
|
g.Expect(cluster.Spec.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
|
||||||
|
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
|
||||||
|
g.Expect(cluster.Status.Policy.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
|
||||||
|
}).
|
||||||
|
WithTimeout(time.Second * 10).
|
||||||
|
WithPolling(time.Second).
|
||||||
|
Should(Succeed())
|
||||||
|
|
||||||
// update the VirtualClusterPolicy
|
// update the VirtualClusterPolicy
|
||||||
policy.Spec.DefaultNodeSelector["label-2"] = "value-2"
|
policy.Spec.DefaultNodeSelector["label-2"] = "value-2"
|
||||||
err = k8sClient.Update(ctx, policy)
|
err = k8sClient.Update(ctx, policy)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
Expect(cluster.Spec.NodeSelector).To(Not(Equal(policy.Spec.DefaultNodeSelector)))
|
|
||||||
|
|
||||||
// wait a bit
|
// wait a bit
|
||||||
Eventually(func() bool {
|
Eventually(func(g Gomega) {
|
||||||
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
||||||
err = k8sClient.Get(ctx, key, cluster)
|
err = k8sClient.Get(ctx, key, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
return reflect.DeepEqual(cluster.Spec.NodeSelector, policy.Spec.DefaultNodeSelector)
|
|
||||||
|
g.Expect(cluster.Spec.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
|
||||||
|
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
|
||||||
|
g.Expect(cluster.Status.Policy.NodeSelector).To(Equal(map[string]string{"label-1": "value-1", "label-2": "value-2"}))
|
||||||
}).
|
}).
|
||||||
WithTimeout(time.Second * 10).
|
WithTimeout(time.Second * 10).
|
||||||
WithPolling(time.Second).
|
WithPolling(time.Second).
|
||||||
Should(BeTrue())
|
Should(Succeed())
|
||||||
|
|
||||||
// Update the Cluster
|
// Update the Cluster
|
||||||
|
err = k8sClient.Get(ctx, client.ObjectKeyFromObject(cluster), cluster)
|
||||||
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
cluster.Spec.NodeSelector["label-3"] = "value-3"
|
cluster.Spec.NodeSelector["label-3"] = "value-3"
|
||||||
err = k8sClient.Update(ctx, cluster)
|
err = k8sClient.Update(ctx, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
Expect(cluster.Spec.NodeSelector).To(Not(Equal(policy.Spec.DefaultNodeSelector)))
|
|
||||||
|
|
||||||
// wait a bit and check it's restored
|
// wait a bit and check it's restored
|
||||||
Eventually(func() bool {
|
Consistently(func(g Gomega) {
|
||||||
var updatedCluster v1beta1.Cluster
|
|
||||||
|
|
||||||
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
|
||||||
err = k8sClient.Get(ctx, key, &updatedCluster)
|
err = k8sClient.Get(ctx, key, cluster)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
g.Expect(err).To(Not(HaveOccurred()))
|
||||||
return reflect.DeepEqual(updatedCluster.Spec.NodeSelector, policy.Spec.DefaultNodeSelector)
|
g.Expect(cluster.Spec.NodeSelector).To(Equal(map[string]string{"label-1": "value-1", "label-3": "value-3"}))
|
||||||
}).
|
}).
|
||||||
WithTimeout(time.Second * 10).
|
WithTimeout(time.Second * 10).
|
||||||
WithPolling(time.Second).
|
WithPolling(time.Second).
|
||||||
Should(BeTrue())
|
Should(Succeed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should create a ResourceQuota if Quota is enabled", func() {
|
It("should create a ResourceQuota if Quota is enabled", func() {
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
schedv1 "k8s.io/api/scheduling/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
||||||
@@ -99,6 +100,100 @@ var _ = When("a cluster's status is tracked", Label(e2eTestLabel), Label(statusT
|
|||||||
WithPolling(time.Second * 5).
|
WithPolling(time.Second * 5).
|
||||||
Should(Succeed())
|
Should(Succeed())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("created with field controlled from a policy", func() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
priorityClass := &schedv1.PriorityClass{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
GenerateName: "pc-",
|
||||||
|
},
|
||||||
|
Value: 100,
|
||||||
|
}
|
||||||
|
Expect(k8sClient.Create(ctx, priorityClass)).To(Succeed())
|
||||||
|
|
||||||
|
clusterObj := &v1beta1.Cluster{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
GenerateName: "status-cluster-",
|
||||||
|
Namespace: namespace.Name,
|
||||||
|
},
|
||||||
|
Spec: v1beta1.ClusterSpec{
|
||||||
|
PriorityClass: priorityClass.Name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(k8sClient.Create(ctx, clusterObj)).To(Succeed())
|
||||||
|
|
||||||
|
DeferCleanup(func() {
|
||||||
|
Expect(k8sClient.Delete(ctx, priorityClass)).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
clusterKey := client.ObjectKeyFromObject(clusterObj)
|
||||||
|
|
||||||
|
// Check for the initial status to be set
|
||||||
|
Eventually(func(g Gomega) {
|
||||||
|
err := k8sClient.Get(ctx, clusterKey, clusterObj)
|
||||||
|
g.Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
g.Expect(clusterObj.Status.Phase).To(Equal(v1beta1.ClusterProvisioning))
|
||||||
|
|
||||||
|
cond := meta.FindStatusCondition(clusterObj.Status.Conditions, cluster.ConditionReady)
|
||||||
|
g.Expect(cond).NotTo(BeNil())
|
||||||
|
g.Expect(cond.Status).To(Equal(metav1.ConditionFalse))
|
||||||
|
g.Expect(cond.Reason).To(Equal(cluster.ReasonProvisioning))
|
||||||
|
}).
|
||||||
|
WithPolling(time.Second * 2).
|
||||||
|
WithTimeout(time.Second * 20).
|
||||||
|
Should(Succeed())
|
||||||
|
|
||||||
|
// Check for the status to be updated to Ready
|
||||||
|
Eventually(func(g Gomega) {
|
||||||
|
err := k8sClient.Get(ctx, clusterKey, clusterObj)
|
||||||
|
g.Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
g.Expect(clusterObj.Status.Phase).To(Equal(v1beta1.ClusterReady))
|
||||||
|
g.Expect(clusterObj.Status.Policy).To(Not(BeNil()))
|
||||||
|
g.Expect(clusterObj.Status.Policy.Name).To(Equal(vcp.Name))
|
||||||
|
|
||||||
|
cond := meta.FindStatusCondition(clusterObj.Status.Conditions, cluster.ConditionReady)
|
||||||
|
g.Expect(cond).NotTo(BeNil())
|
||||||
|
g.Expect(cond.Status).To(Equal(metav1.ConditionTrue))
|
||||||
|
g.Expect(cond.Reason).To(Equal(cluster.ReasonProvisioned))
|
||||||
|
}).
|
||||||
|
WithTimeout(time.Minute * 3).
|
||||||
|
WithPolling(time.Second * 5).
|
||||||
|
Should(Succeed())
|
||||||
|
|
||||||
|
// update policy
|
||||||
|
|
||||||
|
priorityClassVCP := &schedv1.PriorityClass{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
GenerateName: "pc-",
|
||||||
|
},
|
||||||
|
Value: 100,
|
||||||
|
}
|
||||||
|
Expect(k8sClient.Create(ctx, priorityClassVCP)).To(Succeed())
|
||||||
|
|
||||||
|
DeferCleanup(func() {
|
||||||
|
Expect(k8sClient.Delete(ctx, priorityClassVCP)).To(Succeed())
|
||||||
|
})
|
||||||
|
|
||||||
|
vcp.Spec.DefaultPriorityClass = priorityClassVCP.Name
|
||||||
|
Expect(k8sClient.Update(ctx, vcp)).To(Succeed())
|
||||||
|
|
||||||
|
// Check for the status to be updated to Ready
|
||||||
|
Eventually(func(g Gomega) {
|
||||||
|
err := k8sClient.Get(ctx, clusterKey, clusterObj)
|
||||||
|
g.Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
g.Expect(clusterObj.Status.Policy).To(Not(BeNil()))
|
||||||
|
g.Expect(clusterObj.Status.Policy.PriorityClass).To(Not(BeNil()))
|
||||||
|
g.Expect(*clusterObj.Status.Policy.PriorityClass).To(Equal(priorityClassVCP.Name))
|
||||||
|
g.Expect(clusterObj.Spec.PriorityClass).To(Equal(priorityClass.Name))
|
||||||
|
}).
|
||||||
|
WithTimeout(time.Minute * 3).
|
||||||
|
WithPolling(time.Second * 5).
|
||||||
|
Should(Succeed())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Context("and the cluster has validation errors", func() {
|
Context("and the cluster has validation errors", func() {
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ import (
|
|||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
networkingv1 "k8s.io/api/networking/v1"
|
networkingv1 "k8s.io/api/networking/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
|
||||||
|
|
||||||
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
|
||||||
|
|
||||||
@@ -77,7 +78,6 @@ var (
|
|||||||
k8s *kubernetes.Clientset
|
k8s *kubernetes.Clientset
|
||||||
k8sClient client.Client
|
k8sClient client.Client
|
||||||
kubeconfigPath string
|
kubeconfigPath string
|
||||||
repo string
|
|
||||||
helmActionConfig *action.Configuration
|
helmActionConfig *action.Configuration
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -86,17 +86,17 @@ var _ = BeforeSuite(func() {
|
|||||||
|
|
||||||
GinkgoWriter.Println("GOCOVERDIR:", os.Getenv("GOCOVERDIR"))
|
GinkgoWriter.Println("GOCOVERDIR:", os.Getenv("GOCOVERDIR"))
|
||||||
|
|
||||||
repo = os.Getenv("REPO")
|
|
||||||
if repo == "" {
|
|
||||||
repo = "rancher"
|
|
||||||
}
|
|
||||||
|
|
||||||
_, dockerInstallEnabled := os.LookupEnv("K3K_DOCKER_INSTALL")
|
_, dockerInstallEnabled := os.LookupEnv("K3K_DOCKER_INSTALL")
|
||||||
|
|
||||||
if dockerInstallEnabled {
|
if dockerInstallEnabled {
|
||||||
installK3SDocker(ctx)
|
repo := os.Getenv("REPO")
|
||||||
|
if repo == "" {
|
||||||
|
repo = "rancher"
|
||||||
|
}
|
||||||
|
|
||||||
|
installK3SDocker(ctx, repo+"/k3k", repo+"/k3k-kubelet")
|
||||||
initKubernetesClient(ctx)
|
initKubernetesClient(ctx)
|
||||||
installK3kChart()
|
installK3kChart(repo+"/k3k", repo+"/k3k-kubelet")
|
||||||
} else {
|
} else {
|
||||||
initKubernetesClient(ctx)
|
initKubernetesClient(ctx)
|
||||||
}
|
}
|
||||||
@@ -110,6 +110,11 @@ func initKubernetesClient(ctx context.Context) {
|
|||||||
kubeconfig []byte
|
kubeconfig []byte
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger, err := zap.NewDevelopment()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
log.SetLogger(zapr.NewLogger(logger))
|
||||||
|
|
||||||
kubeconfigPath := os.Getenv("KUBECONFIG")
|
kubeconfigPath := os.Getenv("KUBECONFIG")
|
||||||
Expect(kubeconfigPath).To(Not(BeEmpty()))
|
Expect(kubeconfigPath).To(Not(BeEmpty()))
|
||||||
|
|
||||||
@@ -128,21 +133,12 @@ func initKubernetesClient(ctx context.Context) {
|
|||||||
scheme := buildScheme()
|
scheme := buildScheme()
|
||||||
k8sClient, err = client.New(restcfg, client.Options{Scheme: scheme})
|
k8sClient, err = client.New(restcfg, client.Options{Scheme: scheme})
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
logger, err := zap.NewDevelopment()
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
|
|
||||||
log.SetLogger(zapr.NewLogger(logger))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildScheme() *runtime.Scheme {
|
func buildScheme() *runtime.Scheme {
|
||||||
scheme := runtime.NewScheme()
|
scheme := runtime.NewScheme()
|
||||||
|
|
||||||
err := v1.AddToScheme(scheme)
|
err := clientgoscheme.AddToScheme(scheme)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
err = appsv1.AddToScheme(scheme)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
err = networkingv1.AddToScheme(scheme)
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
err = v1beta1.AddToScheme(scheme)
|
err = v1beta1.AddToScheme(scheme)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
@@ -150,7 +146,7 @@ func buildScheme() *runtime.Scheme {
|
|||||||
return scheme
|
return scheme
|
||||||
}
|
}
|
||||||
|
|
||||||
func installK3SDocker(ctx context.Context) {
|
func installK3SDocker(ctx context.Context, controllerImage, kubeletImage string) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
kubeconfig []byte
|
kubeconfig []byte
|
||||||
@@ -182,16 +178,15 @@ func installK3SDocker(ctx context.Context) {
|
|||||||
Expect(tmpFile.Close()).To(Succeed())
|
Expect(tmpFile.Close()).To(Succeed())
|
||||||
kubeconfigPath = tmpFile.Name()
|
kubeconfigPath = tmpFile.Name()
|
||||||
|
|
||||||
err = k3sContainer.LoadImages(ctx, repo+"/k3k:dev", repo+"/k3k-kubelet:dev")
|
err = k3sContainer.LoadImages(ctx, controllerImage+":dev", kubeletImage+":dev")
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
DeferCleanup(os.Remove, kubeconfigPath)
|
DeferCleanup(os.Remove, kubeconfigPath)
|
||||||
|
|
||||||
Expect(os.Setenv("KUBECONFIG", kubeconfigPath)).To(Succeed())
|
Expect(os.Setenv("KUBECONFIG", kubeconfigPath)).To(Succeed())
|
||||||
GinkgoWriter.Print(kubeconfigPath)
|
GinkgoWriter.Printf("KUBECONFIG set to: %s\n", kubeconfigPath)
|
||||||
GinkgoWriter.Print(string(kubeconfig))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func installK3kChart() {
|
func installK3kChart(controllerImage, kubeletImage string) {
|
||||||
pwd, err := os.Getwd()
|
pwd, err := os.Getwd()
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
@@ -207,7 +202,7 @@ func installK3kChart() {
|
|||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
err = helmActionConfig.Init(restClientGetter, k3kNamespace, os.Getenv("HELM_DRIVER"), func(format string, v ...any) {
|
err = helmActionConfig.Init(restClientGetter, k3kNamespace, os.Getenv("HELM_DRIVER"), func(format string, v ...any) {
|
||||||
GinkgoWriter.Printf("helm debug: "+format+"\n", v...)
|
GinkgoWriter.Printf("[Helm] "+format+"\n", v...)
|
||||||
})
|
})
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
@@ -219,9 +214,17 @@ func installK3kChart() {
|
|||||||
iCli.Wait = true
|
iCli.Wait = true
|
||||||
|
|
||||||
controllerMap, _ := k3kChart.Values["controller"].(map[string]any)
|
controllerMap, _ := k3kChart.Values["controller"].(map[string]any)
|
||||||
|
|
||||||
|
extraEnvArray, _ := controllerMap["extraEnv"].([]map[string]any)
|
||||||
|
extraEnvArray = append(extraEnvArray, map[string]any{
|
||||||
|
"name": "DEBUG",
|
||||||
|
"value": "true",
|
||||||
|
})
|
||||||
|
controllerMap["extraEnv"] = extraEnvArray
|
||||||
|
|
||||||
imageMap, _ := controllerMap["image"].(map[string]any)
|
imageMap, _ := controllerMap["image"].(map[string]any)
|
||||||
maps.Copy(imageMap, map[string]any{
|
maps.Copy(imageMap, map[string]any{
|
||||||
"repository": repo + "/k3k",
|
"repository": controllerImage,
|
||||||
"tag": "dev",
|
"tag": "dev",
|
||||||
"pullPolicy": "IfNotPresent",
|
"pullPolicy": "IfNotPresent",
|
||||||
})
|
})
|
||||||
@@ -230,14 +233,14 @@ func installK3kChart() {
|
|||||||
sharedAgentMap, _ := agentMap["shared"].(map[string]any)
|
sharedAgentMap, _ := agentMap["shared"].(map[string]any)
|
||||||
sharedAgentImageMap, _ := sharedAgentMap["image"].(map[string]any)
|
sharedAgentImageMap, _ := sharedAgentMap["image"].(map[string]any)
|
||||||
maps.Copy(sharedAgentImageMap, map[string]any{
|
maps.Copy(sharedAgentImageMap, map[string]any{
|
||||||
"repository": repo + "/k3k-kubelet",
|
"repository": kubeletImage,
|
||||||
"tag": "dev",
|
"tag": "dev",
|
||||||
})
|
})
|
||||||
|
|
||||||
release, err := iCli.Run(k3kChart, k3kChart.Values)
|
release, err := iCli.Run(k3kChart, k3kChart.Values)
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
GinkgoWriter.Printf("Release %s installed in %s namespace\n", release.Name, release.Namespace)
|
GinkgoWriter.Printf("Helm release '%s' installed in '%s' namespace\n", release.Name, release.Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
func patchPVC(ctx context.Context, clientset *kubernetes.Clientset) {
|
func patchPVC(ctx context.Context, clientset *kubernetes.Clientset) {
|
||||||
@@ -300,36 +303,28 @@ func patchPVC(ctx context.Context, clientset *kubernetes.Clientset) {
|
|||||||
_, err = clientset.AppsV1().Deployments(k3kNamespace).Update(ctx, k3kDeployment, metav1.UpdateOptions{})
|
_, err = clientset.AppsV1().Deployments(k3kNamespace).Update(ctx, k3kDeployment, metav1.UpdateOptions{})
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
Expect(err).To(Not(HaveOccurred()))
|
||||||
|
|
||||||
Eventually(func() bool {
|
Eventually(func(g Gomega) {
|
||||||
GinkgoWriter.Println("Checking K3k deployment status")
|
GinkgoWriter.Println("Checking K3k deployment status")
|
||||||
|
|
||||||
dep, err := clientset.AppsV1().Deployments(k3kNamespace).Get(ctx, k3kDeployment.Name, metav1.GetOptions{})
|
dep, err := clientset.AppsV1().Deployments(k3kNamespace).Get(ctx, k3kDeployment.Name, metav1.GetOptions{})
|
||||||
Expect(err).To(Not(HaveOccurred()))
|
g.Expect(err).To(Not(HaveOccurred()))
|
||||||
|
g.Expect(dep.Generation).To(Equal(dep.Status.ObservedGeneration))
|
||||||
|
|
||||||
// 1. Check if the controller has observed the latest generation
|
var availableCond appsv1.DeploymentCondition
|
||||||
if dep.Generation > dep.Status.ObservedGeneration {
|
|
||||||
GinkgoWriter.Printf("K3k deployment generation: %d, observed generation: %d\n", dep.Generation, dep.Status.ObservedGeneration)
|
for _, cond := range dep.Status.Conditions {
|
||||||
return false
|
if cond.Type == appsv1.DeploymentAvailable {
|
||||||
|
availableCond = cond
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Check if all replicas have been updated
|
g.Expect(availableCond.Type).To(Equal(appsv1.DeploymentAvailable))
|
||||||
if dep.Spec.Replicas != nil && dep.Status.UpdatedReplicas < *dep.Spec.Replicas {
|
g.Expect(availableCond.Status).To(Equal(v1.ConditionTrue))
|
||||||
GinkgoWriter.Printf("K3k deployment replicas: %d, updated replicas: %d\n", *dep.Spec.Replicas, dep.Status.UpdatedReplicas)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Check if all updated replicas are available
|
|
||||||
if dep.Status.AvailableReplicas < dep.Status.UpdatedReplicas {
|
|
||||||
GinkgoWriter.Printf("K3k deployment available replicas: %d, updated replicas: %d\n", dep.Status.AvailableReplicas, dep.Status.UpdatedReplicas)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}).
|
}).
|
||||||
MustPassRepeatedly(5).
|
|
||||||
WithPolling(time.Second).
|
WithPolling(time.Second).
|
||||||
WithTimeout(time.Second * 30).
|
WithTimeout(time.Second * 30).
|
||||||
Should(BeTrue())
|
Should(Succeed())
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = AfterSuite(func() {
|
var _ = AfterSuite(func() {
|
||||||
|
|||||||
Reference in New Issue
Block a user