Files
flagger/pkg/router/skipper.go
Samuel Lang ca14a08f9c Skipper Router Implementation
Router implementation for zalan.do/Skipper Ingress -
An HTTP router and reverse proxy for service composition, including use cases like Kubernetes Ingress

https://github.com/zalando/skipper/

* The concept is to define routes with specific weights via the skipper specific annotation predicate of "zalando.org/backend-weights".
* A new "canary ingress" is created with a higher route weight than the apex Ingress, so it receives all traffic and progressively distributes it between the primary and canary services.
* After the canary process is finished, this ingress is disabled via the "False()" annotation predicate to route traffic again back to the apex Ingress.
There are certain Skipper principles which are taken into account:

```
Skipper Principles:
* if only one backend has a weight, only one backend will get 100% traffic
* if two of three or more backends have a weight, only those two should get traffic.
* if two backends don't have any weight, it's undefined and right now they get equal amount of traffic.
* weights can be int or float, but always treated as a ratio.

Implementation:
* apex Ingress is immutable
* new canary Ingress contains two paths for primary and canary service
* canary Ingress manages weights on primary & canary service, hence no traffic to apex service
```
2020-08-17 08:23:38 +02:00

236 lines
8.9 KiB
Go

package router
import (
"context"
"encoding/json"
"fmt"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)
/*
Skipper Principles:
* if only one backend has a weight, only one backend will get 100% traffic
* if two of three or more backends have a weight, only those two should get traffic.
* if two backends don't have any weight, it's undefined and right now they get equal amount of traffic.
* weights can be int or float, but always treated as a ratio.
Implementation:
* apex Ingress is immutable
* new canary Ingress contains two paths for primary and canary service
* canary Ingress manages weights on primary & canary service, hence no traffic to apex service
*/
// Annotation keys written on the canary Ingress.
const (
	// skipperBackendWeightsAnnotationKey holds the JSON encoded
	// service-name -> weight map.
	skipperBackendWeightsAnnotationKey = "zalando.org/backend-weights"
	// skipperpredicateAnnotationKey holds the route predicate that either
	// prioritises or disables the canary Ingress.
	skipperpredicateAnnotationKey = "zalando.org/skipper-predicate"
)

// Predicate values stored under skipperpredicateAnnotationKey.
const (
	// canaryRouteDisable switches the canary Ingress route off.
	canaryRouteDisable = "False()"
	// canaryRouteWeight gives the canary Ingress route more weight than the
	// apex Ingress route.
	canaryRouteWeight = "Weight(100)"
)

// canaryPatternf is the fmt pattern that derives the canary Ingress name
// from the apex Ingress name.
const canaryPatternf = "%s-canary"
// SkipperRouter manages a dedicated canary Ingress, derived from the apex
// Ingress, whose annotations carry the primary/canary traffic weights.
type SkipperRouter struct {
// kubeClient is used to get, create, update and delete Ingress objects.
kubeClient kubernetes.Interface
// logger receives the router's info, debug and error messages.
logger *zap.SugaredLogger
}
// Reconcile creates or updates the canary ingress derived from the apex ingress.
//
// The apex ingress named by canary.Spec.IngressRef is read but never modified.
// A deep copy of it is built in which every HTTP path that targets the apex
// service is switched to the primary service and duplicated for the canary
// service. The copy receives the initial backend weights (primary 100,
// canary 0) and the route weight predicate as annotations, is named after
// the canary ingress, is owned by the Canary resource, and is then created
// or, when its spec differs from the existing canary ingress, updated.
//
// An error is returned when the ingress reference is empty, when the apex
// ingress cannot be fetched, when no path of the apex ingress targets the
// apex service, or when a get/create/update call fails.
func (skp *SkipperRouter) Reconcile(canary *flaggerv1.Canary) error {
	if canary.Spec.IngressRef == nil || canary.Spec.IngressRef.Name == "" {
		return fmt.Errorf("ingress selector is empty")
	}

	apexSvcName, primarySvcName, canarySvcName := canary.GetServiceNames()
	apexIngressName, canaryIngressName := skp.getIngressNames(canary.Spec.IngressRef.Name)

	// retrieving apex ingress
	apexIngress, err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Get(
		context.TODO(), apexIngressName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("apexIngress %s.%s get query error: %w", apexIngressName, canary.Namespace, err)
	}

	// building the canary ingress from apex
	iClone := apexIngress.DeepCopy()
	// Track matches explicitly: comparing two DeepCopy pointers is always
	// false and can never detect a missing backend.
	backendExists := false
	for x := range iClone.Spec.Rules {
		rule := &iClone.Spec.Rules[x] // ref not value
		if rule.HTTP == nil {
			// rule without HTTP paths, nothing to rewrite
			continue
		}
		// The range length is fixed when the loop starts, so the canary
		// paths appended below are not visited again.
		for y := range rule.HTTP.Paths {
			path := &rule.HTTP.Paths[y] // ref not value
			if path.Backend.ServiceName != apexSvcName {
				continue
			}
			// flipping to primary service
			path.Backend.ServiceName = primarySvcName
			// adding second canary service
			canaryBackend := path.DeepCopy()
			canaryBackend.Backend.ServiceName = canarySvcName
			rule.HTTP.Paths = append(rule.HTTP.Paths, *canaryBackend)
			backendExists = true
		}
	}
	if !backendExists {
		return fmt.Errorf("backend %s not found in ingress %s", apexSvcName, apexIngressName)
	}

	// makeAnnotations writes into the map it is given; an apex ingress
	// without annotations would otherwise hand it a nil map.
	if iClone.Annotations == nil {
		iClone.Annotations = make(map[string]string)
	}
	iClone.Annotations = skp.makeAnnotations(iClone.Annotations, map[string]int{primarySvcName: 100, canarySvcName: 0})
	iClone.Name = canaryIngressName
	iClone.Namespace = canary.Namespace
	iClone.OwnerReferences = []metav1.OwnerReference{
		*metav1.NewControllerRef(canary, schema.GroupVersionKind{
			Group:   flaggerv1.SchemeGroupVersion.Group,
			Version: flaggerv1.SchemeGroupVersion.Version,
			Kind:    flaggerv1.CanaryKind,
		}),
	}

	// search for existence
	canaryIngress, err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Get(
		context.TODO(), canaryIngressName, metav1.GetOptions{})

	// new ingress
	if errors.IsNotFound(err) {
		// Let K8s set this. Otherwise K8s API complains with "resourceVersion should not be set on objects to be created"
		iClone.ObjectMeta.ResourceVersion = ""
		if _, err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Create(
			context.TODO(), iClone, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("ingress %s.%s create error: %w", iClone.Name, iClone.Namespace, err)
		}
		skp.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
			Infof("Ingress %s.%s created", iClone.GetName(), canary.Namespace)
		return nil
	}
	if err != nil {
		return fmt.Errorf("ingress %s.%s query error: %w", canaryIngressName, canary.Namespace, err)
	}

	// existent, updating only when the desired spec differs
	if cmp.Diff(iClone.Spec, canaryIngress.Spec) != "" {
		ingressClone := canaryIngress.DeepCopy()
		ingressClone.Spec = iClone.Spec
		ingressClone.Annotations = iClone.Annotations

		if _, err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Update(
			context.TODO(), ingressClone, metav1.UpdateOptions{}); err != nil {
			return fmt.Errorf("ingress %s.%s update error: %w", canaryIngressName, ingressClone.Namespace, err)
		}
		skp.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
			Infof("Ingress %s updated", canaryIngressName)
	}

	return nil
}
// GetRoutes reads the traffic weights stored on the canary ingress.
//
// The weights are decoded from the backend-weights annotation of the canary
// ingress and looked up by the primary and canary service names. mirrored is
// always false.
//
// An error is returned when the ingress reference is empty, when the canary
// ingress cannot be fetched, when the annotation is missing or cannot be
// decoded, or when either service has no entry in the decoded weights. On
// error the weights are returned as zero.
func (skp *SkipperRouter) GetRoutes(canary *flaggerv1.Canary) (primaryWeight, canaryWeight int, mirrored bool, err error) {
	// Same precondition as Reconcile; avoids a nil dereference below.
	if canary.Spec.IngressRef == nil || canary.Spec.IngressRef.Name == "" {
		return 0, 0, false, fmt.Errorf("ingress selector is empty")
	}

	_, primarySvcName, canarySvcName := canary.GetServiceNames()
	_, canaryIngressName := skp.getIngressNames(canary.Spec.IngressRef.Name)

	canaryIngress, err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Get(
		context.TODO(), canaryIngressName, metav1.GetOptions{})
	if err != nil {
		return 0, 0, false, fmt.Errorf("ingress %s.%s get query error: %w", canaryIngressName, canary.Namespace, err)
	}

	weights, err := skp.backendWeights(canaryIngress.Annotations)
	if err != nil {
		return 0, 0, false, fmt.Errorf("ingress %s.%s get backendWeights error: %w", canaryIngressName, canary.Namespace, err)
	}

	primaryWeight, ok := weights[primarySvcName]
	if !ok {
		return 0, 0, false, fmt.Errorf("ingress %s.%s could not get weights[%s]",
			canaryIngressName, canary.Namespace, primarySvcName)
	}
	canaryWeight, ok = weights[canarySvcName]
	if !ok {
		return 0, 0, false, fmt.Errorf("ingress %s.%s could not get weights[%s]",
			canaryIngressName, canary.Namespace, canarySvcName)
	}

	skp.logger.With("GetRoutes", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
		Debugf("GetRoutes primaryWeight: %d, canaryWeight: %d", primaryWeight, canaryWeight)
	return primaryWeight, canaryWeight, false, nil
}
// SetRoutes stores the given primary and canary weights on the canary ingress.
//
// The weights are written to the backend-weights annotation together with
// the route weight predicate. When canaryWeight is 0 the predicate is
// replaced by the disabling predicate, which turns the canary ingress route
// off. The mirror argument is ignored.
//
// An error is returned when the ingress reference is empty or when fetching
// or updating the canary ingress fails.
func (skp *SkipperRouter) SetRoutes(canary *flaggerv1.Canary, primaryWeight, canaryWeight int, _ bool) (err error) {
	// Same precondition as Reconcile; avoids a nil dereference below.
	if canary.Spec.IngressRef == nil || canary.Spec.IngressRef.Name == "" {
		return fmt.Errorf("ingress selector is empty")
	}

	_, primarySvcName, canarySvcName := canary.GetServiceNames()
	_, canaryIngressName := skp.getIngressNames(canary.Spec.IngressRef.Name)

	canaryIngress, err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Get(
		context.TODO(), canaryIngressName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("ingress %s.%s get query error: %w", canaryIngressName, canary.Namespace, err)
	}

	// Work on a copy so the object returned by the client is left untouched.
	iClone := canaryIngress.DeepCopy()

	// TODO: A/B testing

	// Canary
	iClone.Annotations = skp.makeAnnotations(iClone.Annotations, map[string]int{
		primarySvcName: primaryWeight,
		canarySvcName:  canaryWeight,
	})

	// Disable the canary-ingress route after the canary process
	if canaryWeight == 0 {
		iClone.Annotations[skipperpredicateAnnotationKey] = canaryRouteDisable
	}

	_, err = skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Update(
		context.TODO(), iClone, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("ingress %s.%s update error: %w", iClone.Name, iClone.Namespace, err)
	}

	skp.logger.With("SetRoutes", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
		Debugf("primaryWeight: %d, canaryWeight: %d", primaryWeight, canaryWeight)
	return nil
}
// Finalize deletes the canary ingress that belongs to the given canary.
//
// The delete request uses a grace period of two seconds. A canary ingress
// that no longer exists is treated as already cleaned up, so calling
// Finalize repeatedly is safe.
//
// An error is returned when the ingress reference is empty or when the
// delete call fails for a reason other than the ingress not being found.
func (skp *SkipperRouter) Finalize(canary *flaggerv1.Canary) error {
	// Same precondition as Reconcile; avoids a nil dereference below.
	if canary.Spec.IngressRef == nil || canary.Spec.IngressRef.Name == "" {
		return fmt.Errorf("ingress selector is empty")
	}

	gracePeriodSeconds := int64(2)
	_, canaryIngressName := skp.getIngressNames(canary.Spec.IngressRef.Name)

	skp.logger.With("deleteCanaryIngress", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
		Debugf("Deleting Canary Ingress: %s", canaryIngressName)

	err := skp.kubeClient.NetworkingV1beta1().Ingresses(canary.Namespace).Delete(
		context.TODO(), canaryIngressName, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
	if errors.IsNotFound(err) {
		// Nothing left to remove.
		return nil
	}
	if err != nil {
		return fmt.Errorf("ingress %s.%s unable to remove canary ingress: %w", canaryIngressName, canary.Namespace, err)
	}
	return nil
}
// makeAnnotations stores the JSON encoded backendWeights and the route
// weight predicate in annotations and returns the resulting map.
//
// The given map is modified in place; when it is nil a new map is allocated
// so the write does not panic. Callers must use the returned map. If the
// weights cannot be encoded, the error is logged and annotations is returned
// unchanged.
func (skp *SkipperRouter) makeAnnotations(annotations map[string]string, backendWeights map[string]int) map[string]string {
	b, err := json.Marshal(backendWeights)
	if err != nil {
		// %v, not %w: the wrapping verb is only understood by fmt.Errorf.
		skp.logger.Errorf("Skipper:makeAnnotations: unable to marshal backendWeights %v", err)
		return annotations
	}

	if annotations == nil {
		annotations = make(map[string]string, 2)
	}
	annotations[skipperBackendWeightsAnnotationKey] = string(b)
	// adding more weight to canary route solves traffic bypassing through apexIngress
	annotations[skipperpredicateAnnotationKey] = canaryRouteWeight

	return annotations
}
// backendWeights decodes the service-name -> weight map stored in the
// backend-weights annotation.
//
// A NotFound error is returned when the annotation is absent; otherwise the
// result of decoding the annotation value as JSON is returned, including any
// decode error.
func (skp *SkipperRouter) backendWeights(annotation map[string]string) (backendWeights map[string]int, err error) {
	raw, found := annotation[skipperBackendWeightsAnnotationKey]
	if !found {
		resource := schema.GroupResource{Group: "Skipper Canary Ingress", Resource: "Annotation"}
		return nil, errors.NewNotFound(resource, skipperBackendWeightsAnnotationKey)
	}

	err = json.Unmarshal([]byte(raw), &backendWeights)
	return backendWeights, err
}
// getIngressNames maps the referenced Ingress name to the pair of names the
// router works with: the apex name is the input unchanged, the canary name
// is the input formatted through canaryPatternf.
func (skp *SkipperRouter) getIngressNames(name string) (apexName, canaryName string) {
	apexName = name
	canaryName = fmt.Sprintf(canaryPatternf, apexName)
	return apexName, canaryName
}