From a9ddb0aa8155b9cd0bf015845a72d95e6b67ba70 Mon Sep 17 00:00:00 2001 From: Hussein Galal Date: Tue, 5 May 2026 17:08:42 +0300 Subject: [PATCH] Filter requests. infrastructure resources for node capacity (#834) * Filter requests. infrastructure resources for node capacity Signed-off-by: galal-hussein --- k3k-kubelet/provider/configure_capacity.go | 44 ++++ .../provider/configure_capacity_test.go | 203 +++++++++++++++++- 2 files changed, 236 insertions(+), 11 deletions(-) diff --git a/k3k-kubelet/provider/configure_capacity.go b/k3k-kubelet/provider/configure_capacity.go index 63a541c..5e54920 100644 --- a/k3k-kubelet/provider/configure_capacity.go +++ b/k3k-kubelet/provider/configure_capacity.go @@ -4,6 +4,7 @@ import ( "context" "maps" "sort" + "strings" "time" "github.com/go-logr/logr" @@ -37,6 +38,19 @@ var milliScaleResources = map[corev1.ResourceName]struct{}{ corev1.ResourceLimitsEphemeralStorage: {}, } +// coreResources is a set of the core infrastructure resource requests, if a quota resource +// is in this map then it should not be reflected to the virtual node capacity. +var coreResources = map[corev1.ResourceName]struct{}{ + corev1.ResourceCPU: {}, + corev1.ResourceMemory: {}, + corev1.ResourceStorage: {}, + corev1.ResourceEphemeralStorage: {}, + corev1.ResourceRequestsCPU: {}, + corev1.ResourceRequestsMemory: {}, + corev1.ResourceRequestsStorage: {}, + corev1.ResourceRequestsEphemeralStorage: {}, +} + // StartNodeCapacityUpdater starts a goroutine that periodically updates the capacity // of the virtual node based on host node capacity and any applied ResourceQuotas. func startNodeCapacityUpdater(ctx context.Context, logger logr.Logger, hostClient client.Client, virtualClient client.Client, virtualCluster v1beta1.Cluster, virtualNodeName string) { @@ -109,6 +123,7 @@ func updateNodeCapacity(ctx context.Context, logger logr.Logger, hostClient clie } mergedQuota := mergeQuotas(resourceLists...) + mergedQuota = filterQuotas(mergedQuota) // get the node's quota and merge it with the current values m := distributeQuotas(hostResourceMap, virtResourceMap, mergedQuota) @@ -169,6 +184,16 @@ func distributeQuotas(hostResourceMap, virtResourceMap map[string]corev1.Resourc resourceMap := make(map[string]corev1.ResourceList, len(virtResourceMap)) maps.Copy(resourceMap, virtResourceMap) + // fill out any allocatable resource that does not exist in quota + for vn := range virtResourceMap { + if hostResources, ok := hostResourceMap[vn]; ok { + for resourceName, resourceQty := range hostResources { + if _, ok := quotas[resourceName]; !ok { + resourceMap[vn][resourceName] = resourceQty + } + } + } + } // Distribute each resource type from the policy's hard quota for resourceName, totalQuantity := range quotas { _, useMilli := milliScaleResources[resourceName] @@ -247,3 +272,22 @@ func distributeQuotas(hostResourceMap, virtResourceMap map[string]corev1.Resourc return resourceMap } + +// filterQuotas filters a resource list from any resource that is not eligible to be used for node capacity +// like core resources requests, it also strips requests/limits prefixes from other extended resources +// for example "requests.nvidia.com/gpu" will return back "nvidia.com/gpu" +func filterQuotas(resources corev1.ResourceList) corev1.ResourceList { + filteredResources := make(map[corev1.ResourceName]resource.Quantity) + + for resourceName, resourceValue := range resources { + if _, ok := coreResources[resourceName]; ok { + continue + } + + filteredResourceName := strings.TrimPrefix(resourceName.String(), "requests.") + filteredResourceName = strings.TrimPrefix(filteredResourceName, "limits.") + filteredResources[corev1.ResourceName(filteredResourceName)] = resourceValue + } + + return filteredResources +} diff --git a/k3k-kubelet/provider/configure_capacity_test.go b/k3k-kubelet/provider/configure_capacity_test.go index 2e5614c..c8bca5d 100644 --- a/k3k-kubelet/provider/configure_capacity_test.go +++ b/k3k-kubelet/provider/configure_capacity_test.go @@ -49,8 +49,8 @@ func Test_distributeQuotas(t *testing.T) { }, quotas: corev1.ResourceList{}, want: map[string]corev1.ResourceList{ - "node-1": {}, - "node-2": {}, + "node-1": largeAllocatable, + "node-2": largeAllocatable, }, }, { @@ -73,10 +73,12 @@ func Test_distributeQuotas(t *testing.T) { "node-1": { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourcePods: resource.MustParse("1000"), }, "node-2": { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourcePods: resource.MustParse("1000"), }, }, }, @@ -98,10 +100,12 @@ func Test_distributeQuotas(t *testing.T) { "node-1": { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourcePods: resource.MustParse("1000"), }, "node-2": { corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourcePods: resource.MustParse("1000"), }, }, }, @@ -121,9 +125,21 @@ func Test_distributeQuotas(t *testing.T) { corev1.ResourceCPU: resource.MustParse("2"), }, want: map[string]corev1.ResourceList{ - "node-1": {corev1.ResourceCPU: resource.MustParse("667m")}, - "node-2": {corev1.ResourceCPU: resource.MustParse("667m")}, - "node-3": {corev1.ResourceCPU: resource.MustParse("666m")}, + "node-1": { + corev1.ResourceCPU: resource.MustParse("667m"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("1000"), + }, + "node-2": { + corev1.ResourceCPU: resource.MustParse("667m"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("1000"), + }, + "node-3": { + corev1.ResourceCPU: resource.MustParse("666m"), + corev1.ResourceMemory: resource.MustParse("100Gi"), + corev1.ResourcePods: resource.MustParse("1000"), + }, }, }, { @@ -144,16 +160,19 @@ func Test_distributeQuotas(t *testing.T) { }, want: map[string]corev1.ResourceList{ "node-1": { - corev1.ResourceCPU: resource.MustParse("667m"), - corev1.ResourcePods: resource.MustParse("4"), + corev1.ResourceCPU: resource.MustParse("667m"), + corev1.ResourcePods: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("100Gi"), }, "node-2": { - corev1.ResourceCPU: resource.MustParse("667m"), - corev1.ResourcePods: resource.MustParse("4"), + corev1.ResourceCPU: resource.MustParse("667m"), + corev1.ResourcePods: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("100Gi"), }, "node-3": { - corev1.ResourceCPU: resource.MustParse("666m"), - corev1.ResourcePods: resource.MustParse("3"), + corev1.ResourceCPU: resource.MustParse("666m"), + corev1.ResourcePods: resource.MustParse("3"), + corev1.ResourceMemory: resource.MustParse("100Gi"), }, }, }, @@ -275,6 +294,7 @@ func Test_distributeQuotas(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + // quotas are assumed to be filtered and merged into one list. there is a dedicated test for filtering quotas got := distributeQuotas(tt.hostResourceMap, tt.virtResourceMap, tt.quotas) assert.Equal(t, len(tt.want), len(got), "Number of nodes in result should match") @@ -294,3 +314,164 @@ func Test_distributeQuotas(t *testing.T) { }) } } + +func Test_mergeQuotas(t *testing.T) { + tests := []struct { + name string + quotasList []corev1.ResourceList + want corev1.ResourceList + }{ + { + name: "no resource lists", + quotasList: []corev1.ResourceList{}, + want: corev1.ResourceList{}, + }, + { + name: "single resource list", + quotasList: []corev1.ResourceList{ + { + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + want: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + }, + }, + { + name: "most restrictive value wins", + quotasList: []corev1.ResourceList{ + { + corev1.ResourceCPU: resource.MustParse("8"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + { + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + want: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + { + name: "resource only in one list is included", + quotasList: []corev1.ResourceList{ + { + corev1.ResourceCPU: resource.MustParse("4"), + }, + { + corev1.ResourceCPU: resource.MustParse("8"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + want: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + { + name: "three lists pick minimum across all", + quotasList: []corev1.ResourceList{ + {corev1.ResourceCPU: resource.MustParse("10")}, + {corev1.ResourceCPU: resource.MustParse("6")}, + {corev1.ResourceCPU: resource.MustParse("8")}, + }, + want: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := mergeQuotas(tt.quotasList...) + + assert.Equal(t, len(tt.want), len(got), "Number of resources in merged quota should match") + + for resName, expectedQty := range tt.want { + actualQty, ok := got[resName] + assert.True(t, ok, "Resource %s not found in merged quota", resName) + assert.True(t, expectedQty.Equal(actualQty), "Resource %s did not match. want: %s, got: %s", resName, expectedQty.String(), actualQty.String()) + } + }) + } +} + +func Test_filterQuotas(t *testing.T) { + scheme := runtime.NewScheme() + err := corev1.AddToScheme(scheme) + assert.NoError(t, err) + + tests := []struct { + name string + quotas corev1.ResourceList + want corev1.ResourceList + }{ + { + name: "no quotas", + want: corev1.ResourceList{}, + }, { + name: "filter core infrastructure request resources with no requests prefix", + quotas: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + corev1.ResourceStorage: resource.MustParse("5Gi"), + corev1.ResourceEphemeralStorage: resource.MustParse("5Gi"), + }, + want: corev1.ResourceList{}, + }, { + name: "filter core infrastructure request resources with requests prefix", + quotas: corev1.ResourceList{ + corev1.ResourceRequestsCPU: resource.MustParse("500m"), + corev1.ResourceRequestsMemory: resource.MustParse("1Gi"), + corev1.ResourceRequestsStorage: resource.MustParse("5Gi"), + corev1.ResourceRequestsEphemeralStorage: resource.MustParse("5Gi"), + }, + want: corev1.ResourceList{}, + }, { + name: "trim limits prefix in core infrastructure resources", + quotas: corev1.ResourceList{ + corev1.ResourceLimitsCPU: resource.MustParse("500m"), + corev1.ResourceLimitsMemory: resource.MustParse("1Gi"), + corev1.ResourceLimitsEphemeralStorage: resource.MustParse("5Gi"), + }, + want: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("1Gi"), + corev1.ResourceEphemeralStorage: resource.MustParse("5Gi"), + }, + }, { + name: "trim requests prefix in extended resources", + quotas: corev1.ResourceList{ + "requests.nvidia.com/gpu": resource.MustParse("2"), + }, + want: corev1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("2"), + }, + }, { + name: "will not filter extended resources or object counts", + quotas: corev1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("2"), + corev1.ResourcePods: resource.MustParse("5"), + }, + want: corev1.ResourceList{ + "nvidia.com/gpu": resource.MustParse("2"), + corev1.ResourcePods: resource.MustParse("5"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filteredQuota := filterQuotas(tt.quotas) + + for expectedResourceName, expectedResourceQty := range tt.want { + actualResourceQty, ok := filteredQuota[expectedResourceName] + assert.True(t, ok, "%s resource is not found in filtered quotas", expectedResourceName) + assert.True(t, expectedResourceQty.Equal(actualResourceQty), "Filtered Resource %s did not match. want: %s, got: %s", expectedResourceName, expectedResourceQty.String(), actualResourceQty.String()) + } + }) + } +}