From b055aeff0fd15e72093d452610f5262cb571d4cd Mon Sep 17 00:00:00 2001 From: Hussein Galal Date: Thu, 12 Mar 2026 21:28:18 +0200 Subject: [PATCH] Refactor distribution algorithm to account for host capacity (#695) * Refactor distribution algorithm to account for host capacity Signed-off-by: galal-hussein --- k3k-kubelet/provider/configure_capacity.go | 169 ++++++----- .../provider/configure_capacity_test.go | 263 ++++++++++++++---- 2 files changed, 312 insertions(+), 120 deletions(-) diff --git a/k3k-kubelet/provider/configure_capacity.go b/k3k-kubelet/provider/configure_capacity.go index 0ecf508..ed28f83 100644 --- a/k3k-kubelet/provider/configure_capacity.go +++ b/k3k-kubelet/provider/configure_capacity.go @@ -2,6 +2,7 @@ package provider import ( "context" + "maps" "sort" "time" @@ -83,11 +84,30 @@ func updateNodeCapacity(ctx context.Context, logger logr.Logger, hostClient clie mergedResourceLists := mergeResourceLists(resourceLists...) - m, err := distributeQuotas(ctx, logger, virtualClient, mergedResourceLists) - if err != nil { - logger.Error(err, "error distributing policy quota") + var virtualNodeList, hostNodeList corev1.NodeList + + if err := virtualClient.List(ctx, &virtualNodeList); err != nil { + logger.Error(err, "error listing virtual nodes for stable capacity distribution") } + virtResourceMap := make(map[string]corev1.ResourceList) + for _, vNode := range virtualNodeList.Items { + virtResourceMap[vNode.Name] = corev1.ResourceList{} + } + + if err := hostClient.List(ctx, &hostNodeList); err != nil { + logger.Error(err, "error listing host nodes for stable capacity distribution") + } + + hostResourceMap := make(map[string]corev1.ResourceList) + + for _, hNode := range hostNodeList.Items { + if _, ok := virtResourceMap[hNode.Name]; ok { + hostResourceMap[hNode.Name] = hNode.Status.Allocatable + } + } + + m := distributeQuotas(hostResourceMap, virtResourceMap, mergedResourceLists) allocatable = m[virtualNodeName] } @@ -125,76 +145,99 @@ func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceLis return merged } -// distributeQuotas divides the total resource quotas evenly among all active virtual nodes. -// This ensures that each virtual node reports a fair share of the available resources, -// preventing the scheduler from overloading a single node. +// distributeQuotas divides the total resource quotas among all active virtual nodes, +// capped by each node's actual host capacity. This ensures that each virtual node +// reports a fair share of the available resources without exceeding what its +// underlying host node can provide. // -// The algorithm iterates over each resource, divides it as evenly as possible among the -// sorted virtual nodes, and distributes any remainder to the first few nodes to ensure -// all resources are allocated. Sorting the nodes by name guarantees a deterministic -// distribution. -func distributeQuotas(ctx context.Context, logger logr.Logger, virtualClient client.Client, quotas corev1.ResourceList) (map[string]corev1.ResourceList, error) { - // List all virtual nodes to distribute the quota stably. - var virtualNodeList corev1.NodeList - if err := virtualClient.List(ctx, &virtualNodeList); err != nil { - logger.Error(err, "error listing virtual nodes for stable capacity distribution, falling back to full quota") - return nil, err - } - - // If there are no virtual nodes, there's nothing to distribute. - numNodes := int64(len(virtualNodeList.Items)) - if numNodes == 0 { - logger.Info("error listing virtual nodes for stable capacity distribution, falling back to full quota") - return nil, nil - } - - // Sort nodes by name for a deterministic distribution of resources. - sort.Slice(virtualNodeList.Items, func(i, j int) bool { - return virtualNodeList.Items[i].Name < virtualNodeList.Items[j].Name - }) - - // Initialize the resource map for each virtual node. - resourceMap := make(map[string]corev1.ResourceList) - for _, virtualNode := range virtualNodeList.Items { - resourceMap[virtualNode.Name] = corev1.ResourceList{} - } +// For each resource type the algorithm uses a multi-pass redistribution loop: +// 1. Divide the remaining quota evenly among eligible nodes (sorted by name for +// determinism), assigning any integer remainder to the first nodes alphabetically. +// 2. Cap each node's share at its host allocatable capacity. +// 3. Remove nodes that have reached their host capacity. +// 4. If there is still unallocated quota (because some nodes were capped below their +// even share), repeat from step 1 with the remaining quota and remaining nodes. +// +// The loop terminates when the quota is fully distributed or no eligible nodes remain. +func distributeQuotas(hostResourceMap, virtResourceMap map[string]corev1.ResourceList, quotas corev1.ResourceList) map[string]corev1.ResourceList { + resourceMap := make(map[string]corev1.ResourceList, len(virtResourceMap)) + maps.Copy(resourceMap, virtResourceMap) // Distribute each resource type from the policy's hard quota for resourceName, totalQuantity := range quotas { - // Use MilliValue for precise division, especially for resources like CPU, - // which are often expressed in milli-units. Otherwise, use the standard Value(). - var totalValue int64 - if _, found := milliScaleResources[resourceName]; found { - totalValue = totalQuantity.MilliValue() - } else { - totalValue = totalQuantity.Value() + _, useMilli := milliScaleResources[resourceName] + + // eligible nodes for each distribution cycle + var eligibleNodes []string + + hostCap := make(map[string]int64) + + // Populate the host nodes capacity map and the initial effective nodes + for vn := range virtResourceMap { + hostNodeResources := hostResourceMap[vn] + if hostNodeResources == nil { + continue + } + + resourceQuantity, found := hostNodeResources[resourceName] + if !found { + // skip the node if the resource does not exist on the host node + continue + } + + hostCap[vn] = resourceQuantity.Value() + if useMilli { + hostCap[vn] = resourceQuantity.MilliValue() + } + + eligibleNodes = append(eligibleNodes, vn) } - // Calculate the base quantity of the resource to be allocated per node. - // and the remainder that needs to be distributed among the nodes. - // - // For example, if totalValue is 2000 (e.g., 2 CPU) and there are 3 nodes: - // - quantityPerNode would be 666 (2000 / 3) - // - remainder would be 2 (2000 % 3) - // The first two nodes would get 667 (666 + 1), and the last one would get 666. - quantityPerNode := totalValue / numNodes - remainder := totalValue % numNodes + sort.Strings(eligibleNodes) - // Iterate through the sorted virtual nodes to distribute the resource. - for _, virtualNode := range virtualNodeList.Items { - nodeQuantity := quantityPerNode - if remainder > 0 { - nodeQuantity++ - remainder-- + totalValue := totalQuantity.Value() + if useMilli { + totalValue = totalQuantity.MilliValue() + } + + // Start of the distribution cycle, each cycle will distribute the quota resource + // evenly between nodes, each node can not exceed the corresponding host node capacity + for totalValue > 0 && len(eligibleNodes) > 0 { + nodeNum := int64(len(eligibleNodes)) + quantityPerNode := totalValue / nodeNum + remainder := totalValue % nodeNum + + remainingNodes := []string{} + + for _, virtualNodeName := range eligibleNodes { + nodeQuantity := quantityPerNode + if remainder > 0 { + nodeQuantity++ + remainder-- + } + // We cap the quantity to the hostNode capacity + nodeQuantity = min(nodeQuantity, hostCap[virtualNodeName]) + + if nodeQuantity > 0 { + existing := resourceMap[virtualNodeName][resourceName] + if useMilli { + resourceMap[virtualNodeName][resourceName] = *resource.NewMilliQuantity(existing.MilliValue()+nodeQuantity, totalQuantity.Format) + } else { + resourceMap[virtualNodeName][resourceName] = *resource.NewQuantity(existing.Value()+nodeQuantity, totalQuantity.Format) + } + } + + totalValue -= nodeQuantity + hostCap[virtualNodeName] -= nodeQuantity + + if hostCap[virtualNodeName] > 0 { + remainingNodes = append(remainingNodes, virtualNodeName) + } } - if _, found := milliScaleResources[resourceName]; found { - resourceMap[virtualNode.Name][resourceName] = *resource.NewMilliQuantity(nodeQuantity, totalQuantity.Format) - } else { - resourceMap[virtualNode.Name][resourceName] = *resource.NewQuantity(nodeQuantity, totalQuantity.Format) - } + eligibleNodes = remainingNodes } } - return resourceMap, nil + return resourceMap } diff --git a/k3k-kubelet/provider/configure_capacity_test.go b/k3k-kubelet/provider/configure_capacity_test.go index 3dbda31..2e5614c 100644 --- a/k3k-kubelet/provider/configure_capacity_test.go +++ b/k3k-kubelet/provider/configure_capacity_test.go @@ -1,19 +1,13 @@ package provider import ( - "context" "testing" - "github.com/go-logr/zapr" "github.com/stretchr/testify/assert" - "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/fake" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func Test_distributeQuotas(t *testing.T) { @@ -21,39 +15,56 @@ func Test_distributeQuotas(t *testing.T) { err := corev1.AddToScheme(scheme) assert.NoError(t, err) - node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}} - node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2"}} - node3 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-3"}} + // Large allocatable so capping doesn't interfere with basic distribution tests. + largeAllocatable := corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("1000"), + } tests := []struct { - name string - virtualNodes []client.Object - quotas corev1.ResourceList - want map[string]corev1.ResourceList - wantErr bool + name string + virtResourceMap map[string]corev1.ResourceList + hostResourceMap map[string]corev1.ResourceList + quotas corev1.ResourceList + want map[string]corev1.ResourceList }{ { - name: "no virtual nodes", - virtualNodes: []client.Object{}, + name: "no virtual nodes", + virtResourceMap: map[string]corev1.ResourceList{}, quotas: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("2"), }, - want: map[string]corev1.ResourceList{}, - wantErr: false, + want: map[string]corev1.ResourceList{}, }, { - name: "no quotas", - virtualNodes: []client.Object{node1, node2}, - quotas: corev1.ResourceList{}, + name: "no quotas", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": largeAllocatable, + "node-2": largeAllocatable, + }, + quotas: corev1.ResourceList{}, want: map[string]corev1.ResourceList{ "node-1": {}, "node-2": {}, }, - wantErr: false, }, { - name: "even distribution of cpu and memory", - virtualNodes: []client.Object{node1, node2}, + name: "fewer virtual nodes than host nodes", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": largeAllocatable, + "node-2": largeAllocatable, + "node-3": largeAllocatable, + "node-4": largeAllocatable, + }, quotas: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("2"), corev1.ResourceMemory: resource.MustParse("4Gi"), @@ -68,65 +79,203 @@ func Test_distributeQuotas(t *testing.T) { corev1.ResourceMemory: resource.MustParse("2Gi"), }, }, - wantErr: false, }, { - name: "uneven distribution with remainder", - virtualNodes: []client.Object{node1, node2, node3}, + name: "even distribution of cpu and memory", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": largeAllocatable, + "node-2": largeAllocatable, + }, quotas: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), // 2000m / 3 = 666m with 2m remainder + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + want: map[string]corev1.ResourceList{ + "node-1": { + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + "node-2": { + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }, + }, + { + name: "uneven distribution with remainder", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + "node-3": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": largeAllocatable, + "node-2": largeAllocatable, + "node-3": largeAllocatable, + }, + quotas: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), }, want: map[string]corev1.ResourceList{ "node-1": {corev1.ResourceCPU: resource.MustParse("667m")}, "node-2": {corev1.ResourceCPU: resource.MustParse("667m")}, "node-3": {corev1.ResourceCPU: resource.MustParse("666m")}, }, - wantErr: false, }, { - name: "distribution of number resources", - virtualNodes: []client.Object{node1, node2, node3}, + name: "distribution of number resources", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + "node-3": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": largeAllocatable, + "node-2": largeAllocatable, + "node-3": largeAllocatable, + }, quotas: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourcePods: resource.MustParse("11"), - corev1.ResourceSecrets: resource.MustParse("9"), - "custom": resource.MustParse("8"), + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourcePods: resource.MustParse("11"), }, want: map[string]corev1.ResourceList{ "node-1": { - corev1.ResourceCPU: resource.MustParse("667m"), - corev1.ResourcePods: resource.MustParse("4"), - corev1.ResourceSecrets: resource.MustParse("3"), - "custom": resource.MustParse("3"), + corev1.ResourceCPU: resource.MustParse("667m"), + corev1.ResourcePods: resource.MustParse("4"), }, "node-2": { - corev1.ResourceCPU: resource.MustParse("667m"), - corev1.ResourcePods: resource.MustParse("4"), - corev1.ResourceSecrets: resource.MustParse("3"), - "custom": resource.MustParse("3"), + corev1.ResourceCPU: resource.MustParse("667m"), + corev1.ResourcePods: resource.MustParse("4"), }, "node-3": { - corev1.ResourceCPU: resource.MustParse("666m"), - corev1.ResourcePods: resource.MustParse("3"), - corev1.ResourceSecrets: resource.MustParse("3"), - "custom": resource.MustParse("2"), + corev1.ResourceCPU: resource.MustParse("666m"), + corev1.ResourcePods: resource.MustParse("3"), }, }, - wantErr: false, + }, + { + name: "extended resource distributed only to nodes that have it", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + "node-3": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": { + corev1.ResourceCPU: resource.MustParse("100"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + "node-2": { + corev1.ResourceCPU: resource.MustParse("100"), + }, + "node-3": { + corev1.ResourceCPU: resource.MustParse("100"), + "nvidia.com/gpu": resource.MustParse("4"), + }, + }, + quotas: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("3"), + "nvidia.com/gpu": resource.MustParse("4"), + }, + want: map[string]corev1.ResourceList{ + "node-1": { + corev1.ResourceCPU: resource.MustParse("1"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + "node-2": { + corev1.ResourceCPU: resource.MustParse("1"), + }, + "node-3": { + corev1.ResourceCPU: resource.MustParse("1"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + }, + }, + { + name: "capping at host capacity with redistribution", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": { + corev1.ResourceCPU: resource.MustParse("8"), + }, + "node-2": { + corev1.ResourceCPU: resource.MustParse("2"), + }, + }, + quotas: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + // Even split would be 3 each, but node-2 only has 2 CPU. + // node-2 gets capped at 2, the remaining 1 goes to node-1. + want: map[string]corev1.ResourceList{ + "node-1": {corev1.ResourceCPU: resource.MustParse("4")}, + "node-2": {corev1.ResourceCPU: resource.MustParse("2")}, + }, + }, + { + name: "gpu capping with uneven host capacity", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": { + "nvidia.com/gpu": resource.MustParse("6"), + }, + "node-2": { + "nvidia.com/gpu": resource.MustParse("1"), + }, + }, + quotas: corev1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("4"), + }, + // Even split would be 2 each, but node-2 only has 1 GPU. + // node-2 gets capped at 1, the remaining 1 goes to node-1. + want: map[string]corev1.ResourceList{ + "node-1": {"nvidia.com/gpu": resource.MustParse("3")}, + "node-2": {"nvidia.com/gpu": resource.MustParse("1")}, + }, + }, + { + name: "quota exceeds total host capacity", + virtResourceMap: map[string]corev1.ResourceList{ + "node-1": {}, + "node-2": {}, + "node-3": {}, + }, + hostResourceMap: map[string]corev1.ResourceList{ + "node-1": { + "nvidia.com/gpu": resource.MustParse("2"), + }, + "node-2": { + "nvidia.com/gpu": resource.MustParse("1"), + }, + "node-3": { + "nvidia.com/gpu": resource.MustParse("1"), + }, + }, + quotas: corev1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("10"), + }, + // Total host capacity is 4, quota is 10. Each node gets its full capacity. + want: map[string]corev1.ResourceList{ + "node-1": {"nvidia.com/gpu": resource.MustParse("2")}, + "node-2": {"nvidia.com/gpu": resource.MustParse("1")}, + "node-3": {"nvidia.com/gpu": resource.MustParse("1")}, + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.virtualNodes...).Build() - logger := zapr.NewLogger(zap.NewNop()) - - got, gotErr := distributeQuotas(context.Background(), logger, fakeClient, tt.quotas) - if tt.wantErr { - assert.Error(t, gotErr) - } else { - assert.NoError(t, gotErr) - } + got := distributeQuotas(tt.hostResourceMap, tt.virtResourceMap, tt.quotas) assert.Equal(t, len(tt.want), len(got), "Number of nodes in result should match")