[traefik] Implement router interface

This commit is contained in:
nmlc
2020-11-25 07:51:49 +05:00
parent a3b9ed126d
commit 2c1d998c43
3 changed files with 341 additions and 0 deletions

View File

@@ -128,6 +128,11 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf
logger: factory.logger,
kubeClient: factory.kubeClient,
}
case provider == flaggerv1.TraefikProvider:
return &TraefikRouter{
logger: factory.logger,
traefikClient: factory.meshClient,
}
case provider == flaggerv1.KubernetesProvider:
return &NopRouter{}
default:

194
pkg/router/traefik.go Normal file
View File

@@ -0,0 +1,194 @@
package router
import (
"context"
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
traefikv1alpha1 "github.com/weaveworks/flagger/pkg/apis/traefik/v1alpha1"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// TraefikRouter is managing Traefik service
type TraefikRouter struct {
traefikClient clientset.Interface
logger *zap.SugaredLogger
}
// Reconcile creates or updates the Traefik service
func (tr *TraefikRouter) Reconcile(canary *flaggerv1.Canary) error {
apexName, primaryName, canaryName := canary.GetServiceNames()
newSpec := traefikv1alpha1.ServiceSpec{
Weighted: &traefikv1alpha1.WeightedRoundRobin{
Services: []traefikv1alpha1.Service{
{
Name: primaryName,
Namespace: canary.Namespace,
Port: canary.Spec.Service.Port,
Weight: 100,
},
},
},
}
traefikService, err := tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{})
if errors.IsNotFound(err) {
tsMetadata := canary.Spec.Service.TraefikService
if tsMetadata == nil {
tsMetadata = &flaggerv1.CustomMetadata{}
}
if tsMetadata.Labels == nil {
tsMetadata.Labels = make(map[string]string)
}
if tsMetadata.Annotations == nil {
tsMetadata.Annotations = make(map[string]string)
}
traefikService = &traefikv1alpha1.TraefikService{
ObjectMeta: metav1.ObjectMeta{
Name: apexName,
Namespace: canary.Namespace,
Labels: tsMetadata.Labels,
Annotations: tsMetadata.Annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(canary, schema.GroupVersionKind{
Group: flaggerv1.SchemeGroupVersion.Group,
Version: flaggerv1.SchemeGroupVersion.Version,
Kind: flaggerv1.CanaryKind,
}),
},
},
Spec: newSpec,
}
_, err = tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Create(context.TODO(), traefikService, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("TraefikService %s.%s create error: %w", apexName, canary.Namespace, err)
}
tr.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
Infof("TraefikService %s.%s created", traefikService.GetName(), canary.Namespace)
return nil
} else if err != nil {
return fmt.Errorf("TraefikService %s.%s get query error: %w", apexName, canary.Namespace, err)
}
// update TraefikService but keep the original service weights
if traefikService != nil {
if len(traefikService.Spec.Weighted.Services) == 2 {
newSpec.Weighted.Services = append(
newSpec.Weighted.Services,
traefikv1alpha1.Service{
Name: canaryName,
Namespace: canary.Namespace,
Port: canary.Spec.Service.Port,
Weight: 100,
},
)
}
if diff := cmp.Diff(
newSpec,
traefikService.Spec,
cmpopts.IgnoreFields(traefikv1alpha1.Service{}, "Weight"),
); diff != "" {
clone := traefikService.DeepCopy()
clone.Spec = newSpec
_, err = tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("TraefikService %s.%s update error: %w", apexName, canary.Namespace, err)
}
tr.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
Infof("TraefikService %s.%s updated", traefikService.GetName(), canary.Namespace)
}
}
return nil
}
// GetRoutes returns the destinations weight for primary and canary
func (tr *TraefikRouter) GetRoutes(canary *flaggerv1.Canary) (
primaryWeight int,
canaryWeight int,
mirrored bool,
err error,
) {
apexName, primaryName, _ := canary.GetServiceNames()
traefikService, err := tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("TraefikService %s.%s query error: %w", apexName, canary.Namespace, err)
return
}
if len(traefikService.Spec.Weighted.Services) < 1 {
err = fmt.Errorf("TraefikService %s.%s services not found", apexName, canary.Namespace)
return
}
for _, s := range traefikService.Spec.Weighted.Services {
if s.Name == primaryName {
primaryWeight = int(s.Weight)
canaryWeight = 100 - primaryWeight
return
}
}
return
}
// SetRoutes updates the destinations weight for primary and canary
func (tr *TraefikRouter) SetRoutes(
canary *flaggerv1.Canary,
primaryWeight int,
canaryWeight int,
_ bool,
) error {
apexName, primaryName, canaryName := canary.GetServiceNames()
if primaryWeight == 0 && canaryWeight == 0 {
return fmt.Errorf("RoutingRule %s.%s update failed: no valid weights", apexName, canary.Namespace)
}
traefikService, err := tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Get(context.TODO(), apexName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("TraefikService %s.%s query error: %w", apexName, canary.Namespace, err)
}
services := []traefikv1alpha1.Service{
{
Name: primaryName,
Namespace: canary.Namespace,
Port: canary.Spec.Service.Port,
Weight: uint(primaryWeight),
},
}
if canaryWeight > 0 {
services = append(services, traefikv1alpha1.Service{
Name: canaryName,
Namespace: canary.Namespace,
Port: canary.Spec.Service.Port,
Weight: uint(canaryWeight),
})
}
traefikService.Spec.Weighted.Services = services
_, err = tr.traefikClient.TraefikV1alpha1().TraefikServices(canary.Namespace).Update(context.TODO(), traefikService, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("TraefikService %s.%s update error: %w", apexName, canary.Namespace, err)
}
return nil
}
func (tr *TraefikRouter) Finalize(_ *flaggerv1.Canary) error {
return nil
}

142
pkg/router/traefik_test.go Normal file
View File

@@ -0,0 +1,142 @@
package router
import (
"context"
"testing"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestTraefikRouter_Reconcile(t *testing.T) {
mocks := newFixture(nil)
mocks.canary.Spec.Service.TraefikService = &flaggerv1.CustomMetadata{
Labels: map[string]string{
"test": "label",
},
Annotations: map[string]string{
"test": "annotation",
},
}
router := &TraefikRouter{
traefikClient: mocks.meshClient,
logger: mocks.logger,
}
assert.NoError(t, router.Reconcile(mocks.canary))
ts, err := router.traefikClient.TraefikV1alpha1().TraefikServices("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
assert.NoError(t, err)
services := ts.Spec.Weighted.Services
assert.Len(t, services, 1)
assert.Equal(t, uint(100), services[0].Weight)
assert.Equal(t, ts.ObjectMeta.Labels, mocks.canary.Spec.Service.TraefikService.Labels)
assert.Equal(t, ts.ObjectMeta.Annotations, mocks.canary.Spec.Service.TraefikService.Annotations)
for _, tt := range []struct {
name string
primary int
canary int
servicesLen int
}{
{
name: "should not change weights when canary is progressing",
primary: 60,
canary: 40,
servicesLen: 2,
},
{
name: "should not change weights when canary isn't progressing",
primary: 100,
canary: 0,
servicesLen: 1,
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, router.Reconcile(mocks.canary))
assert.NoError(t, router.SetRoutes(mocks.canary, tt.primary, tt.canary, false))
assert.NoError(t, router.Reconcile(mocks.canary))
ts, err := router.traefikClient.TraefikV1alpha1().TraefikServices("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
assert.NoError(t, err)
services := ts.Spec.Weighted.Services
assert.Len(t, services, tt.servicesLen)
assert.Equal(t, uint(tt.primary), services[0].Weight)
if tt.canary > 0 {
assert.Equal(t, uint(tt.canary), services[1].Weight)
}
})
}
}
func TestTraefikRouter_SetRoutes(t *testing.T) {
mocks := newFixture(nil)
router := &TraefikRouter{
traefikClient: mocks.meshClient,
logger: mocks.logger,
}
err := router.Reconcile(mocks.canary)
require.NoError(t, err)
_, _, _, err = router.GetRoutes(mocks.canary)
require.NoError(t, err)
for _, tt := range []struct {
name string
primary int
canary int
servicesLen int
}{
{name: "0%", primary: 100, canary: 0, servicesLen: 1},
{name: "20%", primary: 80, canary: 20, servicesLen: 2},
{name: "40%", primary: 60, canary: 40, servicesLen: 2},
{name: "60%", primary: 40, canary: 60, servicesLen: 2},
{name: "80%", primary: 20, canary: 80, servicesLen: 2},
{name: "100%", primary: 0, canary: 100, servicesLen: 2},
{name: "0% (promote)", primary: 100, canary: 0, servicesLen: 1},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
err = router.SetRoutes(mocks.canary, tt.primary, tt.canary, false)
require.NoError(t, err)
ts, err := router.traefikClient.TraefikV1alpha1().TraefikServices("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
assert.NoError(t, err)
services := ts.Spec.Weighted.Services
assert.Len(t, services, tt.servicesLen)
assert.Equal(t, uint(tt.primary), services[0].Weight)
if tt.canary > 0 {
assert.Equal(t, uint(tt.canary), services[1].Weight)
}
})
}
}
func TestTraefikRouter_GetRoutes(t *testing.T) {
mocks := newFixture(nil)
router := &TraefikRouter{
traefikClient: mocks.meshClient,
logger: mocks.logger,
}
err := router.Reconcile(mocks.canary)
require.NoError(t, err)
p, c, m, err := router.GetRoutes(mocks.canary)
require.NoError(t, err)
assert.Equal(t, 100, p)
assert.Equal(t, 0, c)
assert.False(t, m)
}