mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-02-14 18:09:57 +00:00
support loadBalancer for grpc endpoint type (#1255)
Signed-off-by: Zhiwei Yin <zyin@redhat.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
@@ -932,16 +933,82 @@ func GRPCAuthEnabled(cm *operatorapiv1.ClusterManager) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func GRPCServerHostNames(clustermanagerNamespace string, cm *operatorapiv1.ClusterManager) []string {
|
||||
hostNames := []string{fmt.Sprintf("%s-grpc-server.%s.svc", cm.Name, clustermanagerNamespace)}
|
||||
func GRPCServerHostNames(kubeClient kubernetes.Interface, clusterManagerNamespace string, cm *operatorapiv1.ClusterManager) ([]string, error) {
|
||||
hostNames := []string{fmt.Sprintf("%s-grpc-server.%s.svc", cm.Name, clusterManagerNamespace)}
|
||||
if cm.Spec.ServerConfiguration != nil {
|
||||
for _, endpoint := range cm.Spec.ServerConfiguration.EndpointsExposure {
|
||||
if endpoint.Protocol == "grpc" && endpoint.GRPC != nil && endpoint.GRPC.Type == operatorapiv1.EndpointTypeHostname {
|
||||
if endpoint.GRPC.Hostname != nil && strings.TrimSpace(endpoint.GRPC.Hostname.Host) != "" {
|
||||
if endpoint.Protocol != operatorapiv1.GRPCAuthType {
|
||||
continue
|
||||
}
|
||||
if endpoint.GRPC == nil {
|
||||
continue
|
||||
}
|
||||
switch endpoint.GRPC.Type {
|
||||
case operatorapiv1.EndpointTypeHostname:
|
||||
if endpoint.GRPC.Hostname != nil &&
|
||||
strings.TrimSpace(endpoint.GRPC.Hostname.Host) != "" &&
|
||||
!slices.Contains(hostNames, endpoint.GRPC.Hostname.Host) {
|
||||
hostNames = append(hostNames, endpoint.GRPC.Hostname.Host)
|
||||
}
|
||||
|
||||
case operatorapiv1.EndpointTypeLoadBalancer:
|
||||
if endpoint.GRPC.LoadBalancer != nil &&
|
||||
strings.TrimSpace(endpoint.GRPC.LoadBalancer.Host) != "" &&
|
||||
!slices.Contains(hostNames, endpoint.GRPC.LoadBalancer.Host) {
|
||||
hostNames = append(hostNames, endpoint.GRPC.LoadBalancer.Host)
|
||||
}
|
||||
|
||||
serviceName := fmt.Sprintf("%s-grpc-server", cm.Name)
|
||||
gRPCService, err := kubeClient.CoreV1().Services(clusterManagerNamespace).
|
||||
Get(context.TODO(), serviceName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return hostNames, fmt.Errorf("failed to find service %s in namespace %s",
|
||||
serviceName, clusterManagerNamespace)
|
||||
}
|
||||
|
||||
if len(gRPCService.Status.LoadBalancer.Ingress) == 0 {
|
||||
return hostNames, fmt.Errorf("failed to find ingress in the status of the service %s in namespace %s",
|
||||
serviceName, clusterManagerNamespace)
|
||||
}
|
||||
|
||||
if len(gRPCService.Status.LoadBalancer.Ingress[0].IP) == 0 &&
|
||||
len(gRPCService.Status.LoadBalancer.Ingress[0].Hostname) == 0 {
|
||||
return hostNames, fmt.Errorf("failed to find ip or hostname in the ingress "+
|
||||
"in the status of the service %s in namespace %s", serviceName, clusterManagerNamespace)
|
||||
}
|
||||
|
||||
if len(gRPCService.Status.LoadBalancer.Ingress[0].IP) != 0 &&
|
||||
!slices.Contains(hostNames, gRPCService.Status.LoadBalancer.Ingress[0].IP) {
|
||||
hostNames = append(hostNames, gRPCService.Status.LoadBalancer.Ingress[0].IP)
|
||||
}
|
||||
|
||||
if len(gRPCService.Status.LoadBalancer.Ingress[0].Hostname) != 0 &&
|
||||
!slices.Contains(hostNames, gRPCService.Status.LoadBalancer.Ingress[0].Hostname) {
|
||||
hostNames = append(hostNames, gRPCService.Status.LoadBalancer.Ingress[0].Hostname)
|
||||
}
|
||||
|
||||
case operatorapiv1.EndpointTypeRoute:
|
||||
// TODO: append route.host to the hostName
|
||||
}
|
||||
}
|
||||
}
|
||||
return hostNames
|
||||
|
||||
return hostNames, nil
|
||||
}
|
||||
|
||||
func GRPCServerEndpointType(cm *operatorapiv1.ClusterManager) string {
|
||||
if cm.Spec.ServerConfiguration != nil {
|
||||
// there is only one gRPC endpoint in EndpointsExposure
|
||||
for _, endpoint := range cm.Spec.ServerConfiguration.EndpointsExposure {
|
||||
if endpoint.Protocol != operatorapiv1.GRPCAuthType {
|
||||
continue
|
||||
}
|
||||
if endpoint.GRPC == nil {
|
||||
return string(operatorapiv1.EndpointTypeHostname)
|
||||
}
|
||||
return string(endpoint.GRPC.Type)
|
||||
}
|
||||
}
|
||||
|
||||
return string(operatorapiv1.EndpointTypeHostname)
|
||||
}
|
||||
|
||||
@@ -1879,91 +1879,95 @@ func TestGRPCAuthEnabled(t *testing.T) {
|
||||
|
||||
func TestGRPCServerHostNames(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
cm *operatorapiv1.ClusterManager
|
||||
namespace string
|
||||
desiredResult []string
|
||||
name string
|
||||
cm *operatorapiv1.ClusterManager
|
||||
namespace string
|
||||
existingObjects []runtime.Object
|
||||
expectedResult []string
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "nil registration config",
|
||||
name: "nil server configuration",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
RegistrationConfiguration: nil,
|
||||
ServerConfiguration: nil,
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
desiredResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
namespace: "test",
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "no registration drivers",
|
||||
name: "no endpoints exposure",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
RegistrationConfiguration: &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{},
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
desiredResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
namespace: "test",
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "one non-grpc registration driver",
|
||||
name: "non-grpc endpoint",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
RegistrationConfiguration: &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{
|
||||
{AuthType: operatorapiv1.CSRAuthType},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
desiredResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
},
|
||||
{
|
||||
name: "one grpc registration driver, no hostname",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
RegistrationConfiguration: &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{
|
||||
{AuthType: operatorapiv1.GRPCAuthType},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
desiredResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
},
|
||||
{
|
||||
name: "one grpc registration driver, with hostname",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
RegistrationConfiguration: &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{
|
||||
{
|
||||
AuthType: operatorapiv1.GRPCAuthType,
|
||||
},
|
||||
},
|
||||
},
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: "grpc",
|
||||
Protocol: "http",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "grpc endpoint with nil GRPC config",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "hostname endpoint with valid host",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeHostname,
|
||||
Hostname: &operatorapiv1.HostnameConfig{
|
||||
@@ -1975,16 +1979,448 @@ func TestGRPCServerHostNames(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
desiredResult: []string{"cluster-manager-grpc-server.test.svc", "test.example.com"},
|
||||
namespace: "test",
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc", "test.example.com"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "hostname endpoint with empty host",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeHostname,
|
||||
Hostname: &operatorapiv1.HostnameConfig{
|
||||
Host: " ",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "loadbalancer endpoint with IP",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
LoadBalancer: &operatorapiv1.LoadBalancerConfig{
|
||||
Host: "myhost.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager-grpc-server",
|
||||
Namespace: "test",
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{
|
||||
IP: "192.168.1.100",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc", "myhost.com", "192.168.1.100"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "loadbalancer endpoint with hostname",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager-grpc-server",
|
||||
Namespace: "test",
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{
|
||||
Hostname: "lb.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc", "lb.example.com"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "duplicated hostname",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
LoadBalancer: &operatorapiv1.LoadBalancerConfig{
|
||||
Host: "lb.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager-grpc-server",
|
||||
Namespace: "test",
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{
|
||||
Hostname: "lb.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc", "lb.example.com"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "loadbalancer endpoint with both IP and hostname",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager-grpc-server",
|
||||
Namespace: "test",
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{
|
||||
IP: "192.168.1.100",
|
||||
Hostname: "lb.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc", "192.168.1.100", "lb.example.com"},
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "loadbalancer endpoint - service not found",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
existingObjects: []runtime.Object{},
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "loadbalancer endpoint - no ingress in status",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
namespace: "test",
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager-grpc-server",
|
||||
Namespace: "test",
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedResult: []string{"cluster-manager-grpc-server.test.svc"},
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
hostnames := GRPCServerHostNames(tc.namespace, tc.cm)
|
||||
if !reflect.DeepEqual(hostnames, tc.desiredResult) {
|
||||
t.Errorf("Name: %s, expect hostnames %v, but got %v", tc.name, tc.desiredResult, hostnames)
|
||||
fakeKubeClient := fakekube.NewSimpleClientset(tc.existingObjects...)
|
||||
hostnames, err := GRPCServerHostNames(fakeKubeClient, tc.namespace, tc.cm)
|
||||
|
||||
if tc.expectError && err == nil {
|
||||
t.Errorf("expected error but got none")
|
||||
}
|
||||
if !tc.expectError && err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(hostnames, tc.expectedResult) {
|
||||
t.Errorf("expected hostnames %v, but got %v", tc.expectedResult, hostnames)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGRPCServerEndpointType(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
cm *operatorapiv1.ClusterManager
|
||||
expectedType string
|
||||
}{
|
||||
{
|
||||
name: "nil server configuration",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: nil,
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeHostname),
|
||||
},
|
||||
{
|
||||
name: "empty endpoints exposure",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeHostname),
|
||||
},
|
||||
{
|
||||
name: "non-grpc endpoint",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: "http",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeHostname),
|
||||
},
|
||||
{
|
||||
name: "grpc endpoint with nil GRPC config",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeHostname),
|
||||
},
|
||||
{
|
||||
name: "grpc endpoint with hostname type",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeHostname,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeHostname),
|
||||
},
|
||||
{
|
||||
name: "grpc endpoint with loadBalancer type",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeLoadBalancer),
|
||||
},
|
||||
{
|
||||
name: "multiple endpoints with grpc as second",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: "http",
|
||||
},
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeLoadBalancer),
|
||||
},
|
||||
{
|
||||
name: "grpc endpoint with nil hostname config",
|
||||
cm: &operatorapiv1.ClusterManager{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-manager",
|
||||
},
|
||||
Spec: operatorapiv1.ClusterManagerSpec{
|
||||
ServerConfiguration: &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeHostname,
|
||||
Hostname: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedType: string(operatorapiv1.EndpointTypeHostname),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
endpointType := GRPCServerEndpointType(tc.cm)
|
||||
if endpointType != tc.expectedType {
|
||||
t.Errorf("expected endpoint type %s, but got %s", tc.expectedType, endpointType)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package certrotationcontroller
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/openshift/library-go/pkg/controller/factory"
|
||||
@@ -57,7 +58,7 @@ type certRotationController struct {
|
||||
type rotations struct {
|
||||
signingRotation certrotation.SigningRotation
|
||||
caBundleRotation certrotation.CABundleRotation
|
||||
targetRotations []certrotation.TargetRotation
|
||||
targetRotations map[string]certrotation.TargetRotation
|
||||
}
|
||||
|
||||
func NewCertRotationController(
|
||||
@@ -186,9 +187,8 @@ func (c certRotationController) syncOne(ctx context.Context, clustermanager *ope
|
||||
|
||||
// delete the grpc serving secret if the grpc auth is disabled
|
||||
if !helpers.GRPCAuthEnabled(clustermanager) {
|
||||
if rotations, ok := c.rotationMap[clustermanager.Name]; ok {
|
||||
rotations.targetRotations = removeRotation(rotations.targetRotations, helpers.GRPCServerSecret)
|
||||
c.rotationMap[clustermanager.Name] = rotations
|
||||
if _, ok := c.rotationMap[clustermanager.Name]; ok {
|
||||
delete(c.rotationMap[clustermanager.Name].targetRotations, helpers.GRPCServerSecret)
|
||||
}
|
||||
|
||||
err = c.kubeClient.CoreV1().Secrets(clustermanagerNamespace).Delete(ctx, helpers.GRPCServerSecret, metav1.DeleteOptions{})
|
||||
@@ -213,8 +213,8 @@ func (c certRotationController) syncOne(ctx context.Context, clustermanager *ope
|
||||
Lister: c.configMapInformer.Lister(),
|
||||
Client: c.kubeClient.CoreV1(),
|
||||
}
|
||||
targetRotations := []certrotation.TargetRotation{
|
||||
{
|
||||
targetRotations := map[string]certrotation.TargetRotation{
|
||||
helpers.RegistrationWebhookSecret: {
|
||||
Namespace: clustermanagerNamespace,
|
||||
Name: helpers.RegistrationWebhookSecret,
|
||||
Validity: TargetCertValidity,
|
||||
@@ -222,7 +222,7 @@ func (c certRotationController) syncOne(ctx context.Context, clustermanager *ope
|
||||
Lister: c.secretInformers[helpers.RegistrationWebhookSecret].Lister(),
|
||||
Client: c.kubeClient.CoreV1(),
|
||||
},
|
||||
{
|
||||
helpers.WorkWebhookSecret: {
|
||||
Namespace: clustermanagerNamespace,
|
||||
Name: helpers.WorkWebhookSecret,
|
||||
Validity: TargetCertValidity,
|
||||
@@ -239,38 +239,49 @@ func (c certRotationController) syncOne(ctx context.Context, clustermanager *ope
|
||||
}
|
||||
}
|
||||
|
||||
var errs []error
|
||||
// Ensure certificates are exists
|
||||
rotations := c.rotationMap[clustermanagerName]
|
||||
cmRotations := c.rotationMap[clustermanagerName]
|
||||
|
||||
if helpers.GRPCAuthEnabled(clustermanager) && !hasRotation(rotations.targetRotations, helpers.GRPCServerSecret) {
|
||||
if helpers.GRPCAuthEnabled(clustermanager) {
|
||||
// maintain the grpc serving certs
|
||||
// TODO may support user provided certs
|
||||
rotations.targetRotations = append(rotations.targetRotations, certrotation.TargetRotation{
|
||||
Namespace: clustermanagerNamespace,
|
||||
Name: helpers.GRPCServerSecret,
|
||||
Validity: TargetCertValidity,
|
||||
HostNames: helpers.GRPCServerHostNames(clustermanagerNamespace, clustermanager),
|
||||
Lister: c.secretInformers[helpers.GRPCServerSecret].Lister(),
|
||||
Client: c.kubeClient.CoreV1(),
|
||||
})
|
||||
c.rotationMap[clustermanagerName] = rotations
|
||||
hostNames, grpcErr := helpers.GRPCServerHostNames(c.kubeClient, clustermanagerNamespace, clustermanager)
|
||||
if grpcErr != nil {
|
||||
errs = append(errs, grpcErr)
|
||||
} else if targetRotation, ok := cmRotations.targetRotations[helpers.GRPCServerSecret]; ok {
|
||||
if !slices.Equal(targetRotation.HostNames, hostNames) {
|
||||
targetRotation.HostNames = hostNames
|
||||
cmRotations.targetRotations[helpers.GRPCServerSecret] = targetRotation
|
||||
klog.Warningf("the hosts of grpc server are changed, will update the grpc serving cert")
|
||||
}
|
||||
|
||||
} else {
|
||||
c.rotationMap[clustermanagerName].targetRotations[helpers.GRPCServerSecret] = certrotation.TargetRotation{
|
||||
Namespace: clustermanagerNamespace,
|
||||
Name: helpers.GRPCServerSecret,
|
||||
Validity: TargetCertValidity,
|
||||
HostNames: hostNames,
|
||||
Lister: c.secretInformers[helpers.GRPCServerSecret].Lister(),
|
||||
Client: c.kubeClient.CoreV1(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reconcile cert/key pair for signer
|
||||
signingCertKeyPair, err := rotations.signingRotation.EnsureSigningCertKeyPair()
|
||||
signingCertKeyPair, err := cmRotations.signingRotation.EnsureSigningCertKeyPair()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reconcile ca bundle
|
||||
cabundleCerts, err := rotations.caBundleRotation.EnsureConfigMapCABundle(signingCertKeyPair)
|
||||
cabundleCerts, err := cmRotations.caBundleRotation.EnsureConfigMapCABundle(signingCertKeyPair)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reconcile target cert/key pairs
|
||||
var errs []error
|
||||
for _, targetRotation := range rotations.targetRotations {
|
||||
for _, targetRotation := range cmRotations.targetRotations {
|
||||
if err := targetRotation.EnsureTargetCertKeyPair(signingCertKeyPair, cabundleCerts); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
@@ -278,22 +289,3 @@ func (c certRotationController) syncOne(ctx context.Context, clustermanager *ope
|
||||
|
||||
return errorhelpers.NewMultiLineAggregate(errs)
|
||||
}
|
||||
|
||||
func hasRotation(targetRotations []certrotation.TargetRotation, name string) bool {
|
||||
for _, rotation := range targetRotations {
|
||||
if rotation.Name == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func removeRotation(targetRotations []certrotation.TargetRotation, name string) []certrotation.TargetRotation {
|
||||
rs := []certrotation.TargetRotation{}
|
||||
for _, rotation := range targetRotations {
|
||||
if rotation.Name != name {
|
||||
rs = append(rs, rotation)
|
||||
}
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package certrotationcontroller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -20,6 +22,7 @@ import (
|
||||
fakeoperatorclient "open-cluster-management.io/api/client/operator/clientset/versioned/fake"
|
||||
operatorinformers "open-cluster-management.io/api/client/operator/informers/externalversions"
|
||||
operatorapiv1 "open-cluster-management.io/api/operator/v1"
|
||||
"open-cluster-management.io/sdk-go/pkg/certrotation"
|
||||
|
||||
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
|
||||
"open-cluster-management.io/ocm/pkg/operator/helpers"
|
||||
@@ -236,18 +239,26 @@ func TestCertRotationGRPCAuth(t *testing.T) {
|
||||
},
|
||||
validate: func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController) {
|
||||
// Check that GRPC server secret was created after update
|
||||
_, err := kubeClient.CoreV1().Secrets(namespace).Get(context.Background(), helpers.GRPCServerSecret, metav1.GetOptions{})
|
||||
secret, err := kubeClient.CoreV1().Secrets(namespace).Get(context.Background(), helpers.GRPCServerSecret, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("expected grpc server secret to be created after update, but got error: %v", err)
|
||||
}
|
||||
|
||||
// Verify the secret has the expected certificate fields
|
||||
if _, ok := secret.Data["tls.crt"]; !ok {
|
||||
t.Fatalf("expected tls.crt in secret data")
|
||||
}
|
||||
if _, ok := secret.Data["tls.key"]; !ok {
|
||||
t.Fatalf("expected tls.key in secret data")
|
||||
}
|
||||
|
||||
// Check that rotation was added to the map
|
||||
rotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
cmRotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
if !hasRotation(rotations.targetRotations, helpers.GRPCServerSecret) {
|
||||
t.Fatalf("expected grpc server rotation to be added after update, %v", rotations)
|
||||
if _, ok := cmRotations.targetRotations[helpers.GRPCServerSecret]; !ok {
|
||||
t.Fatalf("expected grpc server rotation to be added after update, %v", cmRotations)
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -282,9 +293,12 @@ func TestCertRotationGRPCAuth(t *testing.T) {
|
||||
}
|
||||
|
||||
// Check that rotation was removed from the map
|
||||
rotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if ok && hasRotation(rotations.targetRotations, helpers.GRPCServerSecret) {
|
||||
t.Fatalf("expected GRPC server rotation to be removed after update")
|
||||
cmRotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
if _, ok := cmRotations.targetRotations[helpers.GRPCServerSecret]; ok {
|
||||
t.Fatalf("expected grpc server rotation to be removed after update, %v", cmRotations)
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -348,6 +362,362 @@ func TestCertRotationGRPCAuth(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCertRotationGRPCServerHostNames(t *testing.T) {
|
||||
namespace := helpers.ClusterManagerNamespace(testClusterManagerNameDefault, operatorapiv1.InstallModeDefault)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
clusterManager *operatorapiv1.ClusterManager
|
||||
existingObjects []runtime.Object
|
||||
validate func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController)
|
||||
expectedErrorSubstr string
|
||||
}{
|
||||
{
|
||||
name: "GRPC with LoadBalancer endpoint type and IP",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager(testClusterManagerNameDefault, operatorapiv1.InstallModeDefault)
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namespace,
|
||||
},
|
||||
},
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: testClusterManagerNameDefault + "-grpc-server",
|
||||
Namespace: namespace,
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{
|
||||
IP: "192.168.1.100",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
validate: func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController) {
|
||||
// Check that the GRPC rotation was added with LoadBalancer IP in hostnames
|
||||
rotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
|
||||
var grpcRotation *certrotation.TargetRotation
|
||||
for _, rotation := range rotations.targetRotations {
|
||||
if rotation.Name == helpers.GRPCServerSecret {
|
||||
grpcRotation = &rotation
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if grpcRotation == nil {
|
||||
t.Fatalf("expected GRPC rotation to exist")
|
||||
}
|
||||
|
||||
// Should have default service hostname and LoadBalancer IP
|
||||
expectedHostnames := []string{
|
||||
fmt.Sprintf("%s-grpc-server.%s.svc", testClusterManagerNameDefault, namespace),
|
||||
"192.168.1.100",
|
||||
}
|
||||
if len(grpcRotation.HostNames) != len(expectedHostnames) {
|
||||
t.Fatalf("expected %d hostnames, got %d", len(expectedHostnames), len(grpcRotation.HostNames))
|
||||
}
|
||||
for i, hostname := range expectedHostnames {
|
||||
if grpcRotation.HostNames[i] != hostname {
|
||||
t.Errorf("expected hostname[%d] to be %s, got %s", i, hostname, grpcRotation.HostNames[i])
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "GRPC with LoadBalancer endpoint type and Hostname",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager(testClusterManagerNameDefault, operatorapiv1.InstallModeDefault)
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namespace,
|
||||
},
|
||||
},
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: testClusterManagerNameDefault + "-grpc-server",
|
||||
Namespace: namespace,
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{
|
||||
{
|
||||
Hostname: "grpc.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
validate: func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController) {
|
||||
rotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
|
||||
var grpcRotation *certrotation.TargetRotation
|
||||
for _, rotation := range rotations.targetRotations {
|
||||
if rotation.Name == helpers.GRPCServerSecret {
|
||||
grpcRotation = &rotation
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if grpcRotation == nil {
|
||||
t.Fatalf("expected GRPC rotation to exist")
|
||||
}
|
||||
|
||||
// Should have default service hostname and LoadBalancer hostname
|
||||
expectedHostnames := []string{
|
||||
fmt.Sprintf("%s-grpc-server.%s.svc", testClusterManagerNameDefault, namespace),
|
||||
"grpc.example.com",
|
||||
}
|
||||
if len(grpcRotation.HostNames) != len(expectedHostnames) {
|
||||
t.Fatalf("expected %d hostnames, got %d", len(expectedHostnames), len(grpcRotation.HostNames))
|
||||
}
|
||||
for i, hostname := range expectedHostnames {
|
||||
if grpcRotation.HostNames[i] != hostname {
|
||||
t.Errorf("expected hostname[%d] to be %s, got %s", i, hostname, grpcRotation.HostNames[i])
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "GRPC with Hostname endpoint type",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager(testClusterManagerNameDefault, operatorapiv1.InstallModeDefault)
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeHostname,
|
||||
Hostname: &operatorapiv1.HostnameConfig{
|
||||
Host: "custom.grpc.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namespace,
|
||||
},
|
||||
},
|
||||
},
|
||||
validate: func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController) {
|
||||
rotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
|
||||
var grpcRotation *certrotation.TargetRotation
|
||||
for _, rotation := range rotations.targetRotations {
|
||||
if rotation.Name == helpers.GRPCServerSecret {
|
||||
grpcRotation = &rotation
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if grpcRotation == nil {
|
||||
t.Fatalf("expected GRPC rotation to exist")
|
||||
}
|
||||
|
||||
// Should have default service hostname and custom hostname
|
||||
expectedHostnames := []string{
|
||||
fmt.Sprintf("%s-grpc-server.%s.svc", testClusterManagerNameDefault, namespace),
|
||||
"custom.grpc.example.com",
|
||||
}
|
||||
if len(grpcRotation.HostNames) != len(expectedHostnames) {
|
||||
t.Fatalf("expected %d hostnames, got %d", len(expectedHostnames), len(grpcRotation.HostNames))
|
||||
}
|
||||
for i, hostname := range expectedHostnames {
|
||||
if grpcRotation.HostNames[i] != hostname {
|
||||
t.Errorf("expected hostname[%d] to be %s, got %s", i, hostname, grpcRotation.HostNames[i])
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "GRPC with loadBalancer but service not found",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager(testClusterManagerNameDefault, operatorapiv1.InstallModeDefault)
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namespace,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErrorSubstr: "failed to find service",
|
||||
validate: func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController) {
|
||||
// Service not found error should prevent GRPC rotation from being added
|
||||
cmRotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
|
||||
// GRPC rotation should not be added due to error
|
||||
if _, ok := cmRotations.targetRotations[helpers.GRPCServerSecret]; ok {
|
||||
t.Fatalf("expected GRPC rotation not to be added due to service not found error")
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "GRPC with LoadBalancer but no ingress status",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager(testClusterManagerNameDefault, operatorapiv1.InstallModeDefault)
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: operatorapiv1.GRPCAuthType,
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
existingObjects: []runtime.Object{
|
||||
&corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: namespace,
|
||||
},
|
||||
},
|
||||
&corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: testClusterManagerNameDefault + "-grpc-server",
|
||||
Namespace: namespace,
|
||||
},
|
||||
Status: corev1.ServiceStatus{
|
||||
LoadBalancer: corev1.LoadBalancerStatus{
|
||||
Ingress: []corev1.LoadBalancerIngress{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErrorSubstr: "failed to find ingress",
|
||||
validate: func(t *testing.T, kubeClient kubernetes.Interface, controller *certRotationController) {
|
||||
// No ingress status error should prevent GRPC rotation from being added
|
||||
cmRotations, ok := controller.rotationMap[testClusterManagerNameDefault]
|
||||
if !ok {
|
||||
t.Fatalf("expected rotations to exist in map")
|
||||
}
|
||||
|
||||
// GRPC rotation should not be added due to error
|
||||
if _, ok := cmRotations.targetRotations[helpers.GRPCServerSecret]; ok {
|
||||
t.Fatalf("expected GRPC rotation not to be added due to no ingress status error")
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
kubeClient := fakekube.NewSimpleClientset(c.existingObjects...)
|
||||
|
||||
newOnTermInformer := func(name string) kubeinformers.SharedInformerFactory {
|
||||
return kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 5*time.Minute,
|
||||
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
|
||||
}))
|
||||
}
|
||||
|
||||
secretInformers := map[string]corev1informers.SecretInformer{
|
||||
helpers.SignerSecret: newOnTermInformer(helpers.SignerSecret).Core().V1().Secrets(),
|
||||
helpers.RegistrationWebhookSecret: newOnTermInformer(helpers.RegistrationWebhookSecret).Core().V1().Secrets(),
|
||||
helpers.WorkWebhookSecret: newOnTermInformer(helpers.WorkWebhookSecret).Core().V1().Secrets(),
|
||||
helpers.GRPCServerSecret: newOnTermInformer(helpers.GRPCServerSecret).Core().V1().Secrets(),
|
||||
}
|
||||
|
||||
configmapInformer := newOnTermInformer(helpers.CaBundleConfigmap).Core().V1().ConfigMaps()
|
||||
|
||||
operatorClient := fakeoperatorclient.NewSimpleClientset(c.clusterManager)
|
||||
operatorInformers := operatorinformers.NewSharedInformerFactory(operatorClient, 5*time.Minute)
|
||||
clusterManagerStore := operatorInformers.Operator().V1().ClusterManagers().Informer().GetStore()
|
||||
if err := clusterManagerStore.Add(c.clusterManager); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
syncContext := testingcommon.NewFakeSyncContext(t, testClusterManagerNameDefault)
|
||||
recorder := syncContext.Recorder()
|
||||
|
||||
controller := &certRotationController{
|
||||
rotationMap: make(map[string]rotations),
|
||||
kubeClient: kubeClient,
|
||||
secretInformers: secretInformers,
|
||||
configMapInformer: configmapInformer,
|
||||
recorder: recorder,
|
||||
clusterManagerLister: operatorInformers.Operator().V1().ClusterManagers().Lister(),
|
||||
}
|
||||
|
||||
// Sync the controller
|
||||
err := controller.sync(context.TODO(), syncContext)
|
||||
|
||||
// Check if we expect an error
|
||||
if c.expectedErrorSubstr != "" {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error containing %q, but got no error", c.expectedErrorSubstr)
|
||||
}
|
||||
if !strings.Contains(err.Error(), c.expectedErrorSubstr) {
|
||||
t.Fatalf("expected error containing %q, but got: %v", c.expectedErrorSubstr, err)
|
||||
}
|
||||
}
|
||||
|
||||
c.validate(t, kubeClient, controller)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func assertResourcesExistAndValid(t *testing.T, kubeClient kubernetes.Interface, namespace string) {
|
||||
configmap, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(context.Background(), "ca-bundle-configmap", metav1.GetOptions{})
|
||||
if err != nil {
|
||||
|
||||
@@ -230,9 +230,14 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
|
||||
config.Labels = helpers.GetClusterManagerHubLabels(clusterManager, n.enableSyncLabels)
|
||||
config.LabelsString = helpers.GetRegistrationLabelString(config.Labels)
|
||||
|
||||
// Determine if the gGRPC auth is enabled
|
||||
// Determine if the gRPC auth is enabled
|
||||
config.GRPCAuthEnabled = helpers.GRPCAuthEnabled(clusterManager)
|
||||
|
||||
// Get gRPC endpoint type
|
||||
if config.GRPCAuthEnabled {
|
||||
config.GRPCEndpointType = helpers.GRPCServerEndpointType(clusterManager)
|
||||
}
|
||||
|
||||
// Update finalizer at first
|
||||
if clusterManager.DeletionTimestamp.IsZero() {
|
||||
updated, err := n.patcher.AddFinalizer(ctx, clusterManager, clusterManagerFinalizer)
|
||||
|
||||
@@ -822,6 +822,141 @@ func newSecret(name, namespace string) *corev1.Secret {
|
||||
}
|
||||
}
|
||||
|
||||
// TestGRPCServiceLoadBalancerType tests that the GRPC service is of LoadBalancer type when configured
|
||||
func TestGRPCServiceLoadBalancerType(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
clusterManager *operatorapiv1.ClusterManager
|
||||
expectedServiceType corev1.ServiceType
|
||||
expectedPort int32
|
||||
description string
|
||||
}{
|
||||
{
|
||||
name: "GRPC service with LoadBalancer type",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager("testhub")
|
||||
cm.Spec.RegistrationConfiguration = &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{
|
||||
{
|
||||
AuthType: operatorapiv1.GRPCAuthType,
|
||||
},
|
||||
},
|
||||
}
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: "grpc",
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeLoadBalancer,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
expectedServiceType: corev1.ServiceTypeLoadBalancer,
|
||||
expectedPort: 443,
|
||||
description: "GRPC service should be LoadBalancer type when endpoint type is loadBalancer",
|
||||
},
|
||||
{
|
||||
name: "GRPC service with ClusterIP type (hostname endpoint)",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager("testhub")
|
||||
cm.Spec.RegistrationConfiguration = &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{
|
||||
{
|
||||
AuthType: operatorapiv1.GRPCAuthType,
|
||||
},
|
||||
},
|
||||
}
|
||||
cm.Spec.ServerConfiguration = &operatorapiv1.ServerConfiguration{
|
||||
EndpointsExposure: []operatorapiv1.EndpointExposure{
|
||||
{
|
||||
Protocol: "grpc",
|
||||
GRPC: &operatorapiv1.Endpoint{
|
||||
Type: operatorapiv1.EndpointTypeHostname,
|
||||
Hostname: &operatorapiv1.HostnameConfig{
|
||||
Host: "grpc.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
expectedServiceType: corev1.ServiceTypeClusterIP,
|
||||
expectedPort: 8090,
|
||||
description: "GRPC service should be ClusterIP type when endpoint type is hostname",
|
||||
},
|
||||
{
|
||||
name: "GRPC service with default ClusterIP type (no server configuration)",
|
||||
clusterManager: func() *operatorapiv1.ClusterManager {
|
||||
cm := newClusterManager("testhub")
|
||||
cm.Spec.RegistrationConfiguration = &operatorapiv1.RegistrationHubConfiguration{
|
||||
RegistrationDrivers: []operatorapiv1.RegistrationDriverHub{
|
||||
{
|
||||
AuthType: operatorapiv1.GRPCAuthType,
|
||||
},
|
||||
},
|
||||
}
|
||||
return cm
|
||||
}(),
|
||||
expectedServiceType: corev1.ServiceTypeClusterIP,
|
||||
expectedPort: 8090,
|
||||
description: "GRPC service should be ClusterIP type when no server configuration is specified",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
tc := newTestController(t, test.clusterManager)
|
||||
clusterManagerNamespace := helpers.ClusterManagerNamespace(test.clusterManager.Name, test.clusterManager.Spec.DeployOption.Mode)
|
||||
cd := setDeployment(test.clusterManager.Name, clusterManagerNamespace)
|
||||
setup(t, tc, cd)
|
||||
|
||||
syncContext := testingcommon.NewFakeSyncContext(t, test.clusterManager.Name)
|
||||
|
||||
// Call sync to create resources
|
||||
err := tc.clusterManagerController.sync(ctx, syncContext)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error when sync, %v", err)
|
||||
}
|
||||
|
||||
// Find the GRPC service in the created objects
|
||||
grpcServiceName := test.clusterManager.Name + "-grpc-server"
|
||||
var grpcServiceFound bool
|
||||
var actualServiceType corev1.ServiceType
|
||||
var actualServicePort int32
|
||||
|
||||
kubeActions := append(tc.hubKubeClient.Actions(), tc.managementKubeClient.Actions()...)
|
||||
for _, action := range kubeActions {
|
||||
if action.GetVerb() == createVerb {
|
||||
object := action.(clienttesting.CreateActionImpl).Object
|
||||
if service, ok := object.(*corev1.Service); ok {
|
||||
if service.Name == grpcServiceName && service.Namespace == clusterManagerNamespace {
|
||||
grpcServiceFound = true
|
||||
actualServiceType = service.Spec.Type
|
||||
actualServicePort = service.Spec.Ports[0].Port
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !grpcServiceFound {
|
||||
t.Fatalf("Test %q failed: GRPC service %s not found in namespace %s", test.name, grpcServiceName, clusterManagerNamespace)
|
||||
}
|
||||
|
||||
if actualServiceType != test.expectedServiceType {
|
||||
t.Errorf("Test %q failed: %s. Expected service type %q, but got %q", test.name, test.description, test.expectedServiceType, actualServiceType)
|
||||
}
|
||||
if actualServicePort != test.expectedPort {
|
||||
t.Errorf("Test %q failed: Expected service port %d, but got %d", test.name, test.expectedPort, actualServicePort)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkControllerEnabledByFeatureGates tests that work controller is enabled when specific feature gates are enabled
|
||||
func TestWorkControllerEnabledByFeatureGates(t *testing.T) {
|
||||
tests := []struct {
|
||||
|
||||
Reference in New Issue
Block a user