Refactor distribution algorithm to account for host capacity (#695)

* Refactor distribution algorithm to account for host capacity

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
This commit is contained in:
Hussein Galal
2026-03-12 21:28:18 +02:00
committed by GitHub
parent 5ef99f83ee
commit b055aeff0f
2 changed files with 312 additions and 120 deletions

View File

@@ -2,6 +2,7 @@ package provider
import (
"context"
"maps"
"sort"
"time"
@@ -83,11 +84,30 @@ func updateNodeCapacity(ctx context.Context, logger logr.Logger, hostClient clie
mergedResourceLists := mergeResourceLists(resourceLists...)
m, err := distributeQuotas(ctx, logger, virtualClient, mergedResourceLists)
if err != nil {
logger.Error(err, "error distributing policy quota")
var virtualNodeList, hostNodeList corev1.NodeList
if err := virtualClient.List(ctx, &virtualNodeList); err != nil {
logger.Error(err, "error listing virtual nodes for stable capacity distribution")
}
virtResourceMap := make(map[string]corev1.ResourceList)
for _, vNode := range virtualNodeList.Items {
virtResourceMap[vNode.Name] = corev1.ResourceList{}
}
if err := hostClient.List(ctx, &hostNodeList); err != nil {
logger.Error(err, "error listing host nodes for stable capacity distribution")
}
hostResourceMap := make(map[string]corev1.ResourceList)
for _, hNode := range hostNodeList.Items {
if _, ok := virtResourceMap[hNode.Name]; ok {
hostResourceMap[hNode.Name] = hNode.Status.Allocatable
}
}
m := distributeQuotas(hostResourceMap, virtResourceMap, mergedResourceLists)
allocatable = m[virtualNodeName]
}
@@ -125,76 +145,99 @@ func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceLis
return merged
}
// distributeQuotas divides the total resource quotas evenly among all active virtual nodes.
// This ensures that each virtual node reports a fair share of the available resources,
// preventing the scheduler from overloading a single node.
// distributeQuotas divides the total resource quotas among all active virtual nodes,
// capped by each node's actual host capacity. This ensures that each virtual node
// reports a fair share of the available resources without exceeding what its
// underlying host node can provide.
//
// The algorithm iterates over each resource, divides it as evenly as possible among the
// sorted virtual nodes, and distributes any remainder to the first few nodes to ensure
// all resources are allocated. Sorting the nodes by name guarantees a deterministic
// distribution.
func distributeQuotas(ctx context.Context, logger logr.Logger, virtualClient client.Client, quotas corev1.ResourceList) (map[string]corev1.ResourceList, error) {
// List all virtual nodes to distribute the quota stably.
var virtualNodeList corev1.NodeList
if err := virtualClient.List(ctx, &virtualNodeList); err != nil {
logger.Error(err, "error listing virtual nodes for stable capacity distribution, falling back to full quota")
return nil, err
}
// If there are no virtual nodes, there's nothing to distribute.
numNodes := int64(len(virtualNodeList.Items))
if numNodes == 0 {
logger.Info("error listing virtual nodes for stable capacity distribution, falling back to full quota")
return nil, nil
}
// Sort nodes by name for a deterministic distribution of resources.
sort.Slice(virtualNodeList.Items, func(i, j int) bool {
return virtualNodeList.Items[i].Name < virtualNodeList.Items[j].Name
})
// Initialize the resource map for each virtual node.
resourceMap := make(map[string]corev1.ResourceList)
for _, virtualNode := range virtualNodeList.Items {
resourceMap[virtualNode.Name] = corev1.ResourceList{}
}
// For each resource type the algorithm uses a multi-pass redistribution loop:
// 1. Divide the remaining quota evenly among eligible nodes (sorted by name for
// determinism), assigning any integer remainder to the first nodes alphabetically.
// 2. Cap each node's share at its host allocatable capacity.
// 3. Remove nodes that have reached their host capacity.
// 4. If there is still unallocated quota (because some nodes were capped below their
// even share), repeat from step 1 with the remaining quota and remaining nodes.
//
// The loop terminates when the quota is fully distributed or no eligible nodes remain.
func distributeQuotas(hostResourceMap, virtResourceMap map[string]corev1.ResourceList, quotas corev1.ResourceList) map[string]corev1.ResourceList {
resourceMap := make(map[string]corev1.ResourceList, len(virtResourceMap))
maps.Copy(resourceMap, virtResourceMap)
// Distribute each resource type from the policy's hard quota
for resourceName, totalQuantity := range quotas {
// Use MilliValue for precise division, especially for resources like CPU,
// which are often expressed in milli-units. Otherwise, use the standard Value().
var totalValue int64
if _, found := milliScaleResources[resourceName]; found {
totalValue = totalQuantity.MilliValue()
} else {
totalValue = totalQuantity.Value()
_, useMilli := milliScaleResources[resourceName]
// eligible nodes for each distribution cycle
var eligibleNodes []string
hostCap := make(map[string]int64)
// Populate the host nodes capacity map and the initial effective nodes
for vn := range virtResourceMap {
hostNodeResources := hostResourceMap[vn]
if hostNodeResources == nil {
continue
}
resourceQuantity, found := hostNodeResources[resourceName]
if !found {
// skip the node if the resource does not exist on the host node
continue
}
hostCap[vn] = resourceQuantity.Value()
if useMilli {
hostCap[vn] = resourceQuantity.MilliValue()
}
eligibleNodes = append(eligibleNodes, vn)
}
// Calculate the base quantity of the resource to be allocated per node.
// and the remainder that needs to be distributed among the nodes.
//
// For example, if totalValue is 2000 (e.g., 2 CPU) and there are 3 nodes:
// - quantityPerNode would be 666 (2000 / 3)
// - remainder would be 2 (2000 % 3)
// The first two nodes would get 667 (666 + 1), and the last one would get 666.
quantityPerNode := totalValue / numNodes
remainder := totalValue % numNodes
sort.Strings(eligibleNodes)
// Iterate through the sorted virtual nodes to distribute the resource.
for _, virtualNode := range virtualNodeList.Items {
nodeQuantity := quantityPerNode
if remainder > 0 {
nodeQuantity++
remainder--
totalValue := totalQuantity.Value()
if useMilli {
totalValue = totalQuantity.MilliValue()
}
// Start of the distribution cycle, each cycle will distribute the quota resource
// evenly between nodes, each node can not exceed the corresponding host node capacity
for totalValue > 0 && len(eligibleNodes) > 0 {
nodeNum := int64(len(eligibleNodes))
quantityPerNode := totalValue / nodeNum
remainder := totalValue % nodeNum
remainingNodes := []string{}
for _, virtualNodeName := range eligibleNodes {
nodeQuantity := quantityPerNode
if remainder > 0 {
nodeQuantity++
remainder--
}
// We cap the quantity to the hostNode capacity
nodeQuantity = min(nodeQuantity, hostCap[virtualNodeName])
if nodeQuantity > 0 {
existing := resourceMap[virtualNodeName][resourceName]
if useMilli {
resourceMap[virtualNodeName][resourceName] = *resource.NewMilliQuantity(existing.MilliValue()+nodeQuantity, totalQuantity.Format)
} else {
resourceMap[virtualNodeName][resourceName] = *resource.NewQuantity(existing.Value()+nodeQuantity, totalQuantity.Format)
}
}
totalValue -= nodeQuantity
hostCap[virtualNodeName] -= nodeQuantity
if hostCap[virtualNodeName] > 0 {
remainingNodes = append(remainingNodes, virtualNodeName)
}
}
if _, found := milliScaleResources[resourceName]; found {
resourceMap[virtualNode.Name][resourceName] = *resource.NewMilliQuantity(nodeQuantity, totalQuantity.Format)
} else {
resourceMap[virtualNode.Name][resourceName] = *resource.NewQuantity(nodeQuantity, totalQuantity.Format)
}
eligibleNodes = remainingNodes
}
}
return resourceMap, nil
return resourceMap
}

View File

@@ -1,19 +1,13 @@
package provider
import (
"context"
"testing"
"github.com/go-logr/zapr"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func Test_distributeQuotas(t *testing.T) {
@@ -21,39 +15,56 @@ func Test_distributeQuotas(t *testing.T) {
err := corev1.AddToScheme(scheme)
assert.NoError(t, err)
node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2"}}
node3 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-3"}}
// Large allocatable so capping doesn't interfere with basic distribution tests.
largeAllocatable := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
corev1.ResourceMemory: resource.MustParse("100Gi"),
corev1.ResourcePods: resource.MustParse("1000"),
}
tests := []struct {
name string
virtualNodes []client.Object
quotas corev1.ResourceList
want map[string]corev1.ResourceList
wantErr bool
name string
virtResourceMap map[string]corev1.ResourceList
hostResourceMap map[string]corev1.ResourceList
quotas corev1.ResourceList
want map[string]corev1.ResourceList
}{
{
name: "no virtual nodes",
virtualNodes: []client.Object{},
name: "no virtual nodes",
virtResourceMap: map[string]corev1.ResourceList{},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
want: map[string]corev1.ResourceList{},
wantErr: false,
want: map[string]corev1.ResourceList{},
},
{
name: "no quotas",
virtualNodes: []client.Object{node1, node2},
quotas: corev1.ResourceList{},
name: "no quotas",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
},
quotas: corev1.ResourceList{},
want: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
wantErr: false,
},
{
name: "even distribution of cpu and memory",
virtualNodes: []client.Object{node1, node2},
name: "fewer virtual nodes than host nodes",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
"node-3": largeAllocatable,
"node-4": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
@@ -68,65 +79,203 @@ func Test_distributeQuotas(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
wantErr: false,
},
{
name: "uneven distribution with remainder",
virtualNodes: []client.Object{node1, node2, node3},
name: "even distribution of cpu and memory",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"), // 2000m / 3 = 666m with 2m remainder
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
},
want: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
},
{
name: "uneven distribution with remainder",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
"node-3": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
want: map[string]corev1.ResourceList{
"node-1": {corev1.ResourceCPU: resource.MustParse("667m")},
"node-2": {corev1.ResourceCPU: resource.MustParse("667m")},
"node-3": {corev1.ResourceCPU: resource.MustParse("666m")},
},
wantErr: false,
},
{
name: "distribution of number resources",
virtualNodes: []client.Object{node1, node2, node3},
name: "distribution of number resources",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
"node-3": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourcePods: resource.MustParse("11"),
corev1.ResourceSecrets: resource.MustParse("9"),
"custom": resource.MustParse("8"),
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourcePods: resource.MustParse("11"),
},
want: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
corev1.ResourceSecrets: resource.MustParse("3"),
"custom": resource.MustParse("3"),
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
corev1.ResourceSecrets: resource.MustParse("3"),
"custom": resource.MustParse("3"),
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
},
"node-3": {
corev1.ResourceCPU: resource.MustParse("666m"),
corev1.ResourcePods: resource.MustParse("3"),
corev1.ResourceSecrets: resource.MustParse("3"),
"custom": resource.MustParse("2"),
corev1.ResourceCPU: resource.MustParse("666m"),
corev1.ResourcePods: resource.MustParse("3"),
},
},
wantErr: false,
},
{
name: "extended resource distributed only to nodes that have it",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("100"),
"nvidia.com/gpu": resource.MustParse("2"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("100"),
},
"node-3": {
corev1.ResourceCPU: resource.MustParse("100"),
"nvidia.com/gpu": resource.MustParse("4"),
},
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
"nvidia.com/gpu": resource.MustParse("4"),
},
want: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("1"),
"nvidia.com/gpu": resource.MustParse("2"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("1"),
},
"node-3": {
corev1.ResourceCPU: resource.MustParse("1"),
"nvidia.com/gpu": resource.MustParse("2"),
},
},
},
{
name: "capping at host capacity with redistribution",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("8"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("2"),
},
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
// Even split would be 3 each, but node-2 only has 2 CPU.
// node-2 gets capped at 2, the remaining 1 goes to node-1.
want: map[string]corev1.ResourceList{
"node-1": {corev1.ResourceCPU: resource.MustParse("4")},
"node-2": {corev1.ResourceCPU: resource.MustParse("2")},
},
},
{
name: "gpu capping with uneven host capacity",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
"nvidia.com/gpu": resource.MustParse("6"),
},
"node-2": {
"nvidia.com/gpu": resource.MustParse("1"),
},
},
quotas: corev1.ResourceList{
"nvidia.com/gpu": resource.MustParse("4"),
},
// Even split would be 2 each, but node-2 only has 1 GPU.
// node-2 gets capped at 1, the remaining 1 goes to node-1.
want: map[string]corev1.ResourceList{
"node-1": {"nvidia.com/gpu": resource.MustParse("3")},
"node-2": {"nvidia.com/gpu": resource.MustParse("1")},
},
},
{
name: "quota exceeds total host capacity",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
"nvidia.com/gpu": resource.MustParse("2"),
},
"node-2": {
"nvidia.com/gpu": resource.MustParse("1"),
},
"node-3": {
"nvidia.com/gpu": resource.MustParse("1"),
},
},
quotas: corev1.ResourceList{
"nvidia.com/gpu": resource.MustParse("10"),
},
// Total host capacity is 4, quota is 10. Each node gets its full capacity.
want: map[string]corev1.ResourceList{
"node-1": {"nvidia.com/gpu": resource.MustParse("2")},
"node-2": {"nvidia.com/gpu": resource.MustParse("1")},
"node-3": {"nvidia.com/gpu": resource.MustParse("1")},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.virtualNodes...).Build()
logger := zapr.NewLogger(zap.NewNop())
got, gotErr := distributeQuotas(context.Background(), logger, fakeClient, tt.quotas)
if tt.wantErr {
assert.Error(t, gotErr)
} else {
assert.NoError(t, gotErr)
}
got := distributeQuotas(tt.hostResourceMap, tt.virtResourceMap, tt.quotas)
assert.Equal(t, len(tt.want), len(got), "Number of nodes in result should match")