feat: extend Gateway API support to Konnectivity addons (#1054)

This change extends Gateway API support to Konnectivity addons.
When `spec.controlPlane.gateway` is configured and Konnectivity addon is
enabled, Kamaji automatically creates two TLSRoutes:
1. A Control plane TLSRoute (port 6443, sectionName "kube-apiserver")
2. A Konnectivity TLSRoute (port 8132, sectionName "konnectivity-server")

Both routes use the hostname specified in `gateway.hostname` and reference
the same Gateway resource via `parentRefs`, with `port` and `sectionName`
set automatically by Kamaji.

This patch also adds CEL validation to prevent users from specifying
`port` or `sectionName` in Gateway `parentRefs`, as these fields are now
managed automatically by Kamaji.

Signed-off-by: Parth Yadav <parth@coredge.io>
This commit is contained in:
Parth Yadav
2026-01-11 16:01:24 +05:30
committed by GitHub
parent b6b4888177
commit 87242ff005
22 changed files with 2711 additions and 199 deletions

View File

@@ -6,14 +6,10 @@ package resources
import (
"context"
"fmt"
"net/url"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -56,66 +52,12 @@ func (r *KubernetesGatewayResource) gatewayStatusNeedsUpdate(tcp *kamajiv1alpha1
currentStatus := tcp.Status.Kubernetes.Gateway
// Check if route reference has changed
if currentStatus.RouteRef.Name != r.resource.Name {
if currentStatus != nil && currentStatus.RouteRef.Name != r.resource.Name {
return true
}
// Compare RouteStatus - check if number of parents changed
if len(currentStatus.RouteStatus.Parents) != len(r.resource.Status.RouteStatus.Parents) {
return true
}
// Compare individual parent statuses
// NOTE: Multiple Parent References are assumed.
for i, currentParent := range currentStatus.RouteStatus.Parents {
if i >= len(r.resource.Status.RouteStatus.Parents) {
return true
}
resourceParent := r.resource.Status.RouteStatus.Parents[i]
// Compare parent references
if currentParent.ParentRef.Name != resourceParent.ParentRef.Name ||
(currentParent.ParentRef.Namespace == nil) != (resourceParent.ParentRef.Namespace == nil) ||
(currentParent.ParentRef.Namespace != nil && resourceParent.ParentRef.Namespace != nil &&
*currentParent.ParentRef.Namespace != *resourceParent.ParentRef.Namespace) ||
(currentParent.ParentRef.SectionName == nil) != (resourceParent.ParentRef.SectionName == nil) ||
(currentParent.ParentRef.SectionName != nil && resourceParent.ParentRef.SectionName != nil &&
*currentParent.ParentRef.SectionName != *resourceParent.ParentRef.SectionName) {
return true
}
if len(currentParent.Conditions) != len(resourceParent.Conditions) {
return true
}
// Compare each condition
for j, currentCondition := range currentParent.Conditions {
if j >= len(resourceParent.Conditions) {
return true
}
resourceCondition := resourceParent.Conditions[j]
if currentCondition.Type != resourceCondition.Type ||
currentCondition.Status != resourceCondition.Status ||
currentCondition.Reason != resourceCondition.Reason ||
currentCondition.Message != resourceCondition.Message ||
!currentCondition.LastTransitionTime.Equal(&resourceCondition.LastTransitionTime) {
return true
}
}
}
// Since access points are derived from route status and gateway conditions,
// and we've already compared the route status above, we can assume that
// if the route status hasn't changed, the access points calculation
// will produce the same result. This avoids the need for complex
// gateway fetching in the status comparison.
//
// If there are edge cases where gateway state changes but route status doesn't,
// those will be caught in the next reconciliation cycle anyway.
return false
return IsGatewayRouteStatusChanged(currentStatus, r.resource.Status.RouteStatus)
}
func (r *KubernetesGatewayResource) ShouldCleanup(tcp *kamajiv1alpha1.TenantControlPlane) bool {
@@ -125,95 +67,18 @@ func (r *KubernetesGatewayResource) ShouldCleanup(tcp *kamajiv1alpha1.TenantCont
func (r *KubernetesGatewayResource) CleanUp(ctx context.Context, tcp *kamajiv1alpha1.TenantControlPlane) (bool, error) {
logger := log.FromContext(ctx, "resource", r.GetName())
route := gatewayv1alpha2.TLSRoute{}
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: r.resource.GetNamespace(),
Name: r.resource.GetName(),
}, &route); err != nil {
if !k8serrors.IsNotFound(err) {
logger.Error(err, "failed to get TLSRoute before cleanup")
cleaned, err := CleanupTLSRoute(ctx, r.Client, r.resource.GetName(), r.resource.GetNamespace(), tcp)
if err != nil {
logger.Error(err, "failed to cleanup tcp route")
return false, err
}
return false, nil
return false, err
}
if !metav1.IsControlledBy(&route, tcp) {
logger.Info("skipping cleanup: route is not managed by Kamaji", "name", route.Name, "namespace", route.Namespace)
return false, nil
if cleaned {
logger.V(1).Info("tcp route cleaned up successfully")
}
if err := r.Client.Delete(ctx, &route); err != nil {
if !k8serrors.IsNotFound(err) {
// TODO: Is that an error? Wanted to delete the resource anyways.
logger.Error(err, "cannot cleanup tcp route")
return false, err
}
return false, nil
}
logger.V(1).Info("tcp route cleaned up successfully")
return true, nil
}
// fetchGatewayByListener uses the indexer to efficiently find a gateway with a specific listener.
// This avoids the need to iterate through all listeners in a gateway.
func (r *KubernetesGatewayResource) fetchGatewayByListener(ctx context.Context, ref gatewayv1.ParentReference) (*gatewayv1.Gateway, error) {
if ref.Namespace == nil {
return nil, fmt.Errorf("missing namespace")
}
if ref.SectionName == nil {
return nil, fmt.Errorf("missing sectionName")
}
// Build the composite key that matches our indexer format: namespace/gatewayName/listenerName
listenerKey := fmt.Sprintf("%s/%s/%s", *ref.Namespace, ref.Name, *ref.SectionName)
// Query gateways using the indexer
gatewayList := &gatewayv1.GatewayList{}
if err := r.Client.List(ctx, gatewayList, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(kamajiv1alpha1.GatewayListenerNameKey, listenerKey),
}); err != nil {
return nil, fmt.Errorf("failed to list gateways by listener: %w", err)
}
if len(gatewayList.Items) == 0 {
return nil, fmt.Errorf("no gateway found with listener '%s'", *ref.SectionName)
}
// Since we're using a composite key with namespace/name/listener, we should get exactly one result
if len(gatewayList.Items) > 1 {
return nil, fmt.Errorf("found multiple gateways with listener '%s', expected exactly one", *ref.SectionName)
}
return &gatewayList.Items[0], nil
}
func FindMatchingListener(listeners []gatewayv1.Listener, ref gatewayv1.ParentReference) (gatewayv1.Listener, error) {
if ref.SectionName == nil {
return gatewayv1.Listener{}, fmt.Errorf("missing sectionName")
}
name := *ref.SectionName
for _, listener := range listeners {
if listener.Name == name {
return listener, nil
}
}
// TODO: Handle the cases according to the spec:
// - When both Port (experimental) and SectionName are
// specified, the name and port of the selected listener
// must match both specified values.
// - When unspecified (empty string) this will reference
// the entire resource [...] an attachment is considered
// successful if at least one section in the parent resource accepts it
return gatewayv1.Listener{}, fmt.Errorf("could not find listener '%s'", name)
return cleaned, nil
}
func (r *KubernetesGatewayResource) UpdateTenantControlPlaneStatus(ctx context.Context, tcp *kamajiv1alpha1.TenantControlPlane) error {
@@ -251,53 +116,9 @@ func (r *KubernetesGatewayResource) UpdateTenantControlPlaneStatus(ctx context.C
}
logger.V(1).Info("updating TenantControlPlane status for Gateway routes")
accessPoints := []kamajiv1alpha1.GatewayAccessPoint{}
for _, routeStatus := range routeStatuses.Parents {
routeAccepted := meta.IsStatusConditionTrue(
routeStatus.Conditions,
string(gatewayv1.RouteConditionAccepted),
)
if !routeAccepted {
continue
}
// Use the indexer to efficiently find the gateway with the specific listener
gateway, err := r.fetchGatewayByListener(ctx, routeStatus.ParentRef)
if err != nil {
return fmt.Errorf("could not fetch gateway with listener '%v': %w",
routeStatus.ParentRef.SectionName, err)
}
gatewayProgrammed := meta.IsStatusConditionTrue(
gateway.Status.Conditions,
string(gatewayv1.GatewayConditionProgrammed),
)
if !gatewayProgrammed {
continue
}
// Since we fetched the gateway using the indexer, we know the listener exists
// but we still need to get its details from the gateway spec
listener, err := FindMatchingListener(
gateway.Spec.Listeners, routeStatus.ParentRef,
)
if err != nil {
return fmt.Errorf("failed to match listener: %w", err)
}
for _, hostname := range r.resource.Spec.Hostnames {
rawURL := fmt.Sprintf("https://%s:%d", hostname, listener.Port)
url, err := url.Parse(rawURL)
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}
hostnameAddressType := gatewayv1.HostnameAddressType
accessPoints = append(accessPoints, kamajiv1alpha1.GatewayAccessPoint{
Type: &hostnameAddressType,
Value: url.String(),
Port: listener.Port,
})
}
accessPoints, err := BuildGatewayAccessPointsStatus(ctx, r.Client, r.resource, routeStatuses)
if err != nil {
return err
}
tcp.Status.Kubernetes.Gateway.AccessPoints = accessPoints
@@ -329,10 +150,6 @@ func (r *KubernetesGatewayResource) mutate(tcp *kamajiv1alpha1.TenantControlPlan
tcp.Spec.ControlPlane.Gateway.AdditionalMetadata.Annotations)
r.resource.SetAnnotations(annotations)
if tcp.Spec.ControlPlane.Gateway.GatewayParentRefs != nil {
r.resource.Spec.ParentRefs = tcp.Spec.ControlPlane.Gateway.GatewayParentRefs
}
serviceName := gatewayv1alpha2.ObjectName(tcp.Status.Kubernetes.Service.Name)
servicePort := tcp.Status.Kubernetes.Service.Port
@@ -340,6 +157,11 @@ func (r *KubernetesGatewayResource) mutate(tcp *kamajiv1alpha1.TenantControlPlan
return fmt.Errorf("service not ready, cannot create TLSRoute")
}
if tcp.Spec.ControlPlane.Gateway.GatewayParentRefs != nil {
// Copy parentRefs and explicitly set port and sectionName fields
r.resource.Spec.ParentRefs = NewParentRefsSpecWithPortAndSection(tcp.Spec.ControlPlane.Gateway.GatewayParentRefs, servicePort, "kube-apiserver")
}
rule := gatewayv1alpha2.TLSRouteRule{
BackendRefs: []gatewayv1alpha2.BackendRef{
{

View File

@@ -12,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
@@ -100,6 +101,62 @@ var _ = Describe("KubernetesGatewayResource", func() {
shouldUpdate := resource.ShouldStatusBeUpdated(ctx, tcp)
Expect(shouldUpdate).To(BeTrue())
})
It("should set port and sectionName in parentRefs, overriding any user-provided values", func() {
customPort := gatewayv1.PortNumber(9999)
customSectionName := gatewayv1.SectionName("custom")
tcp.Spec.ControlPlane.Gateway.GatewayParentRefs[0].Port = &customPort
tcp.Spec.ControlPlane.Gateway.GatewayParentRefs[0].SectionName = &customSectionName
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
route := &gatewayv1alpha2.TLSRoute{}
err = resource.Client.Get(ctx, client.ObjectKey{Name: tcp.Name, Namespace: tcp.Namespace}, route)
Expect(err).NotTo(HaveOccurred())
Expect(route.Spec.ParentRefs).To(HaveLen(1))
Expect(route.Spec.ParentRefs[0].Port).NotTo(BeNil())
Expect(*route.Spec.ParentRefs[0].Port).To(Equal(tcp.Status.Kubernetes.Service.Port))
Expect(*route.Spec.ParentRefs[0].Port).NotTo(Equal(customPort))
Expect(route.Spec.ParentRefs[0].SectionName).NotTo(BeNil())
Expect(*route.Spec.ParentRefs[0].SectionName).To(Equal(gatewayv1.SectionName("kube-apiserver")))
Expect(*route.Spec.ParentRefs[0].SectionName).NotTo(Equal(customSectionName))
})
It("should handle multiple parentRefs correctly", func() {
namespace := gatewayv1.Namespace("default")
tcp.Spec.ControlPlane.Gateway.GatewayParentRefs = []gatewayv1alpha2.ParentReference{
{
Name: "test-gateway-1",
Namespace: &namespace,
},
{
Name: "test-gateway-2",
Namespace: &namespace,
},
}
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
route := &gatewayv1alpha2.TLSRoute{}
err = resource.Client.Get(ctx, client.ObjectKey{Name: tcp.Name, Namespace: tcp.Namespace}, route)
Expect(err).NotTo(HaveOccurred())
Expect(route.Spec.ParentRefs).To(HaveLen(2))
for i := range route.Spec.ParentRefs {
Expect(route.Spec.ParentRefs[i].Port).NotTo(BeNil())
Expect(*route.Spec.ParentRefs[i].Port).To(Equal(tcp.Status.Kubernetes.Service.Port))
Expect(route.Spec.ParentRefs[i].SectionName).NotTo(BeNil())
Expect(*route.Spec.ParentRefs[i].SectionName).To(Equal(gatewayv1.SectionName("kube-apiserver")))
}
})
})
Context("When GatewayRoutes is not configured", func() {
@@ -235,4 +292,81 @@ var _ = Describe("KubernetesGatewayResource", func() {
Expect(listener.Port).To(Equal(gatewayv1.PortNumber(80)))
})
})
Describe("NewParentRefsSpecWithPortAndSection", func() {
var (
parentRefs []gatewayv1.ParentReference
testPort int32
testSectionName string
)
BeforeEach(func() {
namespace := gatewayv1.Namespace("default")
namespace2 := gatewayv1.Namespace("other")
testPort = int32(6443)
testSectionName = "kube-apiserver"
originalPort := gatewayv1.PortNumber(9999)
originalSectionName := gatewayv1.SectionName("original")
parentRefs = []gatewayv1.ParentReference{
{
Name: "test-gateway-1",
Namespace: &namespace,
Port: &originalPort,
SectionName: &originalSectionName,
},
{
Name: "test-gateway-2",
Namespace: &namespace2,
},
}
})
It("should create copy of parentRefs with port and sectionName set", func() {
result := resources.NewParentRefsSpecWithPortAndSection(parentRefs, testPort, testSectionName)
Expect(result).To(HaveLen(2))
for i := range result {
Expect(result[i].Name).To(Equal(parentRefs[i].Name))
Expect(result[i].Namespace).To(Equal(parentRefs[i].Namespace))
Expect(result[i].Port).NotTo(BeNil())
Expect(*result[i].Port).To(Equal(testPort))
Expect(result[i].SectionName).NotTo(BeNil())
Expect(*result[i].SectionName).To(Equal(gatewayv1.SectionName(testSectionName)))
}
})
It("should not modify original parentRefs", func() {
// Store original values for verification
originalFirstPort := parentRefs[0].Port
originalFirstSectionName := parentRefs[0].SectionName
originalSecondPort := parentRefs[1].Port
originalSecondSectionName := parentRefs[1].SectionName
result := resources.NewParentRefsSpecWithPortAndSection(parentRefs, testPort, testSectionName)
// Original should remain unchanged
Expect(parentRefs[0].Port).To(Equal(originalFirstPort))
Expect(parentRefs[0].SectionName).To(Equal(originalFirstSectionName))
Expect(parentRefs[1].Port).To(Equal(originalSecondPort))
Expect(parentRefs[1].SectionName).To(Equal(originalSecondSectionName))
// Result should have new values
Expect(result[0].Port).NotTo(BeNil())
Expect(*result[0].Port).To(Equal(testPort))
Expect(result[0].SectionName).NotTo(BeNil())
Expect(*result[0].SectionName).To(Equal(gatewayv1.SectionName(testSectionName)))
Expect(result[1].Port).NotTo(BeNil())
Expect(*result[1].Port).To(Equal(testPort))
Expect(result[1].SectionName).NotTo(BeNil())
Expect(*result[1].SectionName).To(Equal(gatewayv1.SectionName(testSectionName)))
})
It("should handle empty parentRefs slice", func() {
parentRefs = []gatewayv1.ParentReference{}
result := resources.NewParentRefsSpecWithPortAndSection(parentRefs, testPort, testSectionName)
Expect(result).To(BeEmpty())
})
})
})

View File

@@ -0,0 +1,241 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"fmt"
"net/url"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
)
// fetchGatewayByListener uses the indexer to efficiently find a gateway with a specific listener.
// This avoids the need to iterate through all listeners in a gateway.
func fetchGatewayByListener(ctx context.Context, c client.Client, ref gatewayv1.ParentReference) (*gatewayv1.Gateway, error) {
if ref.SectionName == nil {
return nil, fmt.Errorf("missing sectionName")
}
// Build the composite key that matches our indexer format: namespace/gatewayName/listenerName
listenerKey := fmt.Sprintf("%s/%s/%s", *ref.Namespace, ref.Name, *ref.SectionName)
// Query gateways using the indexer
gatewayList := &gatewayv1.GatewayList{}
if err := c.List(ctx, gatewayList, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(kamajiv1alpha1.GatewayListenerNameKey, listenerKey),
}); err != nil {
return nil, fmt.Errorf("failed to list gateways by listener: %w", err)
}
if len(gatewayList.Items) == 0 {
return nil, fmt.Errorf("no gateway found with listener '%s'", *ref.SectionName)
}
// Since we're using a composite key with namespace/name/listener, we should get exactly one result
if len(gatewayList.Items) > 1 {
return nil, fmt.Errorf("found multiple gateways with listener '%s', expected exactly one", *ref.SectionName)
}
return &gatewayList.Items[0], nil
}
// FindMatchingListener finds a listener in the given list that matches the parent reference.
func FindMatchingListener(listeners []gatewayv1.Listener, ref gatewayv1.ParentReference) (gatewayv1.Listener, error) {
if ref.SectionName == nil {
return gatewayv1.Listener{}, fmt.Errorf("missing sectionName")
}
name := *ref.SectionName
for _, listener := range listeners {
if listener.Name == name {
return listener, nil
}
}
// TODO: Handle the cases according to the spec:
// - When both Port (experimental) and SectionName are
// specified, the name and port of the selected listener
// must match both specified values.
// - When unspecified (empty string) this will reference
// the entire resource [...] an attachment is considered
// successful if at least one section in the parent resource accepts it
return gatewayv1.Listener{}, fmt.Errorf("could not find listener '%s'", name)
}
// IsGatewayRouteStatusChanged checks if the gateway route status has changed compared to the stored status.
// Returns true if the status has changed (update needed), false if it's the same.
func IsGatewayRouteStatusChanged(currentStatus *kamajiv1alpha1.KubernetesGatewayStatus, resourceStatus gatewayv1alpha2.RouteStatus) bool {
if currentStatus == nil {
return true
}
// Compare RouteStatus - check if number of parents changed
if len(currentStatus.RouteStatus.Parents) != len(resourceStatus.Parents) {
return true
}
// Compare individual parent statuses
// NOTE: Multiple Parent References are assumed.
for i, currentParent := range currentStatus.RouteStatus.Parents {
if i >= len(resourceStatus.Parents) {
return true
}
resourceParent := resourceStatus.Parents[i]
// Compare parent references
if currentParent.ParentRef.Name != resourceParent.ParentRef.Name ||
(currentParent.ParentRef.Namespace == nil) != (resourceParent.ParentRef.Namespace == nil) ||
(currentParent.ParentRef.Namespace != nil && resourceParent.ParentRef.Namespace != nil &&
*currentParent.ParentRef.Namespace != *resourceParent.ParentRef.Namespace) ||
(currentParent.ParentRef.SectionName == nil) != (resourceParent.ParentRef.SectionName == nil) ||
(currentParent.ParentRef.SectionName != nil && resourceParent.ParentRef.SectionName != nil &&
*currentParent.ParentRef.SectionName != *resourceParent.ParentRef.SectionName) {
return true
}
if len(currentParent.Conditions) != len(resourceParent.Conditions) {
return true
}
// Compare each condition
for j, currentCondition := range currentParent.Conditions {
if j >= len(resourceParent.Conditions) {
return true
}
resourceCondition := resourceParent.Conditions[j]
if currentCondition.Type != resourceCondition.Type ||
currentCondition.Status != resourceCondition.Status ||
currentCondition.Reason != resourceCondition.Reason ||
currentCondition.Message != resourceCondition.Message ||
!currentCondition.LastTransitionTime.Equal(&resourceCondition.LastTransitionTime) {
return true
}
}
}
// Since access points are derived from route status and gateway conditions,
// and we've already compared the route status above, we can assume that
// if the route status hasn't changed, the access points calculation
// will produce the same result. This avoids the need for complex
// gateway fetching in the status comparison.
//
// If there are edge cases where gateway state changes but route status doesn't,
// those will be caught in the next reconciliation cycle anyway.
return false
}
// CleanupTLSRoute cleans up a TLSRoute resource if it's managed by the given TenantControlPlane.
func CleanupTLSRoute(ctx context.Context, c client.Client, routeName, routeNamespace string, tcp metav1.Object) (bool, error) {
route := gatewayv1alpha2.TLSRoute{}
if err := c.Get(ctx, client.ObjectKey{
Namespace: routeNamespace,
Name: routeName,
}, &route); err != nil {
if !k8serrors.IsNotFound(err) {
return false, fmt.Errorf("failed to get TLSRoute before cleanup: %w", err)
}
return false, nil
}
if !metav1.IsControlledBy(&route, tcp) {
return false, nil
}
if err := c.Delete(ctx, &route); err != nil {
if !k8serrors.IsNotFound(err) {
return false, fmt.Errorf("cannot delete TLSRoute route: %w", err)
}
return false, nil
}
return true, nil
}
// BuildGatewayAccessPointsStatus builds access points from route statuses.
func BuildGatewayAccessPointsStatus(ctx context.Context, c client.Client, route *gatewayv1alpha2.TLSRoute, routeStatuses gatewayv1alpha2.RouteStatus) ([]kamajiv1alpha1.GatewayAccessPoint, error) {
accessPoints := []kamajiv1alpha1.GatewayAccessPoint{}
routeNamespace := gatewayv1.Namespace(route.Namespace)
for _, routeStatus := range routeStatuses.Parents {
routeAccepted := meta.IsStatusConditionTrue(
routeStatus.Conditions,
string(gatewayv1.RouteConditionAccepted),
)
if !routeAccepted {
continue
}
if routeStatus.ParentRef.Namespace == nil {
// Set the namespace to the route namespace if not set
routeStatus.ParentRef.Namespace = &routeNamespace
}
// Use the indexer to efficiently find the gateway with the specific listener
gateway, err := fetchGatewayByListener(ctx, c, routeStatus.ParentRef)
if err != nil {
return nil, fmt.Errorf("could not fetch gateway with listener '%v': %w",
routeStatus.ParentRef.SectionName, err)
}
gatewayProgrammed := meta.IsStatusConditionTrue(
gateway.Status.Conditions,
string(gatewayv1.GatewayConditionProgrammed),
)
if !gatewayProgrammed {
continue
}
// Since we fetched the gateway using the indexer, we know the listener exists
// but we still need to get its details from the gateway spec
listener, err := FindMatchingListener(
gateway.Spec.Listeners, routeStatus.ParentRef,
)
if err != nil {
return nil, fmt.Errorf("failed to match listener: %w", err)
}
for _, hostname := range route.Spec.Hostnames {
rawURL := fmt.Sprintf("https://%s:%d", hostname, listener.Port)
parsedURL, err := url.Parse(rawURL)
if err != nil {
return nil, fmt.Errorf("invalid url: %w", err)
}
hostnameAddressType := gatewayv1.HostnameAddressType
accessPoints = append(accessPoints, kamajiv1alpha1.GatewayAccessPoint{
Type: &hostnameAddressType,
Value: parsedURL.String(),
Port: listener.Port,
})
}
}
return accessPoints, nil
}
// NewParentRefsSpecWithPortAndSection creates a copy of parentRefs with port and sectionName set for each reference.
func NewParentRefsSpecWithPortAndSection(parentRefs []gatewayv1.ParentReference, port int32, sectionName string) []gatewayv1.ParentReference {
result := make([]gatewayv1.ParentReference, len(parentRefs))
sectionNamePtr := gatewayv1.SectionName(sectionName)
for i, parentRef := range parentRefs {
result[i] = *parentRef.DeepCopy()
result[i].Port = &port
result[i].SectionName = &sectionNamePtr
}
return result
}

View File

@@ -182,6 +182,21 @@ func (r *Agent) mutate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.T
return err
}
// Override address with control plane gateway hostname if configured
// Konnectivity TLSRoute uses the same hostname as control plane gateway
if tenantControlPlane.Spec.ControlPlane.Gateway != nil &&
len(tenantControlPlane.Spec.ControlPlane.Gateway.Hostname) > 0 {
hostname := tenantControlPlane.Spec.ControlPlane.Gateway.Hostname
// Extract hostname
if len(hostname) > 0 {
konnectivityHostname, _ := utilities.GetControlPlaneAddressAndPortFromHostname(
string(hostname),
tenantControlPlane.Spec.Addons.Konnectivity.KonnectivityServerSpec.Port)
address = konnectivityHostname
}
}
r.resource.SetLabels(utilities.MergeMaps(r.resource.GetLabels(), utilities.KamajiLabels(tenantControlPlane.GetName(), r.GetName())))
specSelector := &metav1.LabelSelector{

View File

@@ -0,0 +1,232 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package konnectivity
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/resources"
"github.com/clastix/kamaji/internal/utilities"
)
type KubernetesKonnectivityGatewayResource struct {
resource *gatewayv1alpha2.TLSRoute
Client client.Client
}
func (r *KubernetesKonnectivityGatewayResource) GetHistogram() prometheus.Histogram {
gatewayCollector = resources.LazyLoadHistogramFromResource(gatewayCollector, r)
return gatewayCollector
}
func (r *KubernetesKonnectivityGatewayResource) ShouldStatusBeUpdated(_ context.Context, tcp *kamajiv1alpha1.TenantControlPlane) bool {
switch {
case !r.shouldHaveGateway(tcp) && (tcp.Status.Addons.Konnectivity.Gateway == nil):
return false
case r.shouldHaveGateway(tcp) && (tcp.Status.Addons.Konnectivity.Gateway == nil):
return true
case !r.shouldHaveGateway(tcp) && (tcp.Status.Addons.Konnectivity.Gateway != nil):
return true
case r.shouldHaveGateway(tcp) && (tcp.Status.Addons.Konnectivity.Gateway != nil):
return r.gatewayStatusNeedsUpdate(tcp)
}
return false
}
// shouldHaveGateway checks if Konnectivity gateway should be configured.
// Create when Konnectivity addon is enabled and control plane gateway is configured.
func (r *KubernetesKonnectivityGatewayResource) shouldHaveGateway(tcp *kamajiv1alpha1.TenantControlPlane) bool {
if tcp.Spec.Addons.Konnectivity == nil { // konnectivity addon is disabled
return false
}
// Create when control plane gateway is configured
return tcp.Spec.ControlPlane.Gateway != nil
}
// gatewayStatusNeedsUpdate compares the current gateway resource status with the stored status.
func (r *KubernetesKonnectivityGatewayResource) gatewayStatusNeedsUpdate(tcp *kamajiv1alpha1.TenantControlPlane) bool {
currentStatus := tcp.Status.Addons.Konnectivity.Gateway
// Check if route reference has changed
if currentStatus != nil && currentStatus.RouteRef.Name != r.resource.Name {
return true
}
// Compare RouteStatus - check if number of parents changed
return resources.IsGatewayRouteStatusChanged(currentStatus, r.resource.Status.RouteStatus)
}
func (r *KubernetesKonnectivityGatewayResource) ShouldCleanup(tcp *kamajiv1alpha1.TenantControlPlane) bool {
return !r.shouldHaveGateway(tcp) && tcp.Status.Addons.Konnectivity.Gateway != nil
}
func (r *KubernetesKonnectivityGatewayResource) CleanUp(ctx context.Context, tcp *kamajiv1alpha1.TenantControlPlane) (bool, error) {
logger := log.FromContext(ctx, "resource", r.GetName())
cleaned, err := resources.CleanupTLSRoute(ctx, r.Client, r.resource.GetName(), r.resource.GetNamespace(), tcp)
if err != nil {
logger.Error(err, "failed to cleanup konnectivity route")
return false, err
}
if cleaned {
logger.V(1).Info("konnectivity route cleaned up successfully")
}
return cleaned, nil
}
func (r *KubernetesKonnectivityGatewayResource) UpdateTenantControlPlaneStatus(ctx context.Context, tcp *kamajiv1alpha1.TenantControlPlane) error {
logger := log.FromContext(ctx, "resource", r.GetName())
// Clean up status if Gateway routes are no longer configured
if !r.shouldHaveGateway(tcp) {
tcp.Status.Addons.Konnectivity.Gateway = nil
return nil
}
tcp.Status.Addons.Konnectivity.Gateway = &kamajiv1alpha1.KubernetesGatewayStatus{
RouteStatus: r.resource.Status.RouteStatus,
RouteRef: v1.LocalObjectReference{
Name: r.resource.Name,
},
}
routeStatuses := tcp.Status.Addons.Konnectivity.Gateway.RouteStatus
// TODO: Investigate the implications of having multiple parents / hostnames
// TODO: Use condition to report?
if len(routeStatuses.Parents) == 0 {
return fmt.Errorf("no gateway attached to the konnectivity route")
}
if len(routeStatuses.Parents) > 1 {
return fmt.Errorf("too many gateways attached to the konnectivity route")
}
if len(r.resource.Spec.Hostnames) == 0 {
return fmt.Errorf("no hostname in the konnectivity route")
}
if len(r.resource.Spec.Hostnames) > 1 {
return fmt.Errorf("too many hostnames in the konnectivity route")
}
logger.V(1).Info("updating TenantControlPlane status for Konnectivity Gateway routes")
accessPoints, err := resources.BuildGatewayAccessPointsStatus(ctx, r.Client, r.resource, routeStatuses)
if err != nil {
return err
}
tcp.Status.Addons.Konnectivity.Gateway.AccessPoints = accessPoints
return nil
}
func (r *KubernetesKonnectivityGatewayResource) Define(_ context.Context, tcp *kamajiv1alpha1.TenantControlPlane) error {
r.resource = &gatewayv1alpha2.TLSRoute{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-konnectivity", tcp.GetName()),
Namespace: tcp.GetNamespace(),
},
}
return nil
}
func (r *KubernetesKonnectivityGatewayResource) mutate(tcp *kamajiv1alpha1.TenantControlPlane) controllerutil.MutateFn {
return func() error {
// Use control plane gateway configuration
if tcp.Spec.ControlPlane.Gateway == nil {
return fmt.Errorf("control plane gateway is not configured")
}
labels := utilities.MergeMaps(
r.resource.GetLabels(),
utilities.KamajiLabels(tcp.GetName(), r.GetName()),
tcp.Spec.ControlPlane.Gateway.AdditionalMetadata.Labels,
)
r.resource.SetLabels(labels)
annotations := utilities.MergeMaps(
r.resource.GetAnnotations(),
tcp.Spec.ControlPlane.Gateway.AdditionalMetadata.Annotations,
)
r.resource.SetAnnotations(annotations)
// Use hostname from control plane gateway
if len(tcp.Spec.ControlPlane.Gateway.Hostname) == 0 {
return fmt.Errorf("control plane gateway hostname is not set")
}
serviceName := gatewayv1alpha2.ObjectName(tcp.Status.Addons.Konnectivity.Service.Name)
servicePort := tcp.Status.Addons.Konnectivity.Service.Port
if serviceName == "" || servicePort == 0 {
return fmt.Errorf("konnectivity service not ready, cannot create TLSRoute")
}
// Copy parentRefs from control plane gateway and explicitly set port and sectionName fields
if tcp.Spec.ControlPlane.Gateway.GatewayParentRefs == nil {
return fmt.Errorf("control plane gateway parentRefs are not specified")
}
r.resource.Spec.ParentRefs = resources.NewParentRefsSpecWithPortAndSection(tcp.Spec.ControlPlane.Gateway.GatewayParentRefs, servicePort, "konnectivity-server")
rule := gatewayv1alpha2.TLSRouteRule{
BackendRefs: []gatewayv1alpha2.BackendRef{
{
BackendObjectReference: gatewayv1alpha2.BackendObjectReference{
Name: serviceName,
Port: &servicePort,
},
},
},
}
r.resource.Spec.Hostnames = []gatewayv1.Hostname{tcp.Spec.ControlPlane.Gateway.Hostname}
r.resource.Spec.Rules = []gatewayv1alpha2.TLSRouteRule{rule}
return controllerutil.SetControllerReference(tcp, r.resource, r.Client.Scheme())
}
}
func (r *KubernetesKonnectivityGatewayResource) CreateOrUpdate(ctx context.Context, tenantControlPlane *kamajiv1alpha1.TenantControlPlane) (controllerutil.OperationResult, error) {
logger := log.FromContext(ctx, "resource", r.GetName())
if !r.shouldHaveGateway(tenantControlPlane) {
return controllerutil.OperationResultNone, nil
}
if tenantControlPlane.Spec.ControlPlane.Gateway == nil {
return controllerutil.OperationResultNone, nil
}
if len(tenantControlPlane.Spec.ControlPlane.Gateway.Hostname) == 0 {
return controllerutil.OperationResultNone, fmt.Errorf("missing hostname to expose Konnectivity using a Gateway resource")
}
logger.V(1).Info("creating or updating resource konnectivity gateway routes")
result, err := utilities.CreateOrUpdateWithConflict(ctx, r.Client, r.resource, r.mutate(tenantControlPlane))
if err != nil {
return result, err
}
return result, nil
}
func (r *KubernetesKonnectivityGatewayResource) GetName() string {
return "konnectivity_gateway_routes"
}

View File

@@ -0,0 +1,218 @@
// Copyright 2022 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package konnectivity_test
import (
"context"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
kamajiv1alpha1 "github.com/clastix/kamaji/api/v1alpha1"
"github.com/clastix/kamaji/internal/resources/konnectivity"
)
func TestKonnectivityGatewayResource(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Konnectivity Gateway Resource Suite")
}
var runtimeScheme *runtime.Scheme
var _ = BeforeSuite(func() {
runtimeScheme = runtime.NewScheme()
Expect(scheme.AddToScheme(runtimeScheme)).To(Succeed())
Expect(kamajiv1alpha1.AddToScheme(runtimeScheme)).To(Succeed())
Expect(gatewayv1alpha2.Install(runtimeScheme)).To(Succeed())
})
var _ = Describe("KubernetesKonnectivityGatewayResource", func() {
var (
tcp *kamajiv1alpha1.TenantControlPlane
resource *konnectivity.KubernetesKonnectivityGatewayResource
ctx context.Context
)
BeforeEach(func() {
ctx = context.Background()
fakeClient := fake.NewClientBuilder().
WithScheme(runtimeScheme).
Build()
resource = &konnectivity.KubernetesKonnectivityGatewayResource{
Client: fakeClient,
}
namespace := gatewayv1.Namespace("default")
tcp = &kamajiv1alpha1.TenantControlPlane{
ObjectMeta: metav1.ObjectMeta{
Name: "test-tcp",
Namespace: "default",
},
Spec: kamajiv1alpha1.TenantControlPlaneSpec{
ControlPlane: kamajiv1alpha1.ControlPlane{
Gateway: &kamajiv1alpha1.GatewaySpec{
Hostname: gatewayv1alpha2.Hostname("test.example.com"),
GatewayParentRefs: []gatewayv1alpha2.ParentReference{
{
Name: "test-gateway",
Namespace: &namespace,
},
},
},
},
Addons: kamajiv1alpha1.AddonsSpec{
Konnectivity: &kamajiv1alpha1.KonnectivitySpec{
KonnectivityServerSpec: kamajiv1alpha1.KonnectivityServerSpec{
Port: 8132,
},
},
},
},
Status: kamajiv1alpha1.TenantControlPlaneStatus{
Addons: kamajiv1alpha1.AddonsStatus{
Konnectivity: kamajiv1alpha1.KonnectivityStatus{
Service: kamajiv1alpha1.KubernetesServiceStatus{
Name: "test-konnectivity-service",
Port: 8132,
},
},
},
},
}
})
Describe("shouldHaveGateway logic", func() {
It("should return false when Konnectivity addon is disabled", func() {
tcp.Spec.Addons.Konnectivity = nil
shouldUpdate := resource.ShouldStatusBeUpdated(ctx, tcp)
Expect(shouldUpdate).To(BeFalse())
Expect(resource.ShouldCleanup(tcp)).To(BeFalse())
})
It("should return false when control plane gateway is not configured", func() {
tcp.Spec.ControlPlane.Gateway = nil
shouldUpdate := resource.ShouldStatusBeUpdated(ctx, tcp)
Expect(shouldUpdate).To(BeFalse())
Expect(resource.ShouldCleanup(tcp)).To(BeFalse())
})
It("should return true when both Konnectivity and gateway are configured", func() {
shouldUpdate := resource.ShouldStatusBeUpdated(ctx, tcp)
Expect(shouldUpdate).To(BeTrue())
Expect(resource.ShouldCleanup(tcp)).To(BeFalse())
})
})
Context("When Konnectivity gateway should be configured", func() {
It("should set correct TLSRoute name with -konnectivity suffix", func() {
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
route := &gatewayv1alpha2.TLSRoute{}
err = resource.Client.Get(ctx, client.ObjectKey{Name: "test-tcp-konnectivity", Namespace: tcp.Namespace}, route)
Expect(err).NotTo(HaveOccurred())
Expect(route.Name).To(Equal("test-tcp-konnectivity"))
})
It("should set sectionName to \"konnectivity-server\" and port from Konnectivity service status", func() {
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
route := &gatewayv1alpha2.TLSRoute{}
err = resource.Client.Get(ctx, client.ObjectKey{Name: "test-tcp-konnectivity", Namespace: tcp.Namespace}, route)
Expect(err).NotTo(HaveOccurred())
Expect(route.Spec.ParentRefs).To(HaveLen(1))
Expect(route.Spec.ParentRefs[0].SectionName).NotTo(BeNil())
Expect(*route.Spec.ParentRefs[0].SectionName).To(Equal(gatewayv1.SectionName("konnectivity-server")))
Expect(route.Spec.ParentRefs[0].Port).NotTo(BeNil())
Expect(*route.Spec.ParentRefs[0].Port).To(Equal(tcp.Status.Addons.Konnectivity.Service.Port))
})
It("should use control plane gateway hostname", func() {
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
route := &gatewayv1alpha2.TLSRoute{}
err = resource.Client.Get(ctx, client.ObjectKey{Name: "test-tcp-konnectivity", Namespace: tcp.Namespace}, route)
Expect(err).NotTo(HaveOccurred())
Expect(route.Spec.Hostnames).To(HaveLen(1))
Expect(route.Spec.Hostnames[0]).To(Equal(tcp.Spec.ControlPlane.Gateway.Hostname))
})
})
Context("Konnectivity-specific error cases", func() {
It("should return early without error when control plane gateway is not configured", func() {
tcp.Spec.ControlPlane.Gateway = nil
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
result, err := resource.CreateOrUpdate(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
Expect(result).To(Equal(controllerutil.OperationResultNone))
})
It("should fail when Konnectivity service is not ready", func() {
tcp.Status.Addons.Konnectivity.Service.Name = ""
tcp.Status.Addons.Konnectivity.Service.Port = 0
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("konnectivity service not ready"))
})
It("should fail when control plane gateway parentRefs are not specified", func() {
tcp.Spec.ControlPlane.Gateway.GatewayParentRefs = nil
err := resource.Define(ctx, tcp)
Expect(err).NotTo(HaveOccurred())
_, err = resource.CreateOrUpdate(ctx, tcp)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("control plane gateway parentRefs are not specified"))
})
})
Context("When Konnectivity gateway should not be configured", func() {
BeforeEach(func() {
tcp.Spec.Addons.Konnectivity = nil
tcp.Status.Addons.Konnectivity = kamajiv1alpha1.KonnectivityStatus{
Gateway: &kamajiv1alpha1.KubernetesGatewayStatus{
AccessPoints: nil,
},
}
})
It("should cleanup when gateway is removed", func() {
Expect(resource.ShouldCleanup(tcp)).To(BeTrue())
})
})
It("should return correct resource name", func() {
Expect(resource.GetName()).To(Equal("konnectivity_gateway_routes"))
})
})

View File

@@ -13,6 +13,7 @@ var (
clusterrolebindingCollector prometheus.Histogram
deploymentCollector prometheus.Histogram
egressCollector prometheus.Histogram
gatewayCollector prometheus.Histogram
kubeconfigCollector prometheus.Histogram
serviceaccountCollector prometheus.Histogram
serviceCollector prometheus.Histogram