Compare commits

..

8 Commits

Author SHA1 Message Date
renovate-rancher[bot]
8d87e8a296 chore(deps): update github actions 2026-03-18 05:09:26 +00:00
Enrico Candino
7641a1c9c5 Add sync of Host StorageClasses (#681)
* initial impl

* wip test

* fix

* wip tests

* Refactor storage class sync logic and enhance test coverage

* fix test

* remove storageclass sync test

* removed commented code

* added sync to cluster status to apply policy configuration

* fix for storageClass policy indexes

* fix for missing indexed field, and label sync

* - update sync options descriptions for resource types
- added storage class tests sync with policy
- requested changes

* fix for nil map
2026-03-17 16:53:29 +01:00
Hussein Galal
fcb05793b1 Refactor distribution algorithm to account for host capacity (#688)
* Refactor distribution algorithm to account for host capacity

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* simplify the useMilli condition

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* use nodelists instead of passing clients

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* only pass resource maps for both virtual and host nodes

Signed-off-by: hussein <hussein@thinkpad-hussein.hgalal.az>

* fixes

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

* wsl

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>

---------

Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
Signed-off-by: hussein <hussein@thinkpad-hussein.hgalal.az>
Co-authored-by: hussein <hussein@thinkpad-hussein.hgalal.az>
2026-03-12 16:52:37 +02:00
Enrico Candino
83b4415f02 refactor: streamline K3S Docker installation and chart setup with dynamic repository handling (#692) 2026-03-12 11:11:12 +01:00
Hussein Galal
cd72bcbc15 use apireader instead of client for node registration in mirror host node (#686)
Signed-off-by: galal-hussein <hussein.galal.ahmed.11@gmail.com>
2026-03-10 17:43:42 +02:00
Enrico Candino
9836f8376d Added policy in Cluster Status (#663)
* initial implementation

restored policyName

* added test, fixed priority scheduling

* requested changes from review

- wrapped errors
- fixed some kube-api-linter issues to match k8s conventions
- moved policy namespace check in the same condition branch
2026-02-17 16:15:13 +01:00
Andreas Kupries
dba054786e Merge pull request #659 from andreas-kupries/syncer-controller-owner
change ControllerReferences over to OwnerReferences
2026-02-17 11:54:30 +01:00
Andreas Kupries
c94f7c7a30 fix: switch ControllerReferences over to OwnerReferences 2026-02-17 11:01:21 +01:00
34 changed files with 1637 additions and 280 deletions

View File

@@ -19,10 +19,10 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
- name: Set up Go
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -30,7 +30,7 @@ jobs:
uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@e435ccd777264be153ace6237001ef4d979d3a7a # v6
uses: goreleaser/goreleaser-action@ec59f474b9834571250b370d4735c50f8e2d1e29 # v7
with:
distribution: goreleaser
version: v2

View File

@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0

View File

@@ -24,7 +24,7 @@ jobs:
run: echo "::error::Missing tag from input" && exit 1
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
- name: Check if release is draft
run: |

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0
fetch-tags: true
@@ -31,7 +31,7 @@ jobs:
run: git checkout ${{ inputs.commit }}
- name: Set up Go
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -62,7 +62,7 @@ jobs:
echo "DOCKER_PASSWORD=${{ github.token }}" >> $GITHUB_ENV
- name: Login to container registry
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3
uses: docker/login-action@b45d80f862d83dbcd57f89517bcf500b2ab88fb2 # v4
with:
registry: ${{ env.REGISTRY }}
username: ${{ env.DOCKER_USERNAME }}
@@ -85,7 +85,7 @@ jobs:
echo "CURRENT_TAG=${CURRENT_TAG}" >> "$GITHUB_OUTPUT"
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@e435ccd777264be153ace6237001ef4d979d3a7a # v6
uses: goreleaser/goreleaser-action@ec59f474b9834571250b370d4735c50f8e2d1e29 # v7
with:
distribution: goreleaser
version: v2

View File

@@ -21,12 +21,12 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
- uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -131,7 +131,7 @@ jobs:
--output-dir /tmp
- name: Archive conformance logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: conformance-${{ matrix.type }}-logs

View File

@@ -21,12 +21,12 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
- uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -104,21 +104,21 @@ jobs:
kubectl logs -n k3k-system -l "app.kubernetes.io/name=k3k" --tail=-1 > /tmp/k3k.log
- name: Archive K3s logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: k3s-${{ matrix.type }}-logs
path: /tmp/k3s.log
- name: Archive K3k logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: k3k-${{ matrix.type }}-logs
path: /tmp/k3k.log
- name: Archive conformance logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: conformance-${{ matrix.type }}-logs

View File

@@ -16,12 +16,12 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
- uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -78,14 +78,14 @@ jobs:
flags: e2e
- name: Archive k3s logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: e2e-k3s-logs
path: /tmp/k3s.log
- name: Archive k3k logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: e2e-k3k-logs
@@ -95,12 +95,12 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
- uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -157,14 +157,14 @@ jobs:
flags: e2e
- name: Archive k3s logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: e2e-k3s-logs
path: /tmp/k3s.log
- name: Archive k3k logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: e2e-k3k-logs

View File

@@ -16,9 +16,9 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
- uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
- uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -37,12 +37,12 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
fetch-depth: 0
fetch-tags: true
- uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
- uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
@@ -85,14 +85,14 @@ jobs:
flags: cli
- name: Archive k3s logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: cli-k3s-logs
path: /tmp/k3s.log
- name: Archive k3k logs
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7
if: always()
with:
name: cli-k3k-logs

View File

@@ -15,10 +15,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
- name: Set up Go
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6
with:
go-version-file: go.mod
cache: true

View File

@@ -810,6 +810,25 @@ spec:
required:
- enabled
type: object
storageClasses:
default:
enabled: false
description: StorageClasses resources sync configuration.
properties:
enabled:
default: false
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
type: object
tlsSANs:
description: TLSSANs specifies subject alternative names for the K3s
@@ -935,6 +954,164 @@ spec:
- Terminating
- Unknown
type: string
policy:
description: |-
policy represents the status of the policy applied to this cluster.
This field is set by the VirtualClusterPolicy controller.
properties:
name:
description: name is the name of the VirtualClusterPolicy currently
applied to this cluster.
minLength: 1
type: string
nodeSelector:
additionalProperties:
type: string
description: nodeSelector is a node selector enforced by the active
VirtualClusterPolicy.
type: object
priorityClass:
description: priorityClass is the priority class enforced by the
active VirtualClusterPolicy.
type: string
sync:
description: sync is the SyncConfig enforced by the active VirtualClusterPolicy.
properties:
configMaps:
default:
enabled: true
description: ConfigMaps resources sync configuration.
properties:
enabled:
default: true
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
ingresses:
default:
enabled: false
description: Ingresses resources sync configuration.
properties:
enabled:
default: false
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
persistentVolumeClaims:
default:
enabled: true
description: PersistentVolumeClaims resources sync configuration.
properties:
enabled:
default: true
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
priorityClasses:
default:
enabled: false
description: PriorityClasses resources sync configuration.
properties:
enabled:
default: false
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
secrets:
default:
enabled: true
description: Secrets resources sync configuration.
properties:
enabled:
default: true
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
type: object
services:
default:
enabled: true
description: Services resources sync configuration.
properties:
enabled:
default: true
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
storageClasses:
default:
enabled: false
description: StorageClasses resources sync configuration.
properties:
enabled:
default: false
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
type: object
required:
- name
type: object
policyName:
description: PolicyName specifies the virtual cluster policy name
bound to the virtual cluster.

View File

@@ -343,6 +343,25 @@ spec:
required:
- enabled
type: object
storageClasses:
default:
enabled: false
description: StorageClasses resources sync configuration.
properties:
enabled:
default: false
description: Enabled is an on/off switch for syncing resources.
type: boolean
selector:
additionalProperties:
type: string
description: |-
Selector specifies set of labels of the resources that will be synced, if empty
then all resources of the given type will be synced.
type: object
required:
- enabled
type: object
type: object
type: object
status:

View File

@@ -41,6 +41,30 @@ _Appears In:_
|===
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-appliedpolicy"]
=== AppliedPolicy
AppliedPolicy defines the observed state of an applied policy.
_Appears In:_
* xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-clusterstatus[$$ClusterStatus$$]
[cols="25a,55a,10a,10a", options="header"]
|===
| Field | Description | Default | Validation
| *`name`* __string__ | name is the name of the VirtualClusterPolicy currently applied to this cluster. + | | MinLength: 1 +
| *`priorityClass`* __string__ | priorityClass is the priority class enforced by the active VirtualClusterPolicy. + | |
| *`nodeSelector`* __object (keys:string, values:string)__ | nodeSelector is a node selector enforced by the active VirtualClusterPolicy. + | |
| *`sync`* __xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-syncconfig[$$SyncConfig$$]__ | sync is the SyncConfig enforced by the active VirtualClusterPolicy. + | |
|===
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-cluster"]
=== Cluster
@@ -194,7 +218,7 @@ Each entry defines a secret and its mount path within the pods. + | |
ConfigMapSyncConfig specifies the sync options for services.
ConfigMapSyncConfig specifies the sync options for ConfigMaps.
@@ -329,7 +353,7 @@ _Appears In:_
IngressSyncConfig specifies the sync options for services.
IngressSyncConfig specifies the sync options for Ingresses.
@@ -440,7 +464,7 @@ _Appears In:_
PersistentVolumeClaimSyncConfig specifies the sync options for services.
PersistentVolumeClaimSyncConfig specifies the sync options for PersistentVolumeClaims.
@@ -478,7 +502,7 @@ _Appears In:_
PriorityClassSyncConfig specifies the sync options for services.
PriorityClassSyncConfig specifies the sync options for PriorityClasses.
@@ -545,7 +569,7 @@ This can be 'server', 'agent', or 'all' (for both). + | | Enum: [server agent a
SecretSyncConfig specifies the sync options for services.
SecretSyncConfig specifies the sync options for Secrets.
@@ -567,7 +591,7 @@ then all resources of the given type will be synced. + | |
ServiceSyncConfig specifies the sync options for services.
ServiceSyncConfig specifies the sync options for Services.
@@ -584,6 +608,28 @@ then all resources of the given type will be synced. + | |
|===
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-storageclasssyncconfig"]
=== StorageClassSyncConfig
StorageClassSyncConfig specifies the sync options for StorageClasses.
_Appears In:_
* xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-syncconfig[$$SyncConfig$$]
[cols="25a,55a,10a,10a", options="header"]
|===
| Field | Description | Default | Validation
| *`enabled`* __boolean__ | Enabled is an on/off switch for syncing resources. + | false |
| *`selector`* __object (keys:string, values:string)__ | Selector specifies set of labels of the resources that will be synced, if empty +
then all resources of the given type will be synced. + | |
|===
[id="{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-syncconfig"]
=== SyncConfig
@@ -595,6 +641,7 @@ SyncConfig will contain the resources that should be synced from virtual cluster
_Appears In:_
* xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-appliedpolicy[$$AppliedPolicy$$]
* xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-clusterspec[$$ClusterSpec$$]
* xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-virtualclusterpolicyspec[$$VirtualClusterPolicySpec$$]
@@ -607,6 +654,7 @@ _Appears In:_
| *`ingresses`* __xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-ingresssyncconfig[$$IngressSyncConfig$$]__ | Ingresses resources sync configuration. + | { enabled:false } |
| *`persistentVolumeClaims`* __xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-persistentvolumeclaimsyncconfig[$$PersistentVolumeClaimSyncConfig$$]__ | PersistentVolumeClaims resources sync configuration. + | { enabled:true } |
| *`priorityClasses`* __xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-priorityclasssyncconfig[$$PriorityClassSyncConfig$$]__ | PriorityClasses resources sync configuration. + | { enabled:false } |
| *`storageClasses`* __xref:{anchor_prefix}-github-com-rancher-k3k-pkg-apis-k3k-io-v1beta1-storageclasssyncconfig[$$StorageClassSyncConfig$$]__ | StorageClasses resources sync configuration. + | { enabled:false } |
|===

View File

@@ -32,6 +32,25 @@ _Appears in:_
| `secretRef` _string_ | SecretRef is the name of the Secret. | | |
#### AppliedPolicy
AppliedPolicy defines the observed state of an applied policy.
_Appears in:_
- [ClusterStatus](#clusterstatus)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `name` _string_ | name is the name of the VirtualClusterPolicy currently applied to this cluster. | | MinLength: 1 <br /> |
| `priorityClass` _string_ | priorityClass is the priority class enforced by the active VirtualClusterPolicy. | | |
| `nodeSelector` _object (keys:string, values:string)_ | nodeSelector is a node selector enforced by the active VirtualClusterPolicy. | | |
| `sync` _[SyncConfig](#syncconfig)_ | sync is the SyncConfig enforced by the active VirtualClusterPolicy. | | |
#### Cluster
@@ -144,7 +163,7 @@ _Appears in:_
ConfigMapSyncConfig specifies the sync options for services.
ConfigMapSyncConfig specifies the sync options for ConfigMaps.
@@ -252,7 +271,7 @@ _Appears in:_
IngressSyncConfig specifies the sync options for services.
IngressSyncConfig specifies the sync options for Ingresses.
@@ -334,7 +353,7 @@ _Appears in:_
PersistentVolumeClaimSyncConfig specifies the sync options for services.
PersistentVolumeClaimSyncConfig specifies the sync options for PersistentVolumeClaims.
@@ -365,7 +384,7 @@ _Appears in:_
PriorityClassSyncConfig specifies the sync options for services.
PriorityClassSyncConfig specifies the sync options for PriorityClasses.
@@ -405,7 +424,7 @@ _Appears in:_
SecretSyncConfig specifies the sync options for services.
SecretSyncConfig specifies the sync options for Secrets.
@@ -422,7 +441,7 @@ _Appears in:_
ServiceSyncConfig specifies the sync options for services.
ServiceSyncConfig specifies the sync options for Services.
@@ -435,6 +454,23 @@ _Appears in:_
| `selector` _object (keys:string, values:string)_ | Selector specifies set of labels of the resources that will be synced, if empty<br />then all resources of the given type will be synced. | | |
#### StorageClassSyncConfig
StorageClassSyncConfig specifies the sync options for StorageClasses.
_Appears in:_
- [SyncConfig](#syncconfig)
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `enabled` _boolean_ | Enabled is an on/off switch for syncing resources. | false | |
| `selector` _object (keys:string, values:string)_ | Selector specifies set of labels of the resources that will be synced, if empty<br />then all resources of the given type will be synced. | | |
#### SyncConfig
@@ -444,6 +480,7 @@ SyncConfig will contain the resources that should be synced from virtual cluster
_Appears in:_
- [AppliedPolicy](#appliedpolicy)
- [ClusterSpec](#clusterspec)
- [VirtualClusterPolicySpec](#virtualclusterpolicyspec)
@@ -455,6 +492,7 @@ _Appears in:_
| `ingresses` _[IngressSyncConfig](#ingresssyncconfig)_ | Ingresses resources sync configuration. | \{ enabled:false \} | |
| `persistentVolumeClaims` _[PersistentVolumeClaimSyncConfig](#persistentvolumeclaimsyncconfig)_ | PersistentVolumeClaims resources sync configuration. | \{ enabled:true \} | |
| `priorityClasses` _[PriorityClassSyncConfig](#priorityclasssyncconfig)_ | PriorityClasses resources sync configuration. | \{ enabled:false \} | |
| `storageClasses` _[StorageClassSyncConfig](#storageclasssyncconfig)_ | StorageClasses resources sync configuration. | \{ enabled:false \} | |
#### VirtualClusterPolicy

View File

@@ -100,7 +100,7 @@ func (c *ConfigMapSyncer) Reconcile(ctx context.Context, req reconcile.Request)
syncedConfigMap := c.translateConfigMap(&virtualConfigMap)
if err := controllerutil.SetControllerReference(&cluster, syncedConfigMap, c.HostClient.Scheme()); err != nil {
if err := controllerutil.SetOwnerReference(&cluster, syncedConfigMap, c.HostClient.Scheme()); err != nil {
return reconcile.Result{}, err
}

View File

@@ -98,7 +98,7 @@ func (r *IngressReconciler) Reconcile(ctx context.Context, req reconcile.Request
syncedIngress := r.ingress(&virtIngress)
if err := controllerutil.SetControllerReference(&cluster, syncedIngress, r.HostClient.Scheme()); err != nil {
if err := controllerutil.SetOwnerReference(&cluster, syncedIngress, r.HostClient.Scheme()); err != nil {
return reconcile.Result{}, err
}

View File

@@ -98,7 +98,7 @@ func (r *PVCReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
}
syncedPVC := r.pvc(&virtPVC)
if err := controllerutil.SetControllerReference(&cluster, syncedPVC, r.HostClient.Scheme()); err != nil {
if err := controllerutil.SetOwnerReference(&cluster, syncedPVC, r.HostClient.Scheme()); err != nil {
return reconcile.Result{}, err
}

View File

@@ -117,7 +117,7 @@ func (r *PriorityClassSyncer) Reconcile(ctx context.Context, req reconcile.Reque
hostPriorityClass := r.translatePriorityClass(priorityClass)
if err := controllerutil.SetControllerReference(&cluster, hostPriorityClass, r.HostClient.Scheme()); err != nil {
if err := controllerutil.SetOwnerReference(&cluster, hostPriorityClass, r.HostClient.Scheme()); err != nil {
return reconcile.Result{}, err
}

View File

@@ -100,7 +100,7 @@ func (s *SecretSyncer) Reconcile(ctx context.Context, req reconcile.Request) (re
syncedSecret := s.translateSecret(&virtualSecret)
if err := controllerutil.SetControllerReference(&cluster, syncedSecret, s.HostClient.Scheme()); err != nil {
if err := controllerutil.SetOwnerReference(&cluster, syncedSecret, s.HostClient.Scheme()); err != nil {
return reconcile.Result{}, err
}

View File

@@ -76,7 +76,7 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request
syncedService := r.service(&virtService)
if err := controllerutil.SetControllerReference(&cluster, syncedService, r.HostClient.Scheme()); err != nil {
if err := controllerutil.SetOwnerReference(&cluster, syncedService, r.HostClient.Scheme()); err != nil {
return reconcile.Result{}, err
}

View File

@@ -276,7 +276,7 @@ func (k *kubelet) newProviderFunc(cfg config) nodeutil.NewProviderFunc {
cfg.AgentHostname,
k.port,
k.agentIP,
utilProvider.HostClient,
k.hostMgr,
utilProvider.VirtualClient,
k.virtualCluster,
cfg.Version,

View File

@@ -6,6 +6,7 @@ import (
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -13,13 +14,13 @@ import (
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
)
func ConfigureNode(logger logr.Logger, node *corev1.Node, hostname string, servicePort int, ip string, hostClient client.Client, virtualClient client.Client, virtualCluster v1beta1.Cluster, version string, mirrorHostNodes bool) {
func ConfigureNode(logger logr.Logger, node *corev1.Node, hostname string, servicePort int, ip string, hostMgr manager.Manager, virtualClient client.Client, virtualCluster v1beta1.Cluster, version string, mirrorHostNodes bool) {
ctx := context.Background()
if mirrorHostNodes {
var hostNode corev1.Node
if err := hostClient.Get(ctx, types.NamespacedName{Name: node.Name}, &hostNode); err != nil {
logger.Error(err, "error getting host node for mirroring", err)
if err := hostMgr.GetAPIReader().Get(ctx, types.NamespacedName{Name: node.Name}, &hostNode); err != nil {
logger.Error(err, "error getting host node for mirroring")
}
node.Spec = *hostNode.Spec.DeepCopy()
@@ -48,7 +49,7 @@ func ConfigureNode(logger logr.Logger, node *corev1.Node, hostname string, servi
// configure versions
node.Status.NodeInfo.KubeletVersion = version
startNodeCapacityUpdater(ctx, logger, hostClient, virtualClient, virtualCluster, node.Name)
startNodeCapacityUpdater(ctx, logger, hostMgr.GetClient(), virtualClient, virtualCluster, node.Name)
}
}

View File

@@ -2,6 +2,7 @@ package provider
import (
"context"
"maps"
"sort"
"time"
@@ -83,11 +84,30 @@ func updateNodeCapacity(ctx context.Context, logger logr.Logger, hostClient clie
mergedResourceLists := mergeResourceLists(resourceLists...)
m, err := distributeQuotas(ctx, logger, virtualClient, mergedResourceLists)
if err != nil {
logger.Error(err, "error distributing policy quota")
var virtualNodeList, hostNodeList corev1.NodeList
if err := virtualClient.List(ctx, &virtualNodeList); err != nil {
logger.Error(err, "error listing virtual nodes for stable capacity distribution")
}
virtResourceMap := make(map[string]corev1.ResourceList)
for _, vNode := range virtualNodeList.Items {
virtResourceMap[vNode.Name] = corev1.ResourceList{}
}
if err := hostClient.List(ctx, &hostNodeList); err != nil {
logger.Error(err, "error listing host nodes for stable capacity distribution")
}
hostResourceMap := make(map[string]corev1.ResourceList)
for _, hNode := range hostNodeList.Items {
if _, ok := virtResourceMap[hNode.Name]; ok {
hostResourceMap[hNode.Name] = hNode.Status.Allocatable
}
}
m := distributeQuotas(hostResourceMap, virtResourceMap, mergedResourceLists)
allocatable = m[virtualNodeName]
}
@@ -125,76 +145,99 @@ func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceLis
return merged
}
// distributeQuotas divides the total resource quotas evenly among all active virtual nodes.
// This ensures that each virtual node reports a fair share of the available resources,
// preventing the scheduler from overloading a single node.
// distributeQuotas divides the total resource quotas among all active virtual nodes,
// capped by each node's actual host capacity. This ensures that each virtual node
// reports a fair share of the available resources without exceeding what its
// underlying host node can provide.
//
// The algorithm iterates over each resource, divides it as evenly as possible among the
// sorted virtual nodes, and distributes any remainder to the first few nodes to ensure
// all resources are allocated. Sorting the nodes by name guarantees a deterministic
// distribution.
func distributeQuotas(ctx context.Context, logger logr.Logger, virtualClient client.Client, quotas corev1.ResourceList) (map[string]corev1.ResourceList, error) {
// List all virtual nodes to distribute the quota stably.
var virtualNodeList corev1.NodeList
if err := virtualClient.List(ctx, &virtualNodeList); err != nil {
logger.Error(err, "error listing virtual nodes for stable capacity distribution, falling back to full quota")
return nil, err
}
// If there are no virtual nodes, there's nothing to distribute.
numNodes := int64(len(virtualNodeList.Items))
if numNodes == 0 {
logger.Info("error listing virtual nodes for stable capacity distribution, falling back to full quota")
return nil, nil
}
// Sort nodes by name for a deterministic distribution of resources.
sort.Slice(virtualNodeList.Items, func(i, j int) bool {
return virtualNodeList.Items[i].Name < virtualNodeList.Items[j].Name
})
// Initialize the resource map for each virtual node.
resourceMap := make(map[string]corev1.ResourceList)
for _, virtualNode := range virtualNodeList.Items {
resourceMap[virtualNode.Name] = corev1.ResourceList{}
}
// For each resource type the algorithm uses a multi-pass redistribution loop:
// 1. Divide the remaining quota evenly among eligible nodes (sorted by name for
// determinism), assigning any integer remainder to the first nodes alphabetically.
// 2. Cap each node's share at its host allocatable capacity.
// 3. Remove nodes that have reached their host capacity.
// 4. If there is still unallocated quota (because some nodes were capped below their
// even share), repeat from step 1 with the remaining quota and remaining nodes.
//
// The loop terminates when the quota is fully distributed or no eligible nodes remain.
func distributeQuotas(hostResourceMap, virtResourceMap map[string]corev1.ResourceList, quotas corev1.ResourceList) map[string]corev1.ResourceList {
resourceMap := make(map[string]corev1.ResourceList, len(virtResourceMap))
maps.Copy(resourceMap, virtResourceMap)
// Distribute each resource type from the policy's hard quota
for resourceName, totalQuantity := range quotas {
// Use MilliValue for precise division, especially for resources like CPU,
// which are often expressed in milli-units. Otherwise, use the standard Value().
var totalValue int64
if _, found := milliScaleResources[resourceName]; found {
totalValue = totalQuantity.MilliValue()
} else {
totalValue = totalQuantity.Value()
_, useMilli := milliScaleResources[resourceName]
// eligible nodes for each distribution cycle
var eligibleNodes []string
hostCap := make(map[string]int64)
// Populate the host nodes capacity map and the initial effective nodes
for vn := range virtResourceMap {
hostNodeResources := hostResourceMap[vn]
if hostNodeResources == nil {
continue
}
resourceQuantity, found := hostNodeResources[resourceName]
if !found {
// skip the node if the resource does not exist on the host node
continue
}
hostCap[vn] = resourceQuantity.Value()
if useMilli {
hostCap[vn] = resourceQuantity.MilliValue()
}
eligibleNodes = append(eligibleNodes, vn)
}
// Calculate the base quantity of the resource to be allocated per node.
// and the remainder that needs to be distributed among the nodes.
//
// For example, if totalValue is 2000 (e.g., 2 CPU) and there are 3 nodes:
// - quantityPerNode would be 666 (2000 / 3)
// - remainder would be 2 (2000 % 3)
// The first two nodes would get 667 (666 + 1), and the last one would get 666.
quantityPerNode := totalValue / numNodes
remainder := totalValue % numNodes
sort.Strings(eligibleNodes)
// Iterate through the sorted virtual nodes to distribute the resource.
for _, virtualNode := range virtualNodeList.Items {
nodeQuantity := quantityPerNode
if remainder > 0 {
nodeQuantity++
remainder--
totalValue := totalQuantity.Value()
if useMilli {
totalValue = totalQuantity.MilliValue()
}
// Start of the distribution cycle, each cycle will distribute the quota resource
// evenly between nodes, each node can not exceed the corresponding host node capacity
for totalValue > 0 && len(eligibleNodes) > 0 {
nodeNum := int64(len(eligibleNodes))
quantityPerNode := totalValue / nodeNum
remainder := totalValue % nodeNum
remainingNodes := []string{}
for _, virtualNodeName := range eligibleNodes {
nodeQuantity := quantityPerNode
if remainder > 0 {
nodeQuantity++
remainder--
}
// We cap the quantity to the hostNode capacity
nodeQuantity = min(nodeQuantity, hostCap[virtualNodeName])
if nodeQuantity > 0 {
existing := resourceMap[virtualNodeName][resourceName]
if useMilli {
resourceMap[virtualNodeName][resourceName] = *resource.NewMilliQuantity(existing.MilliValue()+nodeQuantity, totalQuantity.Format)
} else {
resourceMap[virtualNodeName][resourceName] = *resource.NewQuantity(existing.Value()+nodeQuantity, totalQuantity.Format)
}
}
totalValue -= nodeQuantity
hostCap[virtualNodeName] -= nodeQuantity
if hostCap[virtualNodeName] > 0 {
remainingNodes = append(remainingNodes, virtualNodeName)
}
}
if _, found := milliScaleResources[resourceName]; found {
resourceMap[virtualNode.Name][resourceName] = *resource.NewMilliQuantity(nodeQuantity, totalQuantity.Format)
} else {
resourceMap[virtualNode.Name][resourceName] = *resource.NewQuantity(nodeQuantity, totalQuantity.Format)
}
eligibleNodes = remainingNodes
}
}
return resourceMap, nil
return resourceMap
}

View File

@@ -1,19 +1,13 @@
package provider
import (
"context"
"testing"
"github.com/go-logr/zapr"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func Test_distributeQuotas(t *testing.T) {
@@ -21,39 +15,56 @@ func Test_distributeQuotas(t *testing.T) {
err := corev1.AddToScheme(scheme)
assert.NoError(t, err)
node1 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
node2 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2"}}
node3 := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-3"}}
// Large allocatable so capping doesn't interfere with basic distribution tests.
largeAllocatable := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100"),
corev1.ResourceMemory: resource.MustParse("100Gi"),
corev1.ResourcePods: resource.MustParse("1000"),
}
tests := []struct {
name string
virtualNodes []client.Object
quotas corev1.ResourceList
want map[string]corev1.ResourceList
wantErr bool
name string
virtResourceMap map[string]corev1.ResourceList
hostResourceMap map[string]corev1.ResourceList
quotas corev1.ResourceList
want map[string]corev1.ResourceList
}{
{
name: "no virtual nodes",
virtualNodes: []client.Object{},
name: "no virtual nodes",
virtResourceMap: map[string]corev1.ResourceList{},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
want: map[string]corev1.ResourceList{},
wantErr: false,
want: map[string]corev1.ResourceList{},
},
{
name: "no quotas",
virtualNodes: []client.Object{node1, node2},
quotas: corev1.ResourceList{},
name: "no quotas",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
},
quotas: corev1.ResourceList{},
want: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
wantErr: false,
},
{
name: "even distribution of cpu and memory",
virtualNodes: []client.Object{node1, node2},
name: "fewer virtual nodes than host nodes",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
"node-3": largeAllocatable,
"node-4": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
@@ -68,65 +79,203 @@ func Test_distributeQuotas(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
wantErr: false,
},
{
name: "uneven distribution with remainder",
virtualNodes: []client.Object{node1, node2, node3},
name: "even distribution of cpu and memory",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"), // 2000m / 3 = 666m with 2m remainder
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("4Gi"),
},
want: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
},
{
name: "uneven distribution with remainder",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
"node-3": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
},
want: map[string]corev1.ResourceList{
"node-1": {corev1.ResourceCPU: resource.MustParse("667m")},
"node-2": {corev1.ResourceCPU: resource.MustParse("667m")},
"node-3": {corev1.ResourceCPU: resource.MustParse("666m")},
},
wantErr: false,
},
{
name: "distribution of number resources",
virtualNodes: []client.Object{node1, node2, node3},
name: "distribution of number resources",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": largeAllocatable,
"node-2": largeAllocatable,
"node-3": largeAllocatable,
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourcePods: resource.MustParse("11"),
corev1.ResourceSecrets: resource.MustParse("9"),
"custom": resource.MustParse("8"),
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourcePods: resource.MustParse("11"),
},
want: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
corev1.ResourceSecrets: resource.MustParse("3"),
"custom": resource.MustParse("3"),
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
corev1.ResourceSecrets: resource.MustParse("3"),
"custom": resource.MustParse("3"),
corev1.ResourceCPU: resource.MustParse("667m"),
corev1.ResourcePods: resource.MustParse("4"),
},
"node-3": {
corev1.ResourceCPU: resource.MustParse("666m"),
corev1.ResourcePods: resource.MustParse("3"),
corev1.ResourceSecrets: resource.MustParse("3"),
"custom": resource.MustParse("2"),
corev1.ResourceCPU: resource.MustParse("666m"),
corev1.ResourcePods: resource.MustParse("3"),
},
},
wantErr: false,
},
{
name: "extended resource distributed only to nodes that have it",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("100"),
"nvidia.com/gpu": resource.MustParse("2"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("100"),
},
"node-3": {
corev1.ResourceCPU: resource.MustParse("100"),
"nvidia.com/gpu": resource.MustParse("4"),
},
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("3"),
"nvidia.com/gpu": resource.MustParse("4"),
},
want: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("1"),
"nvidia.com/gpu": resource.MustParse("2"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("1"),
},
"node-3": {
corev1.ResourceCPU: resource.MustParse("1"),
"nvidia.com/gpu": resource.MustParse("2"),
},
},
},
{
name: "capping at host capacity with redistribution",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
corev1.ResourceCPU: resource.MustParse("8"),
},
"node-2": {
corev1.ResourceCPU: resource.MustParse("2"),
},
},
quotas: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
// Even split would be 3 each, but node-2 only has 2 CPU.
// node-2 gets capped at 2, the remaining 1 goes to node-1.
want: map[string]corev1.ResourceList{
"node-1": {corev1.ResourceCPU: resource.MustParse("4")},
"node-2": {corev1.ResourceCPU: resource.MustParse("2")},
},
},
{
name: "gpu capping with uneven host capacity",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
"nvidia.com/gpu": resource.MustParse("6"),
},
"node-2": {
"nvidia.com/gpu": resource.MustParse("1"),
},
},
quotas: corev1.ResourceList{
"nvidia.com/gpu": resource.MustParse("4"),
},
// Even split would be 2 each, but node-2 only has 1 GPU.
// node-2 gets capped at 1, the remaining 1 goes to node-1.
want: map[string]corev1.ResourceList{
"node-1": {"nvidia.com/gpu": resource.MustParse("3")},
"node-2": {"nvidia.com/gpu": resource.MustParse("1")},
},
},
{
name: "quota exceeds total host capacity",
virtResourceMap: map[string]corev1.ResourceList{
"node-1": {},
"node-2": {},
"node-3": {},
},
hostResourceMap: map[string]corev1.ResourceList{
"node-1": {
"nvidia.com/gpu": resource.MustParse("2"),
},
"node-2": {
"nvidia.com/gpu": resource.MustParse("1"),
},
"node-3": {
"nvidia.com/gpu": resource.MustParse("1"),
},
},
quotas: corev1.ResourceList{
"nvidia.com/gpu": resource.MustParse("10"),
},
// Total host capacity is 4, quota is 10. Each node gets its full capacity.
want: map[string]corev1.ResourceList{
"node-1": {"nvidia.com/gpu": resource.MustParse("2")},
"node-2": {"nvidia.com/gpu": resource.MustParse("1")},
"node-3": {"nvidia.com/gpu": resource.MustParse("1")},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(tt.virtualNodes...).Build()
logger := zapr.NewLogger(zap.NewNop())
got, gotErr := distributeQuotas(context.Background(), logger, fakeClient, tt.quotas)
if tt.wantErr {
assert.Error(t, gotErr)
} else {
assert.NoError(t, gotErr)
}
got := distributeQuotas(tt.hostResourceMap, tt.virtResourceMap, tt.quotas)
assert.Equal(t, len(tt.want), len(got), "Number of nodes in result should match")

View File

@@ -401,28 +401,45 @@ func (p *Provider) createPod(ctx context.Context, pod *corev1.Pod) error {
// Schedule the host pod in the same host node of the virtual kubelet
hostPod.Spec.NodeName = p.agentHostname
// The pod's own nodeSelector is ignored.
// The final selector is determined by the cluster spec, but overridden by a policy if present.
hostPod.Spec.NodeSelector = cluster.Spec.NodeSelector
if cluster.Status.Policy != nil && len(cluster.Status.Policy.NodeSelector) > 0 {
hostPod.Spec.NodeSelector = cluster.Status.Policy.NodeSelector
}
// setting the hostname for the pod if its not set
if virtualPod.Spec.Hostname == "" {
hostPod.Spec.Hostname = k3kcontroller.SafeConcatName(virtualPod.Name)
}
// if the priorityClass for the virtual cluster is set then override the provided value
// When a PriorityClass is set we will use the translated one in the HostCluster.
// If the Cluster or a Policy defines a PriorityClass of the host we are going to use that one.
// Note: the core-dns and local-path-provisioner pod are scheduled by k3s with the
// 'system-cluster-critical' and 'system-node-critical' default priority classes.
if !strings.HasPrefix(hostPod.Spec.PriorityClassName, "system-") {
if hostPod.Spec.PriorityClassName != "" {
tPriorityClassName := p.Translator.TranslateName("", hostPod.Spec.PriorityClassName)
hostPod.Spec.PriorityClassName = tPriorityClassName
//
// TODO: we probably need to define a custom "intermediate" k3k-system-* priority
if strings.HasPrefix(virtualPod.Spec.PriorityClassName, "system-") {
hostPod.Spec.PriorityClassName = virtualPod.Spec.PriorityClassName
} else {
enforcedPriorityClassName := cluster.Spec.PriorityClass
if cluster.Status.Policy != nil && cluster.Status.Policy.PriorityClass != nil {
enforcedPriorityClassName = *cluster.Status.Policy.PriorityClass
}
if cluster.Spec.PriorityClass != "" {
hostPod.Spec.PriorityClassName = cluster.Spec.PriorityClass
if enforcedPriorityClassName != "" {
hostPod.Spec.PriorityClassName = enforcedPriorityClassName
} else if virtualPod.Spec.PriorityClassName != "" {
hostPod.Spec.PriorityClassName = p.Translator.TranslateName("", virtualPod.Spec.PriorityClassName)
hostPod.Spec.Priority = nil
}
}
// if the priority class is set we need to remove the priority
if hostPod.Spec.PriorityClassName != "" {
hostPod.Spec.Priority = nil
}
p.configurePodEnvs(hostPod, &virtualPod)
// fieldpath annotations

View File

@@ -249,9 +249,14 @@ type SyncConfig struct {
// +kubebuilder:default={"enabled": false}
// +optional
PriorityClasses PriorityClassSyncConfig `json:"priorityClasses"`
// StorageClasses resources sync configuration.
//
// +kubebuilder:default={"enabled": false}
// +optional
StorageClasses StorageClassSyncConfig `json:"storageClasses"`
}
// SecretSyncConfig specifies the sync options for services.
// SecretSyncConfig specifies the sync options for Secrets.
type SecretSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
@@ -266,7 +271,7 @@ type SecretSyncConfig struct {
Selector map[string]string `json:"selector,omitempty"`
}
// ServiceSyncConfig specifies the sync options for services.
// ServiceSyncConfig specifies the sync options for Services.
type ServiceSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
@@ -281,7 +286,7 @@ type ServiceSyncConfig struct {
Selector map[string]string `json:"selector,omitempty"`
}
// ConfigMapSyncConfig specifies the sync options for services.
// ConfigMapSyncConfig specifies the sync options for ConfigMaps.
type ConfigMapSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
@@ -296,7 +301,7 @@ type ConfigMapSyncConfig struct {
Selector map[string]string `json:"selector,omitempty"`
}
// IngressSyncConfig specifies the sync options for services.
// IngressSyncConfig specifies the sync options for Ingresses.
type IngressSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
@@ -311,7 +316,7 @@ type IngressSyncConfig struct {
Selector map[string]string `json:"selector,omitempty"`
}
// PersistentVolumeClaimSyncConfig specifies the sync options for services.
// PersistentVolumeClaimSyncConfig specifies the sync options for PersistentVolumeClaims.
type PersistentVolumeClaimSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
@@ -326,7 +331,7 @@ type PersistentVolumeClaimSyncConfig struct {
Selector map[string]string `json:"selector,omitempty"`
}
// PriorityClassSyncConfig specifies the sync options for services.
// PriorityClassSyncConfig specifies the sync options for PriorityClasses.
type PriorityClassSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
@@ -341,6 +346,21 @@ type PriorityClassSyncConfig struct {
Selector map[string]string `json:"selector,omitempty"`
}
// StorageClassSyncConfig specifies the sync options for StorageClasses.
type StorageClassSyncConfig struct {
// Enabled is an on/off switch for syncing resources.
//
// +kubebuilder:default=false
// +required
Enabled bool `json:"enabled"`
// Selector specifies set of labels of the resources that will be synced, if empty
// then all resources of the given type will be synced.
//
// +optional
Selector map[string]string `json:"selector,omitempty"`
}
// ClusterMode is the possible provisioning mode of a Cluster.
//
// +kubebuilder:validation:Enum=shared;virtual
@@ -538,6 +558,12 @@ type ClusterStatus struct {
// +optional
PolicyName string `json:"policyName,omitempty"`
// policy represents the status of the policy applied to this cluster.
// This field is set by the VirtualClusterPolicy controller.
//
// +optional
Policy *AppliedPolicy `json:"policy,omitempty"`
// KubeletPort specefies the port used by k3k-kubelet in shared mode.
//
// +optional
@@ -561,6 +587,30 @@ type ClusterStatus struct {
Phase ClusterPhase `json:"phase,omitempty"`
}
// AppliedPolicy defines the observed state of an applied policy.
type AppliedPolicy struct {
// name is the name of the VirtualClusterPolicy currently applied to this cluster.
//
// +kubebuilder:validation:MinLength:=1
// +required
Name string `json:"name,omitempty"`
// priorityClass is the priority class enforced by the active VirtualClusterPolicy.
//
// +optional
PriorityClass *string `json:"priorityClass,omitempty"`
// nodeSelector is a node selector enforced by the active VirtualClusterPolicy.
//
// +optional
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// sync is the SyncConfig enforced by the active VirtualClusterPolicy.
//
// +optional
Sync *SyncConfig `json:"sync,omitempty"`
}
// ClusterPhase is a high-level summary of the cluster's current lifecycle state.
type ClusterPhase string

View File

@@ -25,6 +25,38 @@ func (in *Addon) DeepCopy() *Addon {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AppliedPolicy) DeepCopyInto(out *AppliedPolicy) {
*out = *in
if in.PriorityClass != nil {
in, out := &in.PriorityClass, &out.PriorityClass
*out = new(string)
**out = **in
}
if in.NodeSelector != nil {
in, out := &in.NodeSelector, &out.NodeSelector
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
if in.Sync != nil {
in, out := &in.Sync, &out.Sync
*out = new(SyncConfig)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AppliedPolicy.
func (in *AppliedPolicy) DeepCopy() *AppliedPolicy {
if in == nil {
return nil
}
out := new(AppliedPolicy)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Cluster) DeepCopyInto(out *Cluster) {
*out = *in
@@ -200,6 +232,11 @@ func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.Policy != nil {
in, out := &in.Policy, &out.Policy
*out = new(AppliedPolicy)
(*in).DeepCopyInto(*out)
}
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make([]metav1.Condition, len(*in))
@@ -546,6 +583,28 @@ func (in *ServiceSyncConfig) DeepCopy() *ServiceSyncConfig {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StorageClassSyncConfig) DeepCopyInto(out *StorageClassSyncConfig) {
*out = *in
if in.Selector != nil {
in, out := &in.Selector, &out.Selector
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageClassSyncConfig.
func (in *StorageClassSyncConfig) DeepCopy() *StorageClassSyncConfig {
if in == nil {
return nil
}
out := new(StorageClassSyncConfig)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SyncConfig) DeepCopyInto(out *SyncConfig) {
*out = *in
@@ -555,6 +614,7 @@ func (in *SyncConfig) DeepCopyInto(out *SyncConfig) {
in.Ingresses.DeepCopyInto(&out.Ingresses)
in.PersistentVolumeClaims.DeepCopyInto(&out.PersistentVolumeClaims)
in.PriorityClasses.DeepCopyInto(&out.PriorityClasses)
in.StorageClasses.DeepCopyInto(&out.StorageClasses)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SyncConfig.

View File

@@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
@@ -28,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
@@ -47,11 +49,18 @@ const (
clusterFinalizerName = "cluster.k3k.io/finalizer"
ClusterInvalidName = "system"
SyncEnabledLabelKey = "k3k.io/sync-enabled"
SyncSourceLabelKey = "k3k.io/sync-source"
SyncSourceHostLabel = "host"
defaultVirtualClusterCIDR = "10.52.0.0/16"
defaultVirtualServiceCIDR = "10.53.0.0/16"
defaultSharedClusterCIDR = "10.42.0.0/16"
defaultSharedServiceCIDR = "10.43.0.0/16"
memberRemovalTimeout = time.Minute * 1
storageClassEnabledIndexField = "spec.sync.storageClasses.enabled"
storageClassStatusEnabledIndexField = "status.policy.sync.storageClasses.enabled"
)
var (
@@ -115,15 +124,82 @@ func Add(ctx context.Context, mgr manager.Manager, config *Config, maxConcurrent
},
}
// index the 'spec.sync.storageClasses.enabled' field
err = mgr.GetCache().IndexField(ctx, &v1beta1.Cluster{}, storageClassEnabledIndexField, func(rawObj client.Object) []string {
vc := rawObj.(*v1beta1.Cluster)
if vc.Spec.Sync != nil && vc.Spec.Sync.StorageClasses.Enabled {
return []string{"true"}
}
return []string{"false"}
})
if err != nil {
return err
}
// index the 'status.policy.sync.storageClasses.enabled' field
err = mgr.GetCache().IndexField(ctx, &v1beta1.Cluster{}, storageClassStatusEnabledIndexField, func(rawObj client.Object) []string {
vc := rawObj.(*v1beta1.Cluster)
if vc.Status.Policy != nil && vc.Status.Policy.Sync != nil && vc.Status.Policy.Sync.StorageClasses.Enabled {
return []string{"true"}
}
return []string{"false"}
})
if err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Cluster{}).
Watches(&v1.Namespace{}, namespaceEventHandler(&reconciler)).
Watches(&storagev1.StorageClass{},
handler.EnqueueRequestsFromMapFunc(reconciler.mapStorageClassToCluster),
).
Owns(&apps.StatefulSet{}).
Owns(&v1.Service{}).
WithOptions(ctrlcontroller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}).
Complete(&reconciler)
}
func (r *ClusterReconciler) mapStorageClassToCluster(ctx context.Context, obj client.Object) []reconcile.Request {
log := ctrl.LoggerFrom(ctx)
if _, ok := obj.(*storagev1.StorageClass); !ok {
return nil
}
// Merge and deduplicate clusters
allClusters := make(map[types.NamespacedName]struct{})
var specClusterList v1beta1.ClusterList
if err := r.Client.List(ctx, &specClusterList, client.MatchingFields{storageClassEnabledIndexField: "true"}); err != nil {
log.Error(err, "error listing clusters with spec sync enabled for storageclass sync")
} else {
for _, cluster := range specClusterList.Items {
allClusters[client.ObjectKeyFromObject(&cluster)] = struct{}{}
}
}
var statusClusterList v1beta1.ClusterList
if err := r.Client.List(ctx, &statusClusterList, client.MatchingFields{storageClassStatusEnabledIndexField: "true"}); err != nil {
log.Error(err, "error listing clusters with status sync enabled for storageclass sync")
} else {
for _, cluster := range statusClusterList.Items {
allClusters[client.ObjectKeyFromObject(&cluster)] = struct{}{}
}
}
requests := make([]reconcile.Request, 0, len(allClusters))
for key := range allClusters {
requests = append(requests, reconcile.Request{NamespacedName: key})
}
return requests
}
func namespaceEventHandler(r *ClusterReconciler) handler.Funcs {
return handler.Funcs{
// We don't need to update for create or delete events
@@ -350,11 +426,22 @@ func (c *ClusterReconciler) reconcile(ctx context.Context, cluster *v1beta1.Clus
return err
}
if err := c.bindClusterRoles(ctx, cluster); err != nil {
return err
}
if err := c.ensureKubeconfigSecret(ctx, cluster, serviceIP, 443); err != nil {
return err
}
return c.bindClusterRoles(ctx, cluster)
// Important: if you need to call the Server API of the Virtual Cluster
// this needs to be done AFTER he kubeconfig has been generated
if err := c.ensureStorageClasses(ctx, cluster); err != nil {
return err
}
return nil
}
// ensureBootstrapSecret will create or update the Secret containing the bootstrap data from the k3s server
@@ -620,6 +707,120 @@ func (c *ClusterReconciler) ensureIngress(ctx context.Context, cluster *v1beta1.
return nil
}
func (c *ClusterReconciler) ensureStorageClasses(ctx context.Context, cluster *v1beta1.Cluster) error {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Ensuring cluster StorageClasses")
virtualClient, err := newVirtualClient(ctx, c.Client, cluster.Name, cluster.Namespace)
if err != nil {
return fmt.Errorf("failed creating virtual client: %w", err)
}
appliedSync := cluster.Spec.Sync.DeepCopy()
// If a policy is applied to the virtual cluster we need to use its SyncConfig, if available
if cluster.Status.Policy != nil && cluster.Status.Policy.Sync != nil {
appliedSync = cluster.Status.Policy.Sync
}
// If storageclass sync is disabled, clean up any managed storage classes.
if appliedSync == nil || !appliedSync.StorageClasses.Enabled {
err := virtualClient.DeleteAllOf(ctx, &storagev1.StorageClass{}, client.MatchingLabels{SyncSourceLabelKey: SyncSourceHostLabel})
return client.IgnoreNotFound(err)
}
var hostStorageClasses storagev1.StorageClassList
if err := c.Client.List(ctx, &hostStorageClasses); err != nil {
return fmt.Errorf("failed listing host storageclasses: %w", err)
}
// filter the StorageClasses disabled for the sync, and the one not matching the selector
filteredHostStorageClasses := make(map[string]storagev1.StorageClass)
for _, sc := range hostStorageClasses.Items {
syncEnabled, found := sc.Labels[SyncEnabledLabelKey]
// if sync is disabled -> continue
if found && syncEnabled != "true" {
log.V(1).Info("sync is disabled", "sc-name", sc.Name)
continue
}
// if selector doesn't match -> continue
// an empty selector matche everything
selector := labels.SelectorFromSet(appliedSync.StorageClasses.Selector)
if !selector.Matches(labels.Set(sc.Labels)) {
log.V(1).Info("selector not matching", "sc-name", sc.Name)
continue
}
log.V(1).Info("keeping storageclass", "sc-name", sc.Name)
filteredHostStorageClasses[sc.Name] = sc
}
var virtStorageClasses storagev1.StorageClassList
if err = virtualClient.List(ctx, &virtStorageClasses, client.MatchingLabels{SyncSourceLabelKey: SyncSourceHostLabel}); err != nil {
return fmt.Errorf("failed listing virtual storageclasses: %w", err)
}
// delete StorageClasses with the sync disabled
for _, sc := range virtStorageClasses.Items {
if _, found := filteredHostStorageClasses[sc.Name]; !found {
log.V(1).Info("deleting storageclass", "sc-name", sc.Name)
if errDelete := virtualClient.Delete(ctx, &sc); errDelete != nil {
log.Error(errDelete, "failed to delete virtual storageclass", "name", sc.Name)
err = errors.Join(err, errDelete)
}
}
}
for _, hostSc := range filteredHostStorageClasses {
log.V(1).Info("updating storageclass", "sc-name", hostSc.Name)
virtualSc := hostSc.DeepCopy()
virtualSc.ObjectMeta = metav1.ObjectMeta{
Name: hostSc.Name,
Labels: hostSc.Labels,
Annotations: hostSc.Annotations,
}
_, errCreateOrUpdate := controllerutil.CreateOrUpdate(ctx, virtualClient, virtualSc, func() error {
virtualSc.Annotations = hostSc.Annotations
virtualSc.Labels = hostSc.Labels
if len(virtualSc.Labels) == 0 {
virtualSc.Labels = make(map[string]string)
}
virtualSc.Labels[SyncSourceLabelKey] = SyncSourceHostLabel
virtualSc.Provisioner = hostSc.Provisioner
virtualSc.Parameters = hostSc.Parameters
virtualSc.ReclaimPolicy = hostSc.ReclaimPolicy
virtualSc.MountOptions = hostSc.MountOptions
virtualSc.AllowVolumeExpansion = hostSc.AllowVolumeExpansion
virtualSc.VolumeBindingMode = hostSc.VolumeBindingMode
virtualSc.AllowedTopologies = hostSc.AllowedTopologies
return nil
})
if errCreateOrUpdate != nil {
log.Error(errCreateOrUpdate, "failed to create or update virtual storageclass", "name", virtualSc.Name)
err = errors.Join(err, errCreateOrUpdate)
}
}
if err != nil {
return fmt.Errorf("failed to sync storageclasses: %w", err)
}
return nil
}
func (c *ClusterReconciler) server(ctx context.Context, cluster *v1beta1.Cluster, server *server.Server) error {
log := ctrl.LoggerFrom(ctx)
@@ -742,11 +943,6 @@ func (c *ClusterReconciler) validate(cluster *v1beta1.Cluster, policy v1beta1.Vi
}
}
// validate sync policy
if !equality.Semantic.DeepEqual(cluster.Spec.Sync, policy.Spec.Sync) {
return fmt.Errorf("sync configuration %v is not allowed by the policy %q", cluster.Spec.Sync, policy.Name)
}
return nil
}

View File

@@ -2,13 +2,17 @@ package policy
import (
"context"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
@@ -52,15 +56,36 @@ func (c *VirtualClusterPolicyReconciler) cleanupNamespaces(ctx context.Context)
}
for _, ns := range namespaces.Items {
selector := labels.NewSelector()
currentPolicyName := ns.Labels[PolicyNameLabelKey]
if req, err := labels.NewRequirement(ManagedByLabelKey, selection.Equals, []string{VirtualPolicyControllerName}); err == nil {
selector = selector.Add(*req)
}
// This will match all the resources managed by the K3k Policy controller
// that have the app.kubernetes.io/managed-by=k3k-policy-controller label
selector := labels.SelectorFromSet(labels.Set{
ManagedByLabelKey: VirtualPolicyControllerName,
})
// if the namespace is bound to a policy -> cleanup resources of other policies
if ns.Labels[PolicyNameLabelKey] != "" {
requirement, err := labels.NewRequirement(PolicyNameLabelKey, selection.NotEquals, []string{ns.Labels[PolicyNameLabelKey]})
// If the namespace is not bound to any policy, or if the policy it was bound to no longer exists,
// we need to clear policy-related fields on its Cluster objects.
if currentPolicyName == "" {
if err := c.clearPolicyFieldsForClustersInNamespace(ctx, ns.Name); err != nil {
log.Error(err, "error clearing policy fields for clusters in unbound namespace", "namespace", ns.Name)
}
} else {
var policy v1beta1.VirtualClusterPolicy
if err := c.Client.Get(ctx, types.NamespacedName{Name: currentPolicyName}, &policy); err != nil {
if apierrors.IsNotFound(err) {
if err := c.clearPolicyFieldsForClustersInNamespace(ctx, ns.Name); err != nil {
log.Error(err, "error clearing policy fields for clusters in namespace with non-existent policy", "namespace", ns.Name, "policy", currentPolicyName)
}
} else {
log.Error(err, "error getting policy for namespace", "namespace", ns.Name, "policy", currentPolicyName)
}
}
// if the namespace is bound to a policy -> cleanup resources of other policies
requirement, err := labels.NewRequirement(
PolicyNameLabelKey, selection.NotEquals, []string{currentPolicyName},
)
// log the error but continue cleaning up the other namespaces
if err != nil {
@@ -90,3 +115,30 @@ func (c *VirtualClusterPolicyReconciler) cleanupNamespaces(ctx context.Context)
return nil
}
// clearPolicyFieldsForClustersInNamespace sets the policy status on Cluster objects in the given namespace to nil.
func (c *VirtualClusterPolicyReconciler) clearPolicyFieldsForClustersInNamespace(ctx context.Context, namespace string) error {
log := ctrl.LoggerFrom(ctx)
var clusters v1beta1.ClusterList
if err := c.Client.List(ctx, &clusters, client.InNamespace(namespace)); err != nil {
return fmt.Errorf("failed listing clusters in namespace %s: %w", namespace, err)
}
var updateErrs []error
for i := range clusters.Items {
cluster := clusters.Items[i]
if cluster.Status.Policy != nil {
log.V(1).Info("Clearing policy status for Cluster", "cluster", cluster.Name, "namespace", namespace)
cluster.Status.Policy = nil
if updateErr := c.Client.Status().Update(ctx, &cluster); updateErr != nil {
updateErr = fmt.Errorf("failed updating Status for Cluster %s: %w", cluster.Name, updateErr)
updateErrs = append(updateErrs, updateErr)
}
}
}
return errors.Join(updateErrs...)
}

View File

@@ -470,16 +470,22 @@ func (c *VirtualClusterPolicyReconciler) reconcileClusters(ctx context.Context,
var clusterUpdateErrs []error
for _, cluster := range clusters.Items {
orig := cluster.DeepCopy()
origStatus := cluster.Status.DeepCopy()
cluster.Spec.PriorityClass = policy.Spec.DefaultPriorityClass
cluster.Spec.NodeSelector = policy.Spec.DefaultNodeSelector
cluster.Status.Policy = &v1beta1.AppliedPolicy{
Name: policy.Name,
PriorityClass: &policy.Spec.DefaultPriorityClass,
NodeSelector: policy.Spec.DefaultNodeSelector,
Sync: policy.Spec.Sync,
}
if !reflect.DeepEqual(orig, cluster) {
if !reflect.DeepEqual(origStatus, &cluster.Status) {
log.V(1).Info("Updating Cluster", "cluster", cluster.Name, "namespace", namespace.Name)
// continue updating also the other clusters even if an error occurred
clusterUpdateErrs = append(clusterUpdateErrs, c.Client.Update(ctx, &cluster))
if err := c.Client.Status().Update(ctx, &cluster); err != nil {
clusterUpdateErrs = append(clusterUpdateErrs, err)
}
}
}

View File

@@ -2,7 +2,6 @@ package policy_test
import (
"context"
"reflect"
"time"
"k8s.io/apimachinery/pkg/api/resource"
@@ -307,7 +306,7 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
Expect(ns.Labels).Should(HaveKeyWithValue("pod-security.kubernetes.io/enforce-version", "latest"))
})
It("should update Cluster's PriorityClass", func() {
It("updates the Cluster's policy status with the DefaultPriorityClass", func() {
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
DefaultPriorityClass: "foobar",
})
@@ -329,19 +328,22 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
err := k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))
// wait a bit
Eventually(func() bool {
Eventually(func(g Gomega) {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return cluster.Spec.PriorityClass == policy.Spec.DefaultPriorityClass
g.Expect(err).To(Not(HaveOccurred()))
g.Expect(cluster.Spec.PriorityClass).To(BeEmpty())
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
g.Expect(cluster.Status.Policy.PriorityClass).To(Not(BeNil()))
g.Expect(*cluster.Status.Policy.PriorityClass).To(Equal(policy.Spec.DefaultPriorityClass))
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
Should(Succeed())
})
It("should update Cluster's NodeSelector", func() {
It("updates the Cluster's policy status with the DefaultNodeSelector", func() {
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
})
@@ -366,18 +368,21 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
Expect(err).To(Not(HaveOccurred()))
// wait a bit
Eventually(func() bool {
Eventually(func(g Gomega) {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return reflect.DeepEqual(cluster.Spec.NodeSelector, policy.Spec.DefaultNodeSelector)
g.Expect(cluster.Spec.NodeSelector).To(BeEmpty())
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
g.Expect(cluster.Status.Policy.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
Should(Succeed())
})
It("should update the nodeSelector if changed", func() {
It("updates the Cluster's policy status when the VCP nodeSelector changes", func() {
policy := newPolicy(v1beta1.VirtualClusterPolicySpec{
DefaultNodeSelector: map[string]string{"label-1": "value-1"},
})
@@ -399,43 +404,56 @@ var _ = Describe("VirtualClusterPolicy Controller", Label("controller"), Label("
err := k8sClient.Create(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))
Expect(cluster.Spec.NodeSelector).To(Equal(policy.Spec.DefaultNodeSelector))
// Cluster Spec should not change, VCP NodeSelector should be present in the Status
Eventually(func(g Gomega) {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
g.Expect(cluster.Spec.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
g.Expect(cluster.Status.Policy.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(Succeed())
// update the VirtualClusterPolicy
policy.Spec.DefaultNodeSelector["label-2"] = "value-2"
err = k8sClient.Update(ctx, policy)
Expect(err).To(Not(HaveOccurred()))
Expect(cluster.Spec.NodeSelector).To(Not(Equal(policy.Spec.DefaultNodeSelector)))
// wait a bit
Eventually(func() bool {
Eventually(func(g Gomega) {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, cluster)
Expect(err).To(Not(HaveOccurred()))
return reflect.DeepEqual(cluster.Spec.NodeSelector, policy.Spec.DefaultNodeSelector)
g.Expect(cluster.Spec.NodeSelector).To(Equal(map[string]string{"label-1": "value-1"}))
g.Expect(cluster.Status.Policy).To(Not(BeNil()))
g.Expect(cluster.Status.Policy.NodeSelector).To(Equal(map[string]string{"label-1": "value-1", "label-2": "value-2"}))
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
Should(Succeed())
// Update the Cluster
err = k8sClient.Get(ctx, client.ObjectKeyFromObject(cluster), cluster)
Expect(err).To(Not(HaveOccurred()))
cluster.Spec.NodeSelector["label-3"] = "value-3"
err = k8sClient.Update(ctx, cluster)
Expect(err).To(Not(HaveOccurred()))
Expect(cluster.Spec.NodeSelector).To(Not(Equal(policy.Spec.DefaultNodeSelector)))
// wait a bit and check it's restored
Eventually(func() bool {
var updatedCluster v1beta1.Cluster
Consistently(func(g Gomega) {
key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}
err = k8sClient.Get(ctx, key, &updatedCluster)
Expect(err).To(Not(HaveOccurred()))
return reflect.DeepEqual(updatedCluster.Spec.NodeSelector, policy.Spec.DefaultNodeSelector)
err = k8sClient.Get(ctx, key, cluster)
g.Expect(err).To(Not(HaveOccurred()))
g.Expect(cluster.Spec.NodeSelector).To(Equal(map[string]string{"label-1": "value-1", "label-3": "value-3"}))
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(BeTrue())
Should(Succeed())
})
It("should create a ResourceQuota if Quota is enabled", func() {

View File

@@ -0,0 +1,199 @@
package k3k_test
import (
"context"
"time"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
"github.com/rancher/k3k/pkg/controller/cluster"
"github.com/rancher/k3k/pkg/controller/policy"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = When("a shared mode cluster is created in a namespace with a policy", Ordered, Label(e2eTestLabel), func() {
var (
ctx context.Context
virtualCluster *VirtualCluster
vcp *v1beta1.VirtualClusterPolicy
)
BeforeAll(func() {
ctx = context.Background()
// 1. Create StorageClasses in host
storageClassEnabled := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "sc-policy-enabled-",
Labels: map[string]string{
cluster.SyncEnabledLabelKey: "true",
},
},
Provisioner: "my-provisioner",
}
var err error
storageClassEnabled, err = k8s.StorageV1().StorageClasses().Create(ctx, storageClassEnabled, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
storageClassDisabled := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "sc-policy-disabled-",
Labels: map[string]string{
cluster.SyncEnabledLabelKey: "false",
},
},
Provisioner: "my-provisioner",
}
storageClassDisabled, err = k8s.StorageV1().StorageClasses().Create(ctx, storageClassDisabled, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
// 2. Create VirtualClusterPolicy with StorageClass sync enabled
vcp = &v1beta1.VirtualClusterPolicy{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "vcp-sync-sc-",
},
Spec: v1beta1.VirtualClusterPolicySpec{
Sync: &v1beta1.SyncConfig{
StorageClasses: v1beta1.StorageClassSyncConfig{
Enabled: true,
},
},
},
}
err = k8sClient.Create(ctx, vcp)
Expect(err).To(Not(HaveOccurred()))
// 3. Create Namespace with policy label
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "ns-vcp-",
Labels: map[string]string{
policy.PolicyNameLabelKey: vcp.Name,
},
},
}
// We use the k8s clientset for namespace creation to stay consistent with other tests
ns, err = k8s.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
// 4. Create VirtualCluster in that namespace
// The cluster doesn't have storage class sync enabled in its spec
clusterObj := NewCluster(ns.Name)
clusterObj.Spec.Sync = &v1beta1.SyncConfig{
StorageClasses: v1beta1.StorageClassSyncConfig{
Enabled: false,
},
}
clusterObj.Spec.Expose.NodePort.ServerPort = ptr.To[int32](30000)
CreateCluster(clusterObj)
client, restConfig, kubeconfig := NewVirtualK8sClientAndKubeconfig(clusterObj)
virtualCluster = &VirtualCluster{
Cluster: clusterObj,
RestConfig: restConfig,
Client: client,
Kubeconfig: kubeconfig,
}
DeferCleanup(func() {
DeleteNamespaces(ns.Name)
err = k8s.StorageV1().StorageClasses().Delete(ctx, storageClassEnabled.Name, metav1.DeleteOptions{})
Expect(err).To(Not(HaveOccurred()))
err = k8s.StorageV1().StorageClasses().Delete(ctx, storageClassDisabled.Name, metav1.DeleteOptions{})
Expect(err).To(Not(HaveOccurred()))
err = k8sClient.Delete(ctx, vcp)
Expect(err).To(Not(HaveOccurred()))
})
})
It("has the storage classes sync enabled from the policy", func() {
Eventually(func(g Gomega) {
key := client.ObjectKeyFromObject(virtualCluster.Cluster)
g.Expect(k8sClient.Get(ctx, key, virtualCluster.Cluster)).To(Succeed())
g.Expect(virtualCluster.Cluster.Status.Policy).To(Not(BeNil()))
g.Expect(virtualCluster.Cluster.Status.Policy.Sync).To(Not(BeNil()))
g.Expect(virtualCluster.Cluster.Status.Policy.Sync.StorageClasses.Enabled).To(BeTrue())
}).
WithTimeout(time.Second * 30).
WithPolling(time.Second).
Should(Succeed())
})
It("will sync host storage classes with the sync enabled in the host", func() {
Eventually(func(g Gomega) {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
g.Expect(err).To(Not(HaveOccurred()))
for _, hostSC := range hostStorageClasses.Items {
// We only care about the storage classes we created for this test to avoid noise
if hostSC.Labels[cluster.SyncEnabledLabelKey] == "true" {
_, err := virtualCluster.Client.StorageV1().StorageClasses().Get(ctx, hostSC.Name, metav1.GetOptions{})
g.Expect(err).To(Not(HaveOccurred()))
}
}
}).
WithPolling(time.Second).
WithTimeout(time.Second * 60).
Should(Succeed())
})
It("will not sync host storage classes with the sync disabled in the host", func() {
Eventually(func(g Gomega) {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
g.Expect(err).To(Not(HaveOccurred()))
for _, hostSC := range hostStorageClasses.Items {
if hostSC.Labels[cluster.SyncEnabledLabelKey] == "false" {
_, err := virtualCluster.Client.StorageV1().StorageClasses().Get(ctx, hostSC.Name, metav1.GetOptions{})
g.Expect(err).To(HaveOccurred())
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())
}
}
}).
WithPolling(time.Second).
WithTimeout(time.Second * 60).
Should(Succeed())
})
When("disabling the storage class sync in the policy", Ordered, func() {
BeforeAll(func() {
original := vcp.DeepCopy()
vcp.Spec.Sync.StorageClasses.Enabled = false
err := k8sClient.Patch(ctx, vcp, client.MergeFrom(original))
Expect(err).To(Not(HaveOccurred()))
})
It("will remove the synced storage classes from the virtual cluster", func() {
Eventually(func(g Gomega) {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
g.Expect(err).To(Not(HaveOccurred()))
for _, hostSC := range hostStorageClasses.Items {
if hostSC.Labels[cluster.SyncEnabledLabelKey] == "true" {
_, err := virtualCluster.Client.StorageV1().StorageClasses().Get(ctx, hostSC.Name, metav1.GetOptions{})
g.Expect(err).To(HaveOccurred())
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())
}
}
}).
WithPolling(time.Second).
WithTimeout(time.Second * 60).
Should(Succeed())
})
})
})

View File

@@ -8,6 +8,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
corev1 "k8s.io/api/core/v1"
schedv1 "k8s.io/api/scheduling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
@@ -99,6 +100,100 @@ var _ = When("a cluster's status is tracked", Label(e2eTestLabel), Label(statusT
WithPolling(time.Second * 5).
Should(Succeed())
})
It("created with field controlled from a policy", func() {
ctx := context.Background()
priorityClass := &schedv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pc-",
},
Value: 100,
}
Expect(k8sClient.Create(ctx, priorityClass)).To(Succeed())
clusterObj := &v1beta1.Cluster{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "status-cluster-",
Namespace: namespace.Name,
},
Spec: v1beta1.ClusterSpec{
PriorityClass: priorityClass.Name,
},
}
Expect(k8sClient.Create(ctx, clusterObj)).To(Succeed())
DeferCleanup(func() {
Expect(k8sClient.Delete(ctx, priorityClass)).To(Succeed())
})
clusterKey := client.ObjectKeyFromObject(clusterObj)
// Check for the initial status to be set
Eventually(func(g Gomega) {
err := k8sClient.Get(ctx, clusterKey, clusterObj)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(clusterObj.Status.Phase).To(Equal(v1beta1.ClusterProvisioning))
cond := meta.FindStatusCondition(clusterObj.Status.Conditions, cluster.ConditionReady)
g.Expect(cond).NotTo(BeNil())
g.Expect(cond.Status).To(Equal(metav1.ConditionFalse))
g.Expect(cond.Reason).To(Equal(cluster.ReasonProvisioning))
}).
WithPolling(time.Second * 2).
WithTimeout(time.Second * 20).
Should(Succeed())
// Check for the status to be updated to Ready
Eventually(func(g Gomega) {
err := k8sClient.Get(ctx, clusterKey, clusterObj)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(clusterObj.Status.Phase).To(Equal(v1beta1.ClusterReady))
g.Expect(clusterObj.Status.Policy).To(Not(BeNil()))
g.Expect(clusterObj.Status.Policy.Name).To(Equal(vcp.Name))
cond := meta.FindStatusCondition(clusterObj.Status.Conditions, cluster.ConditionReady)
g.Expect(cond).NotTo(BeNil())
g.Expect(cond.Status).To(Equal(metav1.ConditionTrue))
g.Expect(cond.Reason).To(Equal(cluster.ReasonProvisioned))
}).
WithTimeout(time.Minute * 3).
WithPolling(time.Second * 5).
Should(Succeed())
// update policy
priorityClassVCP := &schedv1.PriorityClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pc-",
},
Value: 100,
}
Expect(k8sClient.Create(ctx, priorityClassVCP)).To(Succeed())
DeferCleanup(func() {
Expect(k8sClient.Delete(ctx, priorityClassVCP)).To(Succeed())
})
vcp.Spec.DefaultPriorityClass = priorityClassVCP.Name
Expect(k8sClient.Update(ctx, vcp)).To(Succeed())
// Check for the status to be updated to Ready
Eventually(func(g Gomega) {
err := k8sClient.Get(ctx, clusterKey, clusterObj)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(clusterObj.Status.Policy).To(Not(BeNil()))
g.Expect(clusterObj.Status.Policy.PriorityClass).To(Not(BeNil()))
g.Expect(*clusterObj.Status.Policy.PriorityClass).To(Equal(priorityClassVCP.Name))
g.Expect(clusterObj.Spec.PriorityClass).To(Equal(priorityClass.Name))
}).
WithTimeout(time.Minute * 3).
WithPolling(time.Second * 5).
Should(Succeed())
})
})
Context("and the cluster has validation errors", func() {

View File

@@ -0,0 +1,194 @@
package k3k_test
import (
"context"
"time"
"sigs.k8s.io/controller-runtime/pkg/client"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/rancher/k3k/pkg/controller/cluster"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = When("a shared mode cluster is created", Ordered, Label(e2eTestLabel), func() {
var (
ctx context.Context
virtualCluster *VirtualCluster
)
BeforeAll(func() {
ctx = context.Background()
virtualCluster = NewVirtualCluster()
storageClassEnabled := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "sc-",
Labels: map[string]string{
cluster.SyncEnabledLabelKey: "true",
},
},
Provisioner: "my-provisioner",
}
storageClassEnabled, err := k8s.StorageV1().StorageClasses().Create(ctx, storageClassEnabled, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
storageClassDisabled := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "sc-",
Labels: map[string]string{
cluster.SyncEnabledLabelKey: "false",
},
},
Provisioner: "my-provisioner",
}
storageClassDisabled, err = k8s.StorageV1().StorageClasses().Create(ctx, storageClassDisabled, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
DeferCleanup(func() {
DeleteNamespaces(virtualCluster.Cluster.Namespace)
err = k8s.StorageV1().StorageClasses().Delete(ctx, storageClassEnabled.Name, metav1.DeleteOptions{})
Expect(err).To(Not(HaveOccurred()))
err = k8s.StorageV1().StorageClasses().Delete(ctx, storageClassDisabled.Name, metav1.DeleteOptions{})
Expect(err).To(Not(HaveOccurred()))
})
})
It("has disabled the storage classes sync", func() {
Expect(virtualCluster.Cluster.Spec.Sync).To(Not(BeNil()))
Expect(virtualCluster.Cluster.Spec.Sync.StorageClasses.Enabled).To(BeFalse())
})
It("doesn't have storage classes", func() {
virtualStorageClasses, err := virtualCluster.Client.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
Expect(virtualStorageClasses.Items).To(HaveLen(0))
})
It("has some storage classes in the host", func() {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
Expect(hostStorageClasses.Items).To(Not(HaveLen(0)))
})
It("can create storage classes in the virtual cluster", func() {
storageClass := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "sc-",
},
Provisioner: "my-provisioner",
}
storageClass, err := virtualCluster.Client.StorageV1().StorageClasses().Create(ctx, storageClass, metav1.CreateOptions{})
Expect(err).To(Not(HaveOccurred()))
virtualStorageClasses, err := virtualCluster.Client.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
Expect(virtualStorageClasses.Items).To(HaveLen(1))
Expect(virtualStorageClasses.Items[0].Name).To(Equal(storageClass.Name))
})
When("enabling the storage class sync", Ordered, func() {
BeforeAll(func() {
GinkgoWriter.Println("Enabling the storage class sync")
original := virtualCluster.Cluster.DeepCopy()
virtualCluster.Cluster.Spec.Sync.StorageClasses.Enabled = true
err := k8sClient.Patch(ctx, virtualCluster.Cluster, client.MergeFrom(original))
Expect(err).To(Not(HaveOccurred()))
Eventually(func(g Gomega) {
key := client.ObjectKeyFromObject(virtualCluster.Cluster)
g.Expect(k8sClient.Get(ctx, key, virtualCluster.Cluster)).To(Succeed())
g.Expect(virtualCluster.Cluster.Spec.Sync.StorageClasses.Enabled).To(BeTrue())
}).
WithTimeout(time.Second * 10).
WithPolling(time.Second).
Should(Succeed())
})
It("will sync host storage classes with the sync enabled", func() {
Eventually(func(g Gomega) {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
for _, hostSC := range hostStorageClasses.Items {
_, err := virtualCluster.Client.StorageV1().StorageClasses().Get(ctx, hostSC.Name, metav1.GetOptions{})
if syncEnabled, found := hostSC.Labels[cluster.SyncEnabledLabelKey]; !found || syncEnabled == "true" {
g.Expect(err).To(Not(HaveOccurred()))
}
}
}).
MustPassRepeatedly(5).
WithPolling(time.Second).
WithTimeout(time.Second * 30).
Should(Succeed())
})
It("will not sync host storage classes with the sync disabled", func() {
Eventually(func(g Gomega) {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
for _, hostSC := range hostStorageClasses.Items {
_, err := virtualCluster.Client.StorageV1().StorageClasses().Get(ctx, hostSC.Name, metav1.GetOptions{})
if hostSC.Labels[cluster.SyncEnabledLabelKey] == "false" {
g.Expect(err).To(HaveOccurred())
g.Expect(apierrors.IsNotFound(err)).To(BeTrue())
}
}
}).
MustPassRepeatedly(5).
WithPolling(time.Second).
WithTimeout(time.Second * 30).
Should(Succeed())
})
})
When("editing a synced storage class in the host cluster", Ordered, func() {
var syncedStorageClass *storagev1.StorageClass
BeforeAll(func() {
hostStorageClasses, err := k8s.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
Expect(err).To(Not(HaveOccurred()))
for _, hostSC := range hostStorageClasses.Items {
if syncEnabled, found := hostSC.Labels[cluster.SyncEnabledLabelKey]; !found || syncEnabled == "true" {
syncedStorageClass = &hostSC
break
}
}
Expect(syncedStorageClass).To(Not(BeNil()))
syncedStorageClass.Labels["foo"] = "bar"
_, err = k8s.StorageV1().StorageClasses().Update(ctx, syncedStorageClass, metav1.UpdateOptions{})
Expect(err).To(Not(HaveOccurred()))
})
It("will update the synced storage class in the virtual cluster", func() {
Eventually(func(g Gomega) {
_, err := virtualCluster.Client.StorageV1().StorageClasses().Get(ctx, syncedStorageClass.Name, metav1.GetOptions{})
g.Expect(err).To(Not(HaveOccurred()))
g.Expect(syncedStorageClass.Labels).Should(HaveKeyWithValue("foo", "bar"))
}).
MustPassRepeatedly(5).
WithPolling(time.Second).
WithTimeout(time.Second * 30).
Should(Succeed())
})
})
})

View File

@@ -37,6 +37,7 @@ import (
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1beta1"
@@ -77,7 +78,6 @@ var (
k8s *kubernetes.Clientset
k8sClient client.Client
kubeconfigPath string
repo string
helmActionConfig *action.Configuration
)
@@ -86,17 +86,17 @@ var _ = BeforeSuite(func() {
GinkgoWriter.Println("GOCOVERDIR:", os.Getenv("GOCOVERDIR"))
repo = os.Getenv("REPO")
if repo == "" {
repo = "rancher"
}
_, dockerInstallEnabled := os.LookupEnv("K3K_DOCKER_INSTALL")
if dockerInstallEnabled {
installK3SDocker(ctx)
repo := os.Getenv("REPO")
if repo == "" {
repo = "rancher"
}
installK3SDocker(ctx, repo+"/k3k", repo+"/k3k-kubelet")
initKubernetesClient(ctx)
installK3kChart()
installK3kChart(repo+"/k3k", repo+"/k3k-kubelet")
} else {
initKubernetesClient(ctx)
}
@@ -110,6 +110,11 @@ func initKubernetesClient(ctx context.Context) {
kubeconfig []byte
)
logger, err := zap.NewDevelopment()
Expect(err).NotTo(HaveOccurred())
log.SetLogger(zapr.NewLogger(logger))
kubeconfigPath := os.Getenv("KUBECONFIG")
Expect(kubeconfigPath).To(Not(BeEmpty()))
@@ -128,21 +133,12 @@ func initKubernetesClient(ctx context.Context) {
scheme := buildScheme()
k8sClient, err = client.New(restcfg, client.Options{Scheme: scheme})
Expect(err).NotTo(HaveOccurred())
logger, err := zap.NewDevelopment()
Expect(err).NotTo(HaveOccurred())
log.SetLogger(zapr.NewLogger(logger))
}
func buildScheme() *runtime.Scheme {
scheme := runtime.NewScheme()
err := v1.AddToScheme(scheme)
Expect(err).NotTo(HaveOccurred())
err = appsv1.AddToScheme(scheme)
Expect(err).NotTo(HaveOccurred())
err = networkingv1.AddToScheme(scheme)
err := clientgoscheme.AddToScheme(scheme)
Expect(err).NotTo(HaveOccurred())
err = v1beta1.AddToScheme(scheme)
Expect(err).NotTo(HaveOccurred())
@@ -150,7 +146,7 @@ func buildScheme() *runtime.Scheme {
return scheme
}
func installK3SDocker(ctx context.Context) {
func installK3SDocker(ctx context.Context, controllerImage, kubeletImage string) {
var (
err error
kubeconfig []byte
@@ -182,16 +178,15 @@ func installK3SDocker(ctx context.Context) {
Expect(tmpFile.Close()).To(Succeed())
kubeconfigPath = tmpFile.Name()
err = k3sContainer.LoadImages(ctx, repo+"/k3k:dev", repo+"/k3k-kubelet:dev")
err = k3sContainer.LoadImages(ctx, controllerImage+":dev", kubeletImage+":dev")
Expect(err).To(Not(HaveOccurred()))
DeferCleanup(os.Remove, kubeconfigPath)
Expect(os.Setenv("KUBECONFIG", kubeconfigPath)).To(Succeed())
GinkgoWriter.Print(kubeconfigPath)
GinkgoWriter.Print(string(kubeconfig))
GinkgoWriter.Printf("KUBECONFIG set to: %s\n", kubeconfigPath)
}
func installK3kChart() {
func installK3kChart(controllerImage, kubeletImage string) {
pwd, err := os.Getwd()
Expect(err).To(Not(HaveOccurred()))
@@ -207,7 +202,7 @@ func installK3kChart() {
Expect(err).To(Not(HaveOccurred()))
err = helmActionConfig.Init(restClientGetter, k3kNamespace, os.Getenv("HELM_DRIVER"), func(format string, v ...any) {
GinkgoWriter.Printf("helm debug: "+format+"\n", v...)
GinkgoWriter.Printf("[Helm] "+format+"\n", v...)
})
Expect(err).To(Not(HaveOccurred()))
@@ -219,9 +214,17 @@ func installK3kChart() {
iCli.Wait = true
controllerMap, _ := k3kChart.Values["controller"].(map[string]any)
extraEnvArray, _ := controllerMap["extraEnv"].([]map[string]any)
extraEnvArray = append(extraEnvArray, map[string]any{
"name": "DEBUG",
"value": "true",
})
controllerMap["extraEnv"] = extraEnvArray
imageMap, _ := controllerMap["image"].(map[string]any)
maps.Copy(imageMap, map[string]any{
"repository": repo + "/k3k",
"repository": controllerImage,
"tag": "dev",
"pullPolicy": "IfNotPresent",
})
@@ -230,14 +233,14 @@ func installK3kChart() {
sharedAgentMap, _ := agentMap["shared"].(map[string]any)
sharedAgentImageMap, _ := sharedAgentMap["image"].(map[string]any)
maps.Copy(sharedAgentImageMap, map[string]any{
"repository": repo + "/k3k-kubelet",
"repository": kubeletImage,
"tag": "dev",
})
release, err := iCli.Run(k3kChart, k3kChart.Values)
Expect(err).To(Not(HaveOccurred()))
GinkgoWriter.Printf("Release %s installed in %s namespace\n", release.Name, release.Namespace)
GinkgoWriter.Printf("Helm release '%s' installed in '%s' namespace\n", release.Name, release.Namespace)
}
func patchPVC(ctx context.Context, clientset *kubernetes.Clientset) {
@@ -300,36 +303,28 @@ func patchPVC(ctx context.Context, clientset *kubernetes.Clientset) {
_, err = clientset.AppsV1().Deployments(k3kNamespace).Update(ctx, k3kDeployment, metav1.UpdateOptions{})
Expect(err).To(Not(HaveOccurred()))
Eventually(func() bool {
Eventually(func(g Gomega) {
GinkgoWriter.Println("Checking K3k deployment status")
dep, err := clientset.AppsV1().Deployments(k3kNamespace).Get(ctx, k3kDeployment.Name, metav1.GetOptions{})
Expect(err).To(Not(HaveOccurred()))
g.Expect(err).To(Not(HaveOccurred()))
g.Expect(dep.Generation).To(Equal(dep.Status.ObservedGeneration))
// 1. Check if the controller has observed the latest generation
if dep.Generation > dep.Status.ObservedGeneration {
GinkgoWriter.Printf("K3k deployment generation: %d, observed generation: %d\n", dep.Generation, dep.Status.ObservedGeneration)
return false
var availableCond appsv1.DeploymentCondition
for _, cond := range dep.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
availableCond = cond
break
}
}
// 2. Check if all replicas have been updated
if dep.Spec.Replicas != nil && dep.Status.UpdatedReplicas < *dep.Spec.Replicas {
GinkgoWriter.Printf("K3k deployment replicas: %d, updated replicas: %d\n", *dep.Spec.Replicas, dep.Status.UpdatedReplicas)
return false
}
// 3. Check if all updated replicas are available
if dep.Status.AvailableReplicas < dep.Status.UpdatedReplicas {
GinkgoWriter.Printf("K3k deployment available replicas: %d, updated replicas: %d\n", dep.Status.AvailableReplicas, dep.Status.UpdatedReplicas)
return false
}
return true
g.Expect(availableCond.Type).To(Equal(appsv1.DeploymentAvailable))
g.Expect(availableCond.Status).To(Equal(v1.ConditionTrue))
}).
MustPassRepeatedly(5).
WithPolling(time.Second).
WithTimeout(time.Second * 30).
Should(BeTrue())
Should(Succeed())
}
var _ = AfterSuite(func() {