mirror of
https://github.com/fluxcd/flagger.git
synced 2026-04-15 06:57:34 +00:00
Merge remote-tracking branch 'upstream/master' into gloo2
This commit is contained in:
@@ -52,6 +52,10 @@ type CanarySpec struct {
|
||||
// +optional
|
||||
AutoscalerRef *hpav1.CrossVersionObjectReference `json:"autoscalerRef,omitempty"`
|
||||
|
||||
// reference to NGINX ingress resource
|
||||
// +optional
|
||||
IngressRef *hpav1.CrossVersionObjectReference `json:"ingressRef,omitempty"`
|
||||
|
||||
// virtual service spec
|
||||
Service CanaryService `json:"service"`
|
||||
|
||||
|
||||
@@ -205,6 +205,11 @@ func (in *CanarySpec) DeepCopyInto(out *CanarySpec) {
|
||||
*out = new(v1.CrossVersionObjectReference)
|
||||
**out = **in
|
||||
}
|
||||
if in.IngressRef != nil {
|
||||
in, out := &in.IngressRef, &out.IngressRef
|
||||
*out = new(v1.CrossVersionObjectReference)
|
||||
**out = **in
|
||||
}
|
||||
in.Service.DeepCopyInto(&out.Service)
|
||||
in.CanaryAnalysis.DeepCopyInto(&out.CanaryAnalysis)
|
||||
if in.ProgressDeadlineSeconds != nil {
|
||||
|
||||
@@ -632,6 +632,41 @@ func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool {
|
||||
}
|
||||
}
|
||||
|
||||
// NGINX checks
|
||||
if c.meshProvider == "nginx" {
|
||||
if metric.Name == "request-success-rate" {
|
||||
val, err := c.observer.GetNginxSuccessRate(r.Spec.IngressRef.Name, r.Namespace, metric.Name, metric.Interval)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "no values found") {
|
||||
c.recordEventWarningf(r, "Halt advancement no values found for metric %s probably %s.%s is not receiving traffic",
|
||||
metric.Name, r.Spec.TargetRef.Name, r.Namespace)
|
||||
} else {
|
||||
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
if float64(metric.Threshold) > val {
|
||||
c.recordEventWarningf(r, "Halt %s.%s advancement success rate %.2f%% < %v%%",
|
||||
r.Name, r.Namespace, val, metric.Threshold)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if metric.Name == "request-duration" {
|
||||
val, err := c.observer.GetNginxRequestDuration(r.Spec.IngressRef.Name, r.Namespace, metric.Name, metric.Interval)
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err)
|
||||
return false
|
||||
}
|
||||
t := time.Duration(metric.Threshold) * time.Millisecond
|
||||
if val > t {
|
||||
c.recordEventWarningf(r, "Halt %s.%s advancement request duration %v > %v",
|
||||
r.Name, r.Namespace, val, t)
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// custom checks
|
||||
if metric.Query != "" {
|
||||
val, err := c.observer.GetScalar(metric.Query)
|
||||
|
||||
122
pkg/metrics/nginx.go
Normal file
122
pkg/metrics/nginx.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const nginxSuccessRateQuery = `
|
||||
sum(rate(
|
||||
nginx_ingress_controller_requests{namespace="{{ .Namespace }}",
|
||||
ingress="{{ .Name }}",
|
||||
status!~"5.*"}
|
||||
[{{ .Interval }}]))
|
||||
/
|
||||
sum(rate(
|
||||
nginx_ingress_controller_requests{namespace="{{ .Namespace }}",
|
||||
ingress="{{ .Name }}"}
|
||||
[{{ .Interval }}]))
|
||||
* 100
|
||||
`
|
||||
|
||||
// GetNginxSuccessRate returns the requests success rate (non 5xx) using nginx_ingress_controller_requests metric
|
||||
func (c *Observer) GetNginxSuccessRate(name string, namespace string, metric string, interval string) (float64, error) {
|
||||
if c.metricsServer == "fake" {
|
||||
return 100, nil
|
||||
}
|
||||
|
||||
meta := struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Interval string
|
||||
}{
|
||||
name,
|
||||
namespace,
|
||||
interval,
|
||||
}
|
||||
|
||||
query, err := render(meta, nginxSuccessRateQuery)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var rate *float64
|
||||
querySt := url.QueryEscape(query)
|
||||
result, err := c.queryMetric(querySt)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, v := range result.Data.Result {
|
||||
metricValue := v.Value[1]
|
||||
switch metricValue.(type) {
|
||||
case string:
|
||||
f, err := strconv.ParseFloat(metricValue.(string), 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
rate = &f
|
||||
}
|
||||
}
|
||||
if rate == nil {
|
||||
return 0, fmt.Errorf("no values found for metric %s", metric)
|
||||
}
|
||||
return *rate, nil
|
||||
}
|
||||
|
||||
const nginxRequestDurationQuery = `
|
||||
sum(rate(
|
||||
nginx_ingress_controller_ingress_upstream_latency_seconds_sum{namespace="{{ .Namespace }}",
|
||||
ingress="{{ .Name }}"}[{{ .Interval }}]))
|
||||
/
|
||||
sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_count{namespace="{{ .Namespace }}",
|
||||
ingress="{{ .Name }}"}[{{ .Interval }}])) * 1000
|
||||
`
|
||||
|
||||
// GetNginxRequestDuration returns the avg requests latency using nginx_ingress_controller_ingress_upstream_latency_seconds_sum metric
|
||||
func (c *Observer) GetNginxRequestDuration(name string, namespace string, metric string, interval string) (time.Duration, error) {
|
||||
if c.metricsServer == "fake" {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
meta := struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Interval string
|
||||
}{
|
||||
name,
|
||||
namespace,
|
||||
interval,
|
||||
}
|
||||
|
||||
query, err := render(meta, nginxRequestDurationQuery)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var rate *float64
|
||||
querySt := url.QueryEscape(query)
|
||||
result, err := c.queryMetric(querySt)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, v := range result.Data.Result {
|
||||
metricValue := v.Value[1]
|
||||
switch metricValue.(type) {
|
||||
case string:
|
||||
f, err := strconv.ParseFloat(metricValue.(string), 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
rate = &f
|
||||
}
|
||||
}
|
||||
if rate == nil {
|
||||
return 0, fmt.Errorf("no values found for metric %s", metric)
|
||||
}
|
||||
ms := time.Duration(int64(*rate)) * time.Millisecond
|
||||
return ms, nil
|
||||
}
|
||||
51
pkg/metrics/nginx_test.go
Normal file
51
pkg/metrics/nginx_test.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_NginxSuccessRateQueryRender(t *testing.T) {
|
||||
meta := struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Interval string
|
||||
}{
|
||||
"podinfo",
|
||||
"nginx",
|
||||
"1m",
|
||||
}
|
||||
|
||||
query, err := render(meta, nginxSuccessRateQuery)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected := `sum(rate(nginx_ingress_controller_requests{namespace="nginx",ingress="podinfo",status!~"5.*"}[1m])) / sum(rate(nginx_ingress_controller_requests{namespace="nginx",ingress="podinfo"}[1m])) * 100`
|
||||
|
||||
if query != expected {
|
||||
t.Errorf("\nGot %s \nWanted %s", query, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_NginxRequestDurationQueryRender(t *testing.T) {
|
||||
meta := struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Interval string
|
||||
}{
|
||||
"podinfo",
|
||||
"nginx",
|
||||
"1m",
|
||||
}
|
||||
|
||||
query, err := render(meta, nginxRequestDurationQuery)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
expected := `sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_sum{namespace="nginx",ingress="podinfo"}[1m])) /sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_count{namespace="nginx",ingress="podinfo"}[1m])) * 1000`
|
||||
|
||||
if query != expected {
|
||||
t.Errorf("\nGot %s \nWanted %s", query, expected)
|
||||
}
|
||||
}
|
||||
@@ -99,7 +99,9 @@ func (c *Observer) GetScalar(query string) (float64, error) {
|
||||
query = strings.Replace(query, " ", "", -1)
|
||||
|
||||
var value *float64
|
||||
result, err := c.queryMetric(query)
|
||||
|
||||
querySt := url.QueryEscape(query)
|
||||
result, err := c.queryMetric(querySt)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -41,9 +41,14 @@ func (factory *Factory) KubernetesRouter(label string) *KubernetesRouter {
|
||||
}
|
||||
}
|
||||
|
||||
// MeshRouter returns a service mesh router (Istio or AppMesh)
|
||||
// MeshRouter returns a service mesh router
|
||||
func (factory *Factory) MeshRouter(provider string) Interface {
|
||||
switch {
|
||||
case provider == "nginx":
|
||||
return &IngressRouter{
|
||||
logger: factory.logger,
|
||||
kubeClient: factory.kubeClient,
|
||||
}
|
||||
case provider == "appmesh":
|
||||
return &AppMeshRouter{
|
||||
logger: factory.logger,
|
||||
|
||||
231
pkg/router/ingress.go
Normal file
231
pkg/router/ingress.go
Normal file
@@ -0,0 +1,231 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
|
||||
"go.uber.org/zap"
|
||||
"k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type IngressRouter struct {
|
||||
kubeClient kubernetes.Interface
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func (i *IngressRouter) Reconcile(canary *flaggerv1.Canary) error {
|
||||
if canary.Spec.IngressRef == nil || canary.Spec.IngressRef.Name == "" {
|
||||
return fmt.Errorf("ingress selector is empty")
|
||||
}
|
||||
|
||||
targetName := canary.Spec.TargetRef.Name
|
||||
canaryName := fmt.Sprintf("%s-canary", targetName)
|
||||
canaryIngressName := fmt.Sprintf("%s-canary", canary.Spec.IngressRef.Name)
|
||||
|
||||
ingress, err := i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Get(canary.Spec.IngressRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ingressClone := ingress.DeepCopy()
|
||||
|
||||
// change backend to <deployment-name>-canary
|
||||
backendExists := false
|
||||
for k, v := range ingressClone.Spec.Rules {
|
||||
for x, y := range v.HTTP.Paths {
|
||||
if y.Backend.ServiceName == targetName {
|
||||
ingressClone.Spec.Rules[k].HTTP.Paths[x].Backend.ServiceName = canaryName
|
||||
backendExists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !backendExists {
|
||||
return fmt.Errorf("backend %s not found in ingress %s", targetName, canary.Spec.IngressRef.Name)
|
||||
}
|
||||
|
||||
canaryIngress, err := i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Get(canaryIngressName, metav1.GetOptions{})
|
||||
|
||||
if errors.IsNotFound(err) {
|
||||
ing := &v1beta1.Ingress{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: canaryIngressName,
|
||||
Namespace: canary.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(canary, schema.GroupVersionKind{
|
||||
Group: flaggerv1.SchemeGroupVersion.Group,
|
||||
Version: flaggerv1.SchemeGroupVersion.Version,
|
||||
Kind: flaggerv1.CanaryKind,
|
||||
}),
|
||||
},
|
||||
Annotations: i.makeAnnotations(ingressClone.Annotations),
|
||||
Labels: ingressClone.Labels,
|
||||
},
|
||||
Spec: ingressClone.Spec,
|
||||
}
|
||||
|
||||
_, err := i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Create(ing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
|
||||
Infof("Ingress %s.%s created", ing.GetName(), canary.Namespace)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("ingress %s query error %v", canaryIngressName, err)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(ingressClone.Spec, canaryIngress.Spec); diff != "" {
|
||||
iClone := canaryIngress.DeepCopy()
|
||||
iClone.Spec = ingressClone.Spec
|
||||
|
||||
_, err := i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Update(iClone)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ingress %s update error %v", canaryIngressName, err)
|
||||
}
|
||||
|
||||
i.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
|
||||
Infof("Ingress %s updated", canaryIngressName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *IngressRouter) GetRoutes(canary *flaggerv1.Canary) (
|
||||
primaryWeight int,
|
||||
canaryWeight int,
|
||||
err error,
|
||||
) {
|
||||
canaryIngressName := fmt.Sprintf("%s-canary", canary.Spec.IngressRef.Name)
|
||||
canaryIngress, err := i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Get(canaryIngressName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
// A/B testing
|
||||
if len(canary.Spec.CanaryAnalysis.Match) > 0 {
|
||||
for k := range canaryIngress.Annotations {
|
||||
if k == "nginx.ingress.kubernetes.io/canary-by-cookie" || k == "nginx.ingress.kubernetes.io/canary-by-header" {
|
||||
return 0, 100, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Canary
|
||||
for k, v := range canaryIngress.Annotations {
|
||||
if k == "nginx.ingress.kubernetes.io/canary-weight" {
|
||||
val, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
canaryWeight = val
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
primaryWeight = 100 - canaryWeight
|
||||
return
|
||||
}
|
||||
|
||||
func (i *IngressRouter) SetRoutes(
|
||||
canary *flaggerv1.Canary,
|
||||
primaryWeight int,
|
||||
canaryWeight int,
|
||||
) error {
|
||||
canaryIngressName := fmt.Sprintf("%s-canary", canary.Spec.IngressRef.Name)
|
||||
canaryIngress, err := i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Get(canaryIngressName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
iClone := canaryIngress.DeepCopy()
|
||||
|
||||
// A/B testing
|
||||
if len(canary.Spec.CanaryAnalysis.Match) > 0 {
|
||||
cookie := ""
|
||||
header := ""
|
||||
headerValue := ""
|
||||
for _, m := range canary.Spec.CanaryAnalysis.Match {
|
||||
for k, v := range m.Headers {
|
||||
if k == "cookie" {
|
||||
cookie = v.Exact
|
||||
} else {
|
||||
header = k
|
||||
headerValue = v.Exact
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
iClone.Annotations = i.makeHeaderAnnotations(iClone.Annotations, header, headerValue, cookie)
|
||||
} else {
|
||||
// canary
|
||||
iClone.Annotations["nginx.ingress.kubernetes.io/canary-weight"] = fmt.Sprintf("%v", canaryWeight)
|
||||
}
|
||||
|
||||
// toggle canary
|
||||
if canaryWeight > 0 {
|
||||
iClone.Annotations["nginx.ingress.kubernetes.io/canary"] = "true"
|
||||
} else {
|
||||
iClone.Annotations = i.makeAnnotations(iClone.Annotations)
|
||||
}
|
||||
|
||||
_, err = i.kubeClient.ExtensionsV1beta1().Ingresses(canary.Namespace).Update(iClone)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ingress %s update error %v", canaryIngressName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *IngressRouter) makeAnnotations(annotations map[string]string) map[string]string {
|
||||
res := make(map[string]string)
|
||||
for k, v := range annotations {
|
||||
if !strings.Contains(k, "nginx.ingress.kubernetes.io/canary") &&
|
||||
!strings.Contains(k, "kubectl.kubernetes.io/last-applied-configuration") {
|
||||
res[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
res["nginx.ingress.kubernetes.io/canary"] = "false"
|
||||
res["nginx.ingress.kubernetes.io/canary-weight"] = "0"
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func (i *IngressRouter) makeHeaderAnnotations(annotations map[string]string,
|
||||
header string, headerValue string, cookie string) map[string]string {
|
||||
res := make(map[string]string)
|
||||
for k, v := range annotations {
|
||||
if !strings.Contains(v, "nginx.ingress.kubernetes.io/canary") {
|
||||
res[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
res["nginx.ingress.kubernetes.io/canary"] = "true"
|
||||
res["nginx.ingress.kubernetes.io/canary-weight"] = "0"
|
||||
|
||||
if cookie != "" {
|
||||
res["nginx.ingress.kubernetes.io/canary-by-cookie"] = cookie
|
||||
}
|
||||
|
||||
if header != "" {
|
||||
res["nginx.ingress.kubernetes.io/canary-by-header"] = header
|
||||
}
|
||||
|
||||
if headerValue != "" {
|
||||
res["nginx.ingress.kubernetes.io/canary-by-header-value"] = headerValue
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
112
pkg/router/ingress_test.go
Normal file
112
pkg/router/ingress_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestIngressRouter_Reconcile(t *testing.T) {
|
||||
mocks := setupfakeClients()
|
||||
router := &IngressRouter{
|
||||
logger: mocks.logger,
|
||||
kubeClient: mocks.kubeClient,
|
||||
}
|
||||
|
||||
err := router.Reconcile(mocks.ingressCanary)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
canaryAn := "nginx.ingress.kubernetes.io/canary"
|
||||
canaryWeightAn := "nginx.ingress.kubernetes.io/canary-weight"
|
||||
|
||||
canaryName := fmt.Sprintf("%s-canary", mocks.ingressCanary.Spec.IngressRef.Name)
|
||||
inCanary, err := router.kubeClient.ExtensionsV1beta1().Ingresses("default").Get(canaryName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if _, ok := inCanary.Annotations[canaryAn]; !ok {
|
||||
t.Errorf("Canary annotation missing")
|
||||
}
|
||||
|
||||
// test initialisation
|
||||
if inCanary.Annotations[canaryAn] != "false" {
|
||||
t.Errorf("Got canary annotation %v wanted false", inCanary.Annotations[canaryAn])
|
||||
}
|
||||
|
||||
if inCanary.Annotations[canaryWeightAn] != "0" {
|
||||
t.Errorf("Got canary weight annotation %v wanted 0", inCanary.Annotations[canaryWeightAn])
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngressRouter_GetSetRoutes(t *testing.T) {
|
||||
mocks := setupfakeClients()
|
||||
router := &IngressRouter{
|
||||
logger: mocks.logger,
|
||||
kubeClient: mocks.kubeClient,
|
||||
}
|
||||
|
||||
err := router.Reconcile(mocks.ingressCanary)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
p, c, err := router.GetRoutes(mocks.ingressCanary)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
p = 50
|
||||
c = 50
|
||||
|
||||
err = router.SetRoutes(mocks.ingressCanary, p, c)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
canaryAn := "nginx.ingress.kubernetes.io/canary"
|
||||
canaryWeightAn := "nginx.ingress.kubernetes.io/canary-weight"
|
||||
|
||||
canaryName := fmt.Sprintf("%s-canary", mocks.ingressCanary.Spec.IngressRef.Name)
|
||||
inCanary, err := router.kubeClient.ExtensionsV1beta1().Ingresses("default").Get(canaryName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
if _, ok := inCanary.Annotations[canaryAn]; !ok {
|
||||
t.Errorf("Canary annotation missing")
|
||||
}
|
||||
|
||||
// test rollout
|
||||
if inCanary.Annotations[canaryAn] != "true" {
|
||||
t.Errorf("Got canary annotation %v wanted true", inCanary.Annotations[canaryAn])
|
||||
}
|
||||
|
||||
if inCanary.Annotations[canaryWeightAn] != "50" {
|
||||
t.Errorf("Got canary weight annotation %v wanted 50", inCanary.Annotations[canaryWeightAn])
|
||||
}
|
||||
|
||||
p = 100
|
||||
c = 0
|
||||
|
||||
err = router.SetRoutes(mocks.ingressCanary, p, c)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
inCanary, err = router.kubeClient.ExtensionsV1beta1().Ingresses("default").Get(canaryName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
// test promotion
|
||||
if inCanary.Annotations[canaryAn] != "false" {
|
||||
t.Errorf("Got canary annotation %v wanted false", inCanary.Annotations[canaryAn])
|
||||
}
|
||||
|
||||
if inCanary.Annotations[canaryWeightAn] != "0" {
|
||||
t.Errorf("Got canary weight annotation %v wanted 0", inCanary.Annotations[canaryWeightAn])
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,7 @@ func (ir *IstioRouter) Reconcile(canary *flaggerv1.Canary) error {
|
||||
hosts := canary.Spec.Service.Hosts
|
||||
var hasServiceHost bool
|
||||
for _, h := range hosts {
|
||||
if h == targetName {
|
||||
if h == targetName || h == "*" {
|
||||
hasServiceHost = true
|
||||
break
|
||||
}
|
||||
|
||||
@@ -11,7 +11,9 @@ import (
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
hpav1 "k8s.io/api/autoscaling/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/api/extensions/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
)
|
||||
@@ -20,6 +22,7 @@ type fakeClients struct {
|
||||
canary *v1alpha3.Canary
|
||||
abtest *v1alpha3.Canary
|
||||
appmeshCanary *v1alpha3.Canary
|
||||
ingressCanary *v1alpha3.Canary
|
||||
kubeClient kubernetes.Interface
|
||||
meshClient clientset.Interface
|
||||
flaggerClient clientset.Interface
|
||||
@@ -30,9 +33,10 @@ func setupfakeClients() fakeClients {
|
||||
canary := newMockCanary()
|
||||
abtest := newMockABTest()
|
||||
appmeshCanary := newMockCanaryAppMesh()
|
||||
flaggerClient := fakeFlagger.NewSimpleClientset(canary, abtest, appmeshCanary)
|
||||
ingressCanary := newMockCanaryIngress()
|
||||
flaggerClient := fakeFlagger.NewSimpleClientset(canary, abtest, appmeshCanary, ingressCanary)
|
||||
|
||||
kubeClient := fake.NewSimpleClientset(newMockDeployment(), newMockABTestDeployment())
|
||||
kubeClient := fake.NewSimpleClientset(newMockDeployment(), newMockABTestDeployment(), newMockIngress())
|
||||
|
||||
meshClient := fakeFlagger.NewSimpleClientset()
|
||||
logger, _ := logger.NewLogger("debug")
|
||||
@@ -41,6 +45,7 @@ func setupfakeClients() fakeClients {
|
||||
canary: canary,
|
||||
abtest: abtest,
|
||||
appmeshCanary: appmeshCanary,
|
||||
ingressCanary: ingressCanary,
|
||||
kubeClient: kubeClient,
|
||||
meshClient: meshClient,
|
||||
flaggerClient: flaggerClient,
|
||||
@@ -266,3 +271,73 @@ func newMockABTestDeployment() *appsv1.Deployment {
|
||||
|
||||
return d
|
||||
}
|
||||
|
||||
func newMockCanaryIngress() *v1alpha3.Canary {
|
||||
cd := &v1alpha3.Canary{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: v1alpha3.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "nginx",
|
||||
},
|
||||
Spec: v1alpha3.CanarySpec{
|
||||
TargetRef: hpav1.CrossVersionObjectReference{
|
||||
Name: "podinfo",
|
||||
APIVersion: "apps/v1",
|
||||
Kind: "Deployment",
|
||||
},
|
||||
IngressRef: &hpav1.CrossVersionObjectReference{
|
||||
Name: "podinfo",
|
||||
APIVersion: "extensions/v1beta1",
|
||||
Kind: "Ingress",
|
||||
},
|
||||
Service: v1alpha3.CanaryService{
|
||||
Port: 9898,
|
||||
}, CanaryAnalysis: v1alpha3.CanaryAnalysis{
|
||||
Threshold: 10,
|
||||
StepWeight: 10,
|
||||
MaxWeight: 50,
|
||||
Metrics: []v1alpha3.CanaryMetric{
|
||||
{
|
||||
Name: "request-success-rate",
|
||||
Threshold: 99,
|
||||
Interval: "1m",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cd
|
||||
}
|
||||
|
||||
func newMockIngress() *v1beta1.Ingress {
|
||||
return &v1beta1.Ingress{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: v1beta1.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "podinfo",
|
||||
Annotations: map[string]string{
|
||||
"kubernetes.io/ingress.class": "nginx",
|
||||
},
|
||||
},
|
||||
Spec: v1beta1.IngressSpec{
|
||||
Rules: []v1beta1.IngressRule{
|
||||
{
|
||||
Host: "app.example.com",
|
||||
IngressRuleValue: v1beta1.IngressRuleValue{
|
||||
HTTP: &v1beta1.HTTPIngressRuleValue{
|
||||
Paths: []v1beta1.HTTPIngressPath{
|
||||
{
|
||||
Path: "/",
|
||||
Backend: v1beta1.IngressBackend{
|
||||
ServiceName: "podinfo",
|
||||
ServicePort: intstr.FromInt(9898),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,14 +52,14 @@ func NewSuperglooRouter(ctx context.Context, provider string, flaggerClient clie
|
||||
|
||||
// remove the supergloo: prefix
|
||||
provider = strings.TrimPrefix(provider, "supergloo:")
|
||||
// split namespace.name :
|
||||
// split name.namespace:
|
||||
parts := strings.Split(provider, ".")
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf("invalid format for supergloo provider")
|
||||
}
|
||||
targetMesh := solokitcore.ResourceRef{
|
||||
Namespace: parts[0],
|
||||
Name: parts[1],
|
||||
Namespace: parts[1],
|
||||
Name: parts[0],
|
||||
}
|
||||
return NewSuperglooRouterWithClient(ctx, routingRuleClient, targetMesh, logger), nil
|
||||
}
|
||||
@@ -74,12 +74,22 @@ func (sr *SuperglooRouter) Reconcile(canary *flaggerv1.Canary) error {
|
||||
if err := sr.setRetries(canary); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := sr.setHeaders(canary); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := sr.setCors(canary); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return sr.SetRoutes(canary, 100, 0)
|
||||
|
||||
// do we have routes already?
|
||||
if _, _, err := sr.GetRoutes(canary); err == nil {
|
||||
// we have routes, no need to do anything else
|
||||
return nil
|
||||
} else if solokiterror.IsNotExist(err) {
|
||||
return sr.SetRoutes(canary, 100, 0)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (sr *SuperglooRouter) setRetries(canary *flaggerv1.Canary) error {
|
||||
@@ -98,6 +108,52 @@ func (sr *SuperglooRouter) setRetries(canary *flaggerv1.Canary) error {
|
||||
|
||||
return sr.writeRuleForCanary(canary, rule)
|
||||
}
|
||||
func (sr *SuperglooRouter) setHeaders(canary *flaggerv1.Canary) error {
|
||||
if canary.Spec.Service.Headers == nil {
|
||||
return nil
|
||||
}
|
||||
headerManipulation, err := convertHeaders(canary.Spec.Service.Headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if headerManipulation == nil {
|
||||
return nil
|
||||
}
|
||||
rule := sr.createRule(canary, "headers", &supergloov1.RoutingRuleSpec{
|
||||
RuleType: &supergloov1.RoutingRuleSpec_HeaderManipulation{
|
||||
HeaderManipulation: headerManipulation,
|
||||
},
|
||||
})
|
||||
|
||||
return sr.writeRuleForCanary(canary, rule)
|
||||
}
|
||||
|
||||
func convertHeaders(headers *istiov1alpha3.Headers) (*supergloov1.HeaderManipulation, error) {
|
||||
var headersMaipulation *supergloov1.HeaderManipulation
|
||||
|
||||
if headers.Request != nil {
|
||||
headersMaipulation = &supergloov1.HeaderManipulation{}
|
||||
|
||||
headersMaipulation.RemoveRequestHeaders = headers.Request.Remove
|
||||
headersMaipulation.AppendRequestHeaders = make(map[string]string)
|
||||
for k, v := range headers.Request.Add {
|
||||
headersMaipulation.AppendRequestHeaders[k] = v
|
||||
}
|
||||
}
|
||||
if headers.Response != nil {
|
||||
if headersMaipulation == nil {
|
||||
headersMaipulation = &supergloov1.HeaderManipulation{}
|
||||
}
|
||||
|
||||
headersMaipulation.RemoveResponseHeaders = headers.Response.Remove
|
||||
headersMaipulation.AppendResponseHeaders = make(map[string]string)
|
||||
for k, v := range headers.Response.Add {
|
||||
headersMaipulation.AppendResponseHeaders[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return headersMaipulation, nil
|
||||
}
|
||||
|
||||
func convertRetries(retries *istiov1alpha3.HTTPRetry) (*supergloov1.RetryPolicy, error) {
|
||||
perTryTimeout, err := time.ParseDuration(retries.PerTryTimeout)
|
||||
|
||||
@@ -25,10 +25,9 @@ func TestSuperglooRouter_Sync(t *testing.T) {
|
||||
if err := routingRuleClient.Register(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
// TODO(yuval-k): un hard code this
|
||||
targetMesh := solokitcore.ResourceRef{
|
||||
Namespace: "supergloo-system",
|
||||
Name: "yuval",
|
||||
Name: "mesh",
|
||||
}
|
||||
router := NewSuperglooRouterWithClient(context.TODO(), routingRuleClient, targetMesh, mocks.logger)
|
||||
err = router.Reconcile(mocks.canary)
|
||||
@@ -61,10 +60,9 @@ func TestSuperglooRouter_SetRoutes(t *testing.T) {
|
||||
if err := routingRuleClient.Register(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
// TODO(yuval-k): un hard code this
|
||||
targetMesh := solokitcore.ResourceRef{
|
||||
Namespace: "supergloo-system",
|
||||
Name: "yuval",
|
||||
Name: "mesh",
|
||||
}
|
||||
router := NewSuperglooRouterWithClient(context.TODO(), routingRuleClient, targetMesh, mocks.logger)
|
||||
|
||||
@@ -126,10 +124,9 @@ func TestSuperglooRouter_GetRoutes(t *testing.T) {
|
||||
if err := routingRuleClient.Register(); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
// TODO(yuval-k): un hard code this
|
||||
targetMesh := solokitcore.ResourceRef{
|
||||
Namespace: "supergloo-system",
|
||||
Name: "yuval",
|
||||
Name: "mesh",
|
||||
}
|
||||
router := NewSuperglooRouterWithClient(context.TODO(), routingRuleClient, targetMesh, mocks.logger)
|
||||
err = router.Reconcile(mocks.canary)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package version
|
||||
|
||||
var VERSION = "0.11.1"
|
||||
var VERSION = "0.13.1"
|
||||
var REVISION = "unknown"
|
||||
|
||||
Reference in New Issue
Block a user