From 2c1d998c435217a1eccc52b30bca55710d7323da Mon Sep 17 00:00:00 2001 From: nmlc Date: Wed, 25 Nov 2020 07:51:49 +0500 Subject: [PATCH] [traefik] Implement router interface --- pkg/router/factory.go | 5 + pkg/router/traefik.go | 194 +++++++++++++++++++++++++++++++++++++ pkg/router/traefik_test.go | 142 +++++++++++++++++++++++++++ 3 files changed, 341 insertions(+) create mode 100644 pkg/router/traefik.go create mode 100644 pkg/router/traefik_test.go diff --git a/pkg/router/factory.go b/pkg/router/factory.go index cd1835b3..2d85aedf 100644 --- a/pkg/router/factory.go +++ b/pkg/router/factory.go @@ -128,6 +128,11 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf logger: factory.logger, kubeClient: factory.kubeClient, } + case provider == flaggerv1.TraefikProvider: + return &TraefikRouter{ + logger: factory.logger, + traefikClient: factory.meshClient, + } case provider == flaggerv1.KubernetesProvider: return &NopRouter{} default: diff --git a/pkg/router/traefik.go b/pkg/router/traefik.go new file mode 100644 index 00000000..7e57316a --- /dev/null +++ b/pkg/router/traefik.go @@ -0,0 +1,194 @@ +package router + +import ( + "context" + "fmt" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" + traefikv1alpha1 "github.com/weaveworks/flagger/pkg/apis/traefik/v1alpha1" + clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// TraefikRouter is managing Traefik service +type TraefikRouter struct { + traefikClient clientset.Interface + logger *zap.SugaredLogger +} + +// Reconcile creates or updates the Traefik service +func (tr *TraefikRouter) Reconcile(canary *flaggerv1.Canary) error { + apexName, primaryName, canaryName := canary.GetServiceNames() + + newSpec := traefikv1alpha1.ServiceSpec{ + Weighted: &traefikv1alpha1.WeightedRoundRobin{ + Services: []traefikv1alpha1.Service{ + { + Name: primaryName, + Namespace: canary.Namespace, + Port: canary.Spec.Service.Port, + Weight: 100, + }, + }, + }, + } + + traefikService, err := tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + tsMetadata := canary.Spec.Service.TraefikService + if tsMetadata == nil { + tsMetadata = &flaggerv1.CustomMetadata{} + } + if tsMetadata.Labels == nil { + tsMetadata.Labels = make(map[string]string) + } + if tsMetadata.Annotations == nil { + tsMetadata.Annotations = make(map[string]string) + } + + traefikService = &traefikv1alpha1.TraefikService{ + ObjectMeta: metav1.ObjectMeta{ + Name: apexName, + Namespace: canary.Namespace, + Labels: tsMetadata.Labels, + Annotations: tsMetadata.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(canary, schema.GroupVersionKind{ + Group: flaggerv1.SchemeGroupVersion.Group, + Version: flaggerv1.SchemeGroupVersion.Version, + Kind: flaggerv1.CanaryKind, + }), + }, + }, + Spec: newSpec, + } + + _, err = tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Create(context.TODO(), traefikService, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("TraefikService %s.%s create error: %w", apexName, canary.Namespace, err) + } + tr.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)). + Infof("TraefikService %s.%s created", traefikService.GetName(), canary.Namespace) + return nil + } else if err != nil { + return fmt.Errorf("TraefikService %s.%s get query error: %w", apexName, canary.Namespace, err) + } + + // update TraefikService but keep the original service weights + if traefikService != nil { + if len(traefikService.Spec.Weighted.Services) == 2 { + newSpec.Weighted.Services = append( + newSpec.Weighted.Services, + traefikv1alpha1.Service{ + Name: canaryName, + Namespace: canary.Namespace, + Port: canary.Spec.Service.Port, + Weight: 100, + }, + ) + } + + if diff := cmp.Diff( + newSpec, + traefikService.Spec, + cmpopts.IgnoreFields(traefikv1alpha1.Service{}, "Weight"), + ); diff != "" { + + clone := traefikService.DeepCopy() + clone.Spec = newSpec + + _, err = tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("TraefikService %s.%s update error: %w", apexName, canary.Namespace, err) + } + tr.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)). + Infof("TraefikService %s.%s updated", traefikService.GetName(), canary.Namespace) + } + } + + return nil +} + +// GetRoutes returns the destinations weight for primary and canary +func (tr *TraefikRouter) GetRoutes(canary *flaggerv1.Canary) ( + primaryWeight int, + canaryWeight int, + mirrored bool, + err error, +) { + apexName, primaryName, _ := canary.GetServiceNames() + + traefikService, err := tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + if err != nil { + err = fmt.Errorf("TraefikService %s.%s query error: %w", apexName, canary.Namespace, err) + return + } + + if len(traefikService.Spec.Weighted.Services) < 1 { + err = fmt.Errorf("TraefikService %s.%s services not found", apexName, canary.Namespace) + return + } + + for _, s := range traefikService.Spec.Weighted.Services { + if s.Name == primaryName { + primaryWeight = int(s.Weight) + canaryWeight = 100 - primaryWeight + return + + } + } + + return +} + +// SetRoutes updates the destinations weight for primary and canary +func (tr *TraefikRouter) SetRoutes( + canary *flaggerv1.Canary, + primaryWeight int, + canaryWeight int, + _ bool, +) error { + apexName, primaryName, canaryName := canary.GetServiceNames() + + if primaryWeight == 0 && canaryWeight == 0 { + return fmt.Errorf("RoutingRule %s.%s update failed: no valid weights", apexName, canary.Namespace) + } + traefikService, err := tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("TraefikService %s.%s query error: %w", apexName, canary.Namespace, err) + } + + services := []traefikv1alpha1.Service{ + { + Name: primaryName, + Namespace: canary.Namespace, + Port: canary.Spec.Service.Port, + Weight: uint(primaryWeight), + }, + } + if canaryWeight > 0 { + services = append(services, traefikv1alpha1.Service{ + Name: canaryName, + Namespace: canary.Namespace, + Port: canary.Spec.Service.Port, + Weight: uint(canaryWeight), + }) + } + + traefikService.Spec.Weighted.Services = services + + _, err = tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Update(context.TODO(), traefikService, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("TraefikService %s.%s update error: %w", apexName, canary.Namespace, err) + } + return nil +} + +func (tr *TraefikRouter) Finalize(_ *flaggerv1.Canary) error { + return nil +} diff --git a/pkg/router/traefik_test.go b/pkg/router/traefik_test.go new file mode 100644 index 00000000..e4c84dbc --- /dev/null +++ b/pkg/router/traefik_test.go @@ -0,0 +1,142 @@ +package router + +import ( + "context" + "testing" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestTraefikRouter_Reconcile(t *testing.T) { + mocks := newFixture(nil) + mocks.canary.Spec.Service.TraefikService = &flaggerv1.CustomMetadata{ + Labels: map[string]string{ + "test": "label", + }, + Annotations: map[string]string{ + "test": "annotation", + }, + } + + router := &TraefikRouter{ + traefikClient: mocks.meshClient, + logger: mocks.logger, + } + + assert.NoError(t, router.Reconcile(mocks.canary)) + ts, err := router.traefikClient.TraefikV1alpha1().TraefikServices("default").Get(context.TODO(), "podinfo", metav1.GetOptions{}) + assert.NoError(t, err) + + services := ts.Spec.Weighted.Services + assert.Len(t, services, 1) + assert.Equal(t, uint(100), services[0].Weight) + + assert.Equal(t, ts.ObjectMeta.Labels, mocks.canary.Spec.Service.TraefikService.Labels) + assert.Equal(t, ts.ObjectMeta.Annotations, mocks.canary.Spec.Service.TraefikService.Annotations) + + for _, tt := range []struct { + name string + primary int + canary int + servicesLen int + }{ + { + name: "should not change weights when canary is progressing", + primary: 60, + canary: 40, + servicesLen: 2, + }, + { + name: "should not change weights when canary isn't progressing", + primary: 100, + canary: 0, + servicesLen: 1, + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + assert.NoError(t, router.Reconcile(mocks.canary)) + assert.NoError(t, router.SetRoutes(mocks.canary, tt.primary, tt.canary, false)) + assert.NoError(t, router.Reconcile(mocks.canary)) + + ts, err := router.traefikClient.TraefikV1alpha1().TraefikServices("default").Get(context.TODO(), "podinfo", metav1.GetOptions{}) + assert.NoError(t, err) + + services := ts.Spec.Weighted.Services + assert.Len(t, services, tt.servicesLen) + assert.Equal(t, uint(tt.primary), services[0].Weight) + if tt.canary > 0 { + assert.Equal(t, uint(tt.canary), services[1].Weight) + } + }) + + } +} + +func TestTraefikRouter_SetRoutes(t *testing.T) { + mocks := newFixture(nil) + router := &TraefikRouter{ + traefikClient: mocks.meshClient, + logger: mocks.logger, + } + + err := router.Reconcile(mocks.canary) + require.NoError(t, err) + + _, _, _, err = router.GetRoutes(mocks.canary) + require.NoError(t, err) + + for _, tt := range []struct { + name string + primary int + canary int + servicesLen int + }{ + {name: "0%", primary: 100, canary: 0, servicesLen: 1}, + {name: "20%", primary: 80, canary: 20, servicesLen: 2}, + {name: "40%", primary: 60, canary: 40, servicesLen: 2}, + {name: "60%", primary: 40, canary: 60, servicesLen: 2}, + {name: "80%", primary: 20, canary: 80, servicesLen: 2}, + {name: "100%", primary: 0, canary: 100, servicesLen: 2}, + {name: "0% (promote)", primary: 100, canary: 0, servicesLen: 1}, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + err = router.SetRoutes(mocks.canary, tt.primary, tt.canary, false) + require.NoError(t, err) + + ts, err := router.traefikClient.TraefikV1alpha1().TraefikServices("default").Get(context.TODO(), "podinfo", metav1.GetOptions{}) + assert.NoError(t, err) + + services := ts.Spec.Weighted.Services + assert.Len(t, services, tt.servicesLen) + assert.Equal(t, uint(tt.primary), services[0].Weight) + if tt.canary > 0 { + assert.Equal(t, uint(tt.canary), services[1].Weight) + } + + }) + } +} + +func TestTraefikRouter_GetRoutes(t *testing.T) { + mocks := newFixture(nil) + router := &TraefikRouter{ + traefikClient: mocks.meshClient, + logger: mocks.logger, + } + + err := router.Reconcile(mocks.canary) + require.NoError(t, err) + + p, c, m, err := router.GetRoutes(mocks.canary) + require.NoError(t, err) + + assert.Equal(t, 100, p) + assert.Equal(t, 0, c) + assert.False(t, m) +}