diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index d3da7334..edf80198 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -26,7 +26,7 @@ import ( informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions" "github.com/weaveworks/flagger/pkg/controller" "github.com/weaveworks/flagger/pkg/logger" - "github.com/weaveworks/flagger/pkg/metrics" + "github.com/weaveworks/flagger/pkg/metrics/observers" "github.com/weaveworks/flagger/pkg/notifier" "github.com/weaveworks/flagger/pkg/router" "github.com/weaveworks/flagger/pkg/server" @@ -163,7 +163,7 @@ func main() { logger.Infof("Watching namespace %s", namespace) } - observerFactory, err := metrics.NewFactory(metricsServer, 5*time.Second) + observerFactory, err := observers.NewFactory(metricsServer) if err != nil { logger.Fatalf("Error building prometheus client: %s", err.Error()) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 23298952..811702dc 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "github.com/weaveworks/flagger/pkg/metrics/observers" "sync" "time" @@ -49,7 +50,7 @@ type Controller struct { notifier notifier.Interface canaryFactory *canary.Factory routerFactory *router.Factory - observerFactory *metrics.Factory + observerFactory *observers.Factory meshProvider string eventWebhook string } @@ -64,7 +65,7 @@ func NewController( notifier notifier.Interface, canaryFactory *canary.Factory, routerFactory *router.Factory, - observerFactory *metrics.Factory, + observerFactory *observers.Factory, meshProvider string, version string, eventWebhook string, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 6945ae2a..ce90d748 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -1,6 +1,7 @@ package controller import ( + "github.com/weaveworks/flagger/pkg/metrics/observers" "sync" "time" @@ -73,7 
+74,7 @@ func SetupMocks(c *flaggerv1.Canary) Mocks { rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", logger, flaggerClient) // init observer - observerFactory, _ := metrics.NewFactory("fake", 5*time.Second) + observerFactory, _ := observers.NewFactory("fake") // init canary factory configTracker := canary.ConfigTracker{ diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 0c17ccfb..94669222 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -7,9 +7,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" + flaggerv1alpha1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + flaggerv1alpha3 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" "github.com/weaveworks/flagger/pkg/canary" - "github.com/weaveworks/flagger/pkg/metrics" + "github.com/weaveworks/flagger/pkg/metrics/observers" "github.com/weaveworks/flagger/pkg/router" ) @@ -25,7 +26,7 @@ func (c *Controller) scheduleCanaries() { stats := make(map[string]int) c.canaries.Range(func(key interface{}, value interface{}) bool { - canary := value.(*flaggerv1.Canary) + canary := value.(*flaggerv1alpha3.Canary) // format: . 
name := key.(string) @@ -196,8 +197,8 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } // reset status - status := flaggerv1.CanaryStatus{ - Phase: flaggerv1.CanaryPhaseProgressing, + status := flaggerv1alpha3.CanaryStatus{ + Phase: flaggerv1alpha3.CanaryPhaseProgressing, CanaryWeight: 0, FailedChecks: 0, Iterations: 0, @@ -225,8 +226,8 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } // check if we should rollback - if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing || - cd.Status.Phase == flaggerv1.CanaryPhaseWaiting { + if cd.Status.Phase == flaggerv1alpha3.CanaryPhaseProgressing || + cd.Status.Phase == flaggerv1alpha3.CanaryPhaseWaiting { if ok := c.runRollbackHooks(cd, cd.Status.Phase); ok { c.recordEventWarningf(cd, "Rolling back %s.%s manual webhook invoked", cd.Name, cd.Namespace) c.sendNotification(cd, "Rolling back manual webhook invoked", false, true) @@ -236,7 +237,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } // route all traffic to primary if analysis has succeeded - if cd.Status.Phase == flaggerv1.CanaryPhasePromoting { + if cd.Status.Phase == flaggerv1alpha3.CanaryPhasePromoting { if provider != "kubernetes" { c.recordEventInfof(cd, "Routing all traffic to primary") if err := meshRouter.SetRoutes(cd, 100, 0, false); err != nil { @@ -247,7 +248,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } // update status phase - if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseFinalising); err != nil { + if err := canaryController.SetStatusPhase(cd, flaggerv1alpha3.CanaryPhaseFinalising); err != nil { c.recordEventWarningf(cd, "%v", err) return } @@ -256,19 +257,19 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } // scale canary to zero if promotion has finished - if cd.Status.Phase == flaggerv1.CanaryPhaseFinalising { + if cd.Status.Phase == 
flaggerv1alpha3.CanaryPhaseFinalising { if err := canaryController.Scale(cd, 0); err != nil { c.recordEventWarningf(cd, "%v", err) return } // set status to succeeded - if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseSucceeded); err != nil { + if err := canaryController.SetStatusPhase(cd, flaggerv1alpha3.CanaryPhaseSucceeded); err != nil { c.recordEventWarningf(cd, "%v", err) return } - c.recorder.SetStatus(cd, flaggerv1.CanaryPhaseSucceeded) - c.runPostRolloutHooks(cd, flaggerv1.CanaryPhaseSucceeded) + c.recorder.SetStatus(cd, flaggerv1alpha3.CanaryPhaseSucceeded) + c.runPostRolloutHooks(cd, flaggerv1alpha3.CanaryPhaseSucceeded) c.recordEventInfof(cd, "Promotion completed! Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) c.sendNotification(cd, "Canary analysis completed successfully, promotion finished.", false, false) @@ -276,7 +277,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } // check if the number of failed checks reached the threshold - if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing && + if cd.Status.Phase == flaggerv1alpha3.CanaryPhaseProgressing && (!retriable || cd.Status.FailedChecks >= cd.Spec.CanaryAnalysis.Threshold) { if !retriable { c.recordEventWarningf(cd, "Rolling back %s.%s progress deadline exceeded %v", @@ -349,7 +350,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh } -func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) { +func (c *Controller) runCanary(canary *flaggerv1alpha3.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) { primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name) // increase traffic weight @@ -412,14 +413,14 @@ func (c *Controller) 
runCanary(canary *flaggerv1.Canary, canaryController canary } // update status phase - if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil { + if err := canaryController.SetStatusPhase(canary, flaggerv1alpha3.CanaryPhasePromoting); err != nil { c.recordEventWarningf(canary, "%v", err) return } } } -func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string) { +func (c *Controller) runAB(canary *flaggerv1alpha3.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string) { primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name) // route traffic to canary and increment iterations @@ -454,14 +455,14 @@ func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Con } // update status phase - if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil { + if err := canaryController.SetStatusPhase(canary, flaggerv1alpha3.CanaryPhasePromoting); err != nil { c.recordEventWarningf(canary, "%v", err) return } } } -func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool) { +func (c *Controller) runBlueGreen(canary *flaggerv1alpha3.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool) { primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name) // increment iterations @@ -522,7 +523,7 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can } // update status phase - if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil { + if err := canaryController.SetStatusPhase(canary, flaggerv1alpha3.CanaryPhasePromoting); err != nil { c.recordEventWarningf(canary, "%v", err) return } @@ -530,7 +531,7 @@ func (c *Controller) runBlueGreen(canary 
*flaggerv1.Canary, canaryController can } -func (c *Controller) shouldSkipAnalysis(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, primaryWeight int, canaryWeight int) bool { +func (c *Controller) shouldSkipAnalysis(canary *flaggerv1alpha3.Canary, canaryController canary.Controller, meshRouter router.Interface, primaryWeight int, canaryWeight int) bool { if !canary.Spec.SkipAnalysis { return false } @@ -559,13 +560,13 @@ func (c *Controller) shouldSkipAnalysis(canary *flaggerv1.Canary, canaryControll } // update status phase - if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseSucceeded); err != nil { + if err := canaryController.SetStatusPhase(canary, flaggerv1alpha3.CanaryPhaseSucceeded); err != nil { c.recordEventWarningf(canary, "%v", err) return false } // notify - c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseSucceeded) + c.recorder.SetStatus(canary, flaggerv1alpha3.CanaryPhaseSucceeded) c.recordEventInfof(canary, "Promotion completed! 
Canary analysis was skipped for %s.%s", canary.Spec.TargetRef.Name, canary.Namespace) c.sendNotification(canary, "Canary analysis was skipped, promotion finished.", @@ -574,13 +575,13 @@ func (c *Controller) shouldSkipAnalysis(canary *flaggerv1.Canary, canaryControll return true } -func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController canary.Controller) (bool, error) { +func (c *Controller) shouldAdvance(canary *flaggerv1alpha3.Canary, canaryController canary.Controller) (bool, error) { if canary.Status.LastAppliedSpec == "" || - canary.Status.Phase == flaggerv1.CanaryPhaseInitializing || - canary.Status.Phase == flaggerv1.CanaryPhaseProgressing || - canary.Status.Phase == flaggerv1.CanaryPhaseWaiting || - canary.Status.Phase == flaggerv1.CanaryPhasePromoting || - canary.Status.Phase == flaggerv1.CanaryPhaseFinalising { + canary.Status.Phase == flaggerv1alpha3.CanaryPhaseInitializing || + canary.Status.Phase == flaggerv1alpha3.CanaryPhaseProgressing || + canary.Status.Phase == flaggerv1alpha3.CanaryPhaseWaiting || + canary.Status.Phase == flaggerv1alpha3.CanaryPhasePromoting || + canary.Status.Phase == flaggerv1alpha3.CanaryPhaseFinalising { return true, nil } @@ -601,20 +602,20 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca } -func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, shouldAdvance bool) bool { +func (c *Controller) checkCanaryStatus(canary *flaggerv1alpha3.Canary, canaryController canary.Controller, shouldAdvance bool) bool { c.recorder.SetStatus(canary, canary.Status.Phase) - if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing || - canary.Status.Phase == flaggerv1.CanaryPhasePromoting || - canary.Status.Phase == flaggerv1.CanaryPhaseFinalising { + if canary.Status.Phase == flaggerv1alpha3.CanaryPhaseProgressing || + canary.Status.Phase == flaggerv1alpha3.CanaryPhasePromoting || + canary.Status.Phase == 
flaggerv1alpha3.CanaryPhaseFinalising { return true } - if canary.Status.Phase == "" || canary.Status.Phase == flaggerv1.CanaryPhaseInitializing { - if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseInitialized}); err != nil { + if canary.Status.Phase == "" || canary.Status.Phase == flaggerv1alpha3.CanaryPhaseInitializing { + if err := canaryController.SyncStatus(canary, flaggerv1alpha3.CanaryStatus{Phase: flaggerv1alpha3.CanaryPhaseInitialized}); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) return false } - c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseInitialized) + c.recorder.SetStatus(canary, flaggerv1alpha3.CanaryPhaseInitialized) c.recordEventInfof(canary, "Initialization done! %s.%s", canary.Name, canary.Namespace) c.sendNotification(canary, "New deployment detected, initialization completed.", true, false) @@ -623,7 +624,7 @@ func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryControlle if shouldAdvance { canaryPhaseProgressing := canary.DeepCopy() - canaryPhaseProgressing.Status.Phase = flaggerv1.CanaryPhaseProgressing + canaryPhaseProgressing.Status.Phase = flaggerv1alpha3.CanaryPhaseProgressing c.recordEventInfof(canaryPhaseProgressing, "New revision detected! 
Scaling up %s.%s", canaryPhaseProgressing.Spec.TargetRef.Name, canaryPhaseProgressing.Namespace) c.sendNotification(canaryPhaseProgressing, "New revision detected, starting canary analysis.", true, false) @@ -632,18 +633,18 @@ func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryControlle c.recordEventErrorf(canary, "%v", err) return false } - if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseProgressing}); err != nil { + if err := canaryController.SyncStatus(canary, flaggerv1alpha3.CanaryStatus{Phase: flaggerv1alpha3.CanaryPhaseProgressing}); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) return false } - c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseProgressing) + c.recorder.SetStatus(canary, flaggerv1alpha3.CanaryPhaseProgressing) return false } return false } -func (c *Controller) hasCanaryRevisionChanged(canary *flaggerv1.Canary, canaryController canary.Controller) bool { - if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing { +func (c *Controller) hasCanaryRevisionChanged(canary *flaggerv1alpha3.Canary, canaryController canary.Controller) bool { + if canary.Status.Phase == flaggerv1alpha3.CanaryPhaseProgressing { if diff, _ := canaryController.HasTargetChanged(canary); diff { return true } @@ -654,13 +655,13 @@ func (c *Controller) hasCanaryRevisionChanged(canary *flaggerv1.Canary, canaryCo return false } -func (c *Controller) runConfirmRolloutHooks(canary *flaggerv1.Canary, canaryController canary.Controller) bool { +func (c *Controller) runConfirmRolloutHooks(canary *flaggerv1alpha3.Canary, canaryController canary.Controller) bool { for _, webhook := range canary.Spec.CanaryAnalysis.Webhooks { - if webhook.Type == flaggerv1.ConfirmRolloutHook { - err := CallWebhook(canary.Name, canary.Namespace, flaggerv1.CanaryPhaseProgressing, webhook) + if webhook.Type == flaggerv1alpha3.ConfirmRolloutHook { + err := 
CallWebhook(canary.Name, canary.Namespace, flaggerv1alpha3.CanaryPhaseProgressing, webhook) if err != nil { - if canary.Status.Phase != flaggerv1.CanaryPhaseWaiting { - if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseWaiting); err != nil { + if canary.Status.Phase != flaggerv1alpha3.CanaryPhaseWaiting { + if err := canaryController.SetStatusPhase(canary, flaggerv1alpha3.CanaryPhaseWaiting); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) } c.recordEventWarningf(canary, "Halt %s.%s advancement waiting for approval %s", @@ -669,8 +670,8 @@ func (c *Controller) runConfirmRolloutHooks(canary *flaggerv1.Canary, canaryCont } return false } else { - if canary.Status.Phase == flaggerv1.CanaryPhaseWaiting { - if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseProgressing); err != nil { + if canary.Status.Phase == flaggerv1alpha3.CanaryPhaseWaiting { + if err := canaryController.SetStatusPhase(canary, flaggerv1alpha3.CanaryPhaseProgressing); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) return false } @@ -683,10 +684,10 @@ func (c *Controller) runConfirmRolloutHooks(canary *flaggerv1.Canary, canaryCont return true } -func (c *Controller) runConfirmPromotionHooks(canary *flaggerv1.Canary) bool { +func (c *Controller) runConfirmPromotionHooks(canary *flaggerv1alpha3.Canary) bool { for _, webhook := range canary.Spec.CanaryAnalysis.Webhooks { - if webhook.Type == flaggerv1.ConfirmPromotionHook { - err := CallWebhook(canary.Name, canary.Namespace, flaggerv1.CanaryPhaseProgressing, webhook) + if webhook.Type == flaggerv1alpha3.ConfirmPromotionHook { + err := CallWebhook(canary.Name, canary.Namespace, flaggerv1alpha3.CanaryPhaseProgressing, webhook) if err != nil { c.recordEventWarningf(canary, "Halt %s.%s advancement waiting for promotion approval %s", canary.Name, canary.Namespace, webhook.Name) @@ -700,10 +701,10 @@ 
func (c *Controller) runConfirmPromotionHooks(canary *flaggerv1.Canary) bool { return true } -func (c *Controller) runPreRolloutHooks(canary *flaggerv1.Canary) bool { +func (c *Controller) runPreRolloutHooks(canary *flaggerv1alpha3.Canary) bool { for _, webhook := range canary.Spec.CanaryAnalysis.Webhooks { - if webhook.Type == flaggerv1.PreRolloutHook { - err := CallWebhook(canary.Name, canary.Namespace, flaggerv1.CanaryPhaseProgressing, webhook) + if webhook.Type == flaggerv1alpha3.PreRolloutHook { + err := CallWebhook(canary.Name, canary.Namespace, flaggerv1alpha3.CanaryPhaseProgressing, webhook) if err != nil { c.recordEventWarningf(canary, "Halt %s.%s advancement pre-rollout check %s failed %v", canary.Name, canary.Namespace, webhook.Name, err) @@ -716,9 +717,9 @@ func (c *Controller) runPreRolloutHooks(canary *flaggerv1.Canary) bool { return true } -func (c *Controller) runPostRolloutHooks(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) bool { +func (c *Controller) runPostRolloutHooks(canary *flaggerv1alpha3.Canary, phase flaggerv1alpha3.CanaryPhase) bool { for _, webhook := range canary.Spec.CanaryAnalysis.Webhooks { - if webhook.Type == flaggerv1.PostRolloutHook { + if webhook.Type == flaggerv1alpha3.PostRolloutHook { err := CallWebhook(canary.Name, canary.Namespace, phase, webhook) if err != nil { c.recordEventWarningf(canary, "Post-rollout hook %s failed %v", webhook.Name, err) @@ -731,9 +732,9 @@ func (c *Controller) runPostRolloutHooks(canary *flaggerv1.Canary, phase flagger return true } -func (c *Controller) runRollbackHooks(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) bool { +func (c *Controller) runRollbackHooks(canary *flaggerv1alpha3.Canary, phase flaggerv1alpha3.CanaryPhase) bool { for _, webhook := range canary.Spec.CanaryAnalysis.Webhooks { - if webhook.Type == flaggerv1.RollbackHook { + if webhook.Type == flaggerv1alpha3.RollbackHook { err := CallWebhook(canary.Name, canary.Namespace, phase, webhook) if err != nil { 
c.recordEventInfof(canary, "Rollback hook %s not signaling a rollback", webhook.Name) @@ -746,11 +747,11 @@ func (c *Controller) runRollbackHooks(canary *flaggerv1.Canary, phase flaggerv1. return false } -func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { +func (c *Controller) runAnalysis(r *flaggerv1alpha3.Canary) bool { // run external checks for _, webhook := range r.Spec.CanaryAnalysis.Webhooks { - if webhook.Type == "" || webhook.Type == flaggerv1.RolloutHook { - err := CallWebhook(r.Name, r.Namespace, flaggerv1.CanaryPhaseProgressing, webhook) + if webhook.Type == "" || webhook.Type == flaggerv1alpha3.RolloutHook { + err := CallWebhook(r.Name, r.Namespace, flaggerv1alpha3.CanaryPhaseProgressing, webhook) if err != nil { c.recordEventWarningf(r, "Halt %s.%s advancement external check %s failed %v", r.Name, r.Namespace, webhook.Name, err) @@ -759,6 +760,15 @@ func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { } } + ok := c.runBuiltinMetricChecks(r) + if !ok { + return ok + } + + return true +} + +func (c *Controller) runBuiltinMetricChecks(r *flaggerv1alpha3.Canary) bool { // override the global provider if one is specified in the canary spec var metricsProvider string // set the metrics provider to Crossover Prometheus when Crossover is the mesh provider @@ -786,11 +796,9 @@ func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { observerFactory := c.observerFactory // override the global metrics server if one is specified in the canary spec - metricsServer := c.observerFactory.Client.GetMetricsServer() if r.Spec.MetricsServer != "" { - metricsServer = r.Spec.MetricsServer var err error - observerFactory, err = metrics.NewFactory(metricsServer, 5*time.Second) + observerFactory, err = observers.NewFactory(r.Spec.MetricsServer) if err != nil { c.recordEventErrorf(r, "Error building Prometheus client for %s %v", r.Spec.MetricsServer, err) return false @@ -805,13 +813,13 @@ func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { } if 
metric.Name == "request-success-rate" { - val, err := observer.GetRequestSuccessRate(r.Spec.TargetRef.Name, r.Namespace, metric.Interval) + val, err := observer.GetRequestSuccessRate(toMetricModel(r, metric.Interval)) if err != nil { if strings.Contains(err.Error(), "no values found") { c.recordEventWarningf(r, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic", metricsProvider, metric.Name, r.Spec.TargetRef.Name, r.Namespace) } else { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", metricsServer, err) + c.recordEventErrorf(r, "Prometheus query failed: %v", err) } return false } @@ -825,13 +833,13 @@ func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { } if metric.Name == "request-duration" { - val, err := observer.GetRequestDuration(r.Spec.TargetRef.Name, r.Namespace, metric.Interval) + val, err := observer.GetRequestDuration(toMetricModel(r, metric.Interval)) if err != nil { if strings.Contains(err.Error(), "no values found") { c.recordEventWarningf(r, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic", metricsProvider, metric.Name, r.Spec.TargetRef.Name, r.Namespace) } else { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", metricsServer, err) + c.recordEventErrorf(r, "Prometheus query failed: %v", err) } return false } @@ -853,7 +861,7 @@ func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { c.recordEventWarningf(r, "Halt advancement no values found for custom metric: %s", metric.Name) } else { - c.recordEventErrorf(r, "Metrics server %s query failed for %s: %v", metricsServer, metric.Name, err) + c.recordEventErrorf(r, "Prometheus query failed for %s: %v", metric.Name, err) } return false } @@ -868,7 +876,26 @@ func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { return true } -func (c *Controller) rollback(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface) { +func toMetricModel(r 
*flaggerv1alpha3.Canary, interval string) flaggerv1alpha1.MetricTemplateModel { + service := r.Spec.TargetRef.Name + if r.Spec.Service.Name != "" { + service = r.Spec.Service.Name + } + ingress := r.Spec.TargetRef.Name + if r.Spec.IngressRef != nil { + ingress = r.Spec.IngressRef.Name + } + return flaggerv1alpha1.MetricTemplateModel{ + Name: r.Name, + Namespace: r.Namespace, + Target: r.Spec.TargetRef.Name, + Service: service, + Ingress: ingress, + Interval: interval, + } +} + +func (c *Controller) rollback(canary *flaggerv1alpha3.Canary, canaryController canary.Controller, meshRouter router.Interface) { if canary.Status.FailedChecks >= canary.Spec.CanaryAnalysis.Threshold { c.recordEventWarningf(canary, "Rolling back %s.%s failed checks threshold reached %v", canary.Name, canary.Namespace, canary.Status.FailedChecks) @@ -885,7 +912,7 @@ func (c *Controller) rollback(canary *flaggerv1.Canary, canaryController canary. } canaryPhaseFailed := canary.DeepCopy() - canaryPhaseFailed.Status.Phase = flaggerv1.CanaryPhaseFailed + canaryPhaseFailed.Status.Phase = flaggerv1alpha3.CanaryPhaseFailed c.recordEventWarningf(canaryPhaseFailed, "Canary failed! Scaling down %s.%s", canaryPhaseFailed.Name, canaryPhaseFailed.Namespace) @@ -898,11 +925,11 @@ func (c *Controller) rollback(canary *flaggerv1.Canary, canaryController canary. 
} // mark canary as failed - if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseFailed, CanaryWeight: 0}); err != nil { + if err := canaryController.SyncStatus(canary, flaggerv1alpha3.CanaryStatus{Phase: flaggerv1alpha3.CanaryPhaseFailed, CanaryWeight: 0}); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) return } - c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseFailed) - c.runPostRolloutHooks(canary, flaggerv1.CanaryPhaseFailed) + c.recorder.SetStatus(canary, flaggerv1alpha3.CanaryPhaseFailed) + c.runPostRolloutHooks(canary, flaggerv1alpha3.CanaryPhaseFailed) } diff --git a/pkg/metrics/appmesh.go b/pkg/metrics/appmesh.go deleted file mode 100644 index f922c55a..00000000 --- a/pkg/metrics/appmesh.go +++ /dev/null @@ -1,73 +0,0 @@ -package metrics - -import ( - "time" -) - -var appMeshQueries = map[string]string{ - "request-success-rate": ` - sum( - rate( - envoy_cluster_upstream_rq{ - kubernetes_namespace="{{ .Namespace }}", - kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)", - envoy_response_code!~"5.*" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - envoy_cluster_upstream_rq{ - kubernetes_namespace="{{ .Namespace }}", - kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" - }[{{ .Interval }}] - ) - ) - * 100`, - "request-duration": ` - histogram_quantile( - 0.99, - sum( - rate( - envoy_cluster_upstream_rq_time_bucket{ - kubernetes_namespace="{{ .Namespace }}", - kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" - }[{{ .Interval }}] - ) - ) by (le) - )`, -} - -type AppMeshObserver struct { - client *PrometheusClient -} - -func (ob *AppMeshObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, appMeshQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) 
- if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *AppMeshObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, appMeshQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/client_test.go b/pkg/metrics/client_test.go deleted file mode 100644 index 9fdbe85a..00000000 --- a/pkg/metrics/client_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package metrics - -import ( - "net/http" - "net/http/httptest" - "testing" - "time" -) - -func TestPrometheusClient_RunQuery(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - client, err := NewPrometheusClient(ts.URL, time.Second) - if err != nil { - t.Fatal(err) - } - - query := ` - histogram_quantile(0.99, - sum( - rate( - http_request_duration_seconds_bucket{ - kubernetes_namespace="test", - kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" - }[1m] - ) - ) by (le) - )` - - val, err := client.RunQuery(query) - if err != nil { - t.Fatal(err.Error()) - } - - if val != 100 { - t.Errorf("Got %v wanted %v", val, 100) - } -} - -func TestPrometheusClient_IsOnline(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"config.file":"/etc/prometheus/prometheus.yml"}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - client, err := NewPrometheusClient(ts.URL, time.Second) - if err != nil { - t.Fatal(err) - } - - ok, err := client.IsOnline() - if err != nil { - t.Fatal(err.Error()) - } - - if 
!ok { - t.Errorf("Got %v wanted %v", ok, true) - } -} - -func TestPrometheusClient_IsOffline(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadGateway) - })) - defer ts.Close() - - client, err := NewPrometheusClient(ts.URL, time.Second) - if err != nil { - t.Fatal(err) - } - - ok, err := client.IsOnline() - if err == nil { - t.Errorf("Got no error wanted %v", http.StatusBadGateway) - } - - if ok { - t.Errorf("Got %v wanted %v", ok, false) - } -} diff --git a/pkg/metrics/crossover.go b/pkg/metrics/crossover.go deleted file mode 100644 index 54a5e290..00000000 --- a/pkg/metrics/crossover.go +++ /dev/null @@ -1,73 +0,0 @@ -package metrics - -import ( - "time" -) - -var crossoverQueries = map[string]string{ - "request-success-rate": ` - sum( - rate( - envoy_cluster_upstream_rq{ - kubernetes_namespace="{{ .Namespace }}", - envoy_cluster_name=~"{{ .Name }}-canary", - envoy_response_code!~"5.*" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - envoy_cluster_upstream_rq{ - kubernetes_namespace="{{ .Namespace }}", - envoy_cluster_name=~"{{ .Name }}-canary" - }[{{ .Interval }}] - ) - ) - * 100`, - "request-duration": ` - histogram_quantile( - 0.99, - sum( - rate( - envoy_cluster_upstream_rq_time_bucket{ - kubernetes_namespace="{{ .Namespace }}", - envoy_cluster_name=~"{{ .Name }}-canary" - }[{{ .Interval }}] - ) - ) by (le) - )`, -} - -type CrossoverObserver struct { - client *PrometheusClient -} - -func (ob *CrossoverObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, crossoverQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *CrossoverObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - 
query, err := ob.client.RenderQuery(name, namespace, interval, crossoverQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/crossover_service.go b/pkg/metrics/crossover_service.go deleted file mode 100644 index bde9f0a1..00000000 --- a/pkg/metrics/crossover_service.go +++ /dev/null @@ -1,73 +0,0 @@ -package metrics - -import ( - "time" -) - -var crossoverServiceQueries = map[string]string{ - "request-success-rate": ` - sum( - rate( - envoy_cluster_upstream_rq{ - kubernetes_namespace="{{ .Namespace }}", - envoy_cluster_name="{{ .Name }}-canary", - envoy_response_code!~"5.*" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - envoy_cluster_upstream_rq{ - kubernetes_namespace="{{ .Namespace }}", - envoy_cluster_name="{{ .Name }}-canary" - }[{{ .Interval }}] - ) - ) - * 100`, - "request-duration": ` - histogram_quantile( - 0.99, - sum( - rate( - envoy_cluster_upstream_rq_time_bucket{ - kubernetes_namespace="{{ .Namespace }}", - envoy_cluster_name="{{ .Name }}-canary" - }[{{ .Interval }}] - ) - ) by (le) - )`, -} - -type CrossoverServiceObserver struct { - client *PrometheusClient -} - -func (ob *CrossoverServiceObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, crossoverServiceQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *CrossoverServiceObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, crossoverServiceQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := 
ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/http.go b/pkg/metrics/http.go deleted file mode 100644 index 9c169e44..00000000 --- a/pkg/metrics/http.go +++ /dev/null @@ -1,71 +0,0 @@ -package metrics - -import "time" - -var httpQueries = map[string]string{ - "request-success-rate": ` - sum( - rate( - http_request_duration_seconds_count{ - kubernetes_namespace="{{ .Namespace }}", - kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)", - status!~"5.*" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - http_request_duration_seconds_count{ - kubernetes_namespace="{{ .Namespace }}", - kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" - }[{{ .Interval }}] - ) - ) - * 100`, - "request-duration": ` - histogram_quantile( - 0.99, - sum( - rate( - http_request_duration_seconds_bucket{ - kubernetes_namespace="{{ .Namespace }}", - kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" - }[{{ .Interval }}] - ) - ) by (le) - )`, -} - -type HttpObserver struct { - client *PrometheusClient -} - -func (ob *HttpObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, httpQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *HttpObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, httpQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value*1000)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/istio.go b/pkg/metrics/istio.go deleted file mode 
100644 index f6894d05..00000000 --- a/pkg/metrics/istio.go +++ /dev/null @@ -1,76 +0,0 @@ -package metrics - -import ( - "time" -) - -var istioQueries = map[string]string{ - "request-success-rate": ` - sum( - rate( - istio_requests_total{ - reporter="destination", - destination_workload_namespace="{{ .Namespace }}", - destination_workload=~"{{ .Name }}", - response_code!~"5.*" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - istio_requests_total{ - reporter="destination", - destination_workload_namespace="{{ .Namespace }}", - destination_workload=~"{{ .Name }}" - }[{{ .Interval }}] - ) - ) - * 100`, - "request-duration": ` - histogram_quantile( - 0.99, - sum( - rate( - istio_request_duration_seconds_bucket{ - reporter="destination", - destination_workload_namespace="{{ .Namespace }}", - destination_workload=~"{{ .Name }}" - }[{{ .Interval }}] - ) - ) by (le) - )`, -} - -type IstioObserver struct { - client *PrometheusClient -} - -func (ob *IstioObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, istioQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *IstioObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, istioQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value*1000)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/linkerd.go b/pkg/metrics/linkerd.go deleted file mode 100644 index f4d5707a..00000000 --- a/pkg/metrics/linkerd.go +++ /dev/null @@ -1,76 +0,0 @@ -package metrics - -import ( - "time" -) - -var linkerdQueries = map[string]string{ - 
"request-success-rate": ` - sum( - rate( - response_total{ - namespace="{{ .Namespace }}", - deployment=~"{{ .Name }}", - classification!="failure", - direction="inbound" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - response_total{ - namespace="{{ .Namespace }}", - deployment=~"{{ .Name }}", - direction="inbound" - }[{{ .Interval }}] - ) - ) - * 100`, - "request-duration": ` - histogram_quantile( - 0.99, - sum( - rate( - response_latency_ms_bucket{ - namespace="{{ .Namespace }}", - deployment=~"{{ .Name }}", - direction="inbound" - }[{{ .Interval }}] - ) - ) by (le) - )`, -} - -type LinkerdObserver struct { - client *PrometheusClient -} - -func (ob *LinkerdObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, linkerdQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *LinkerdObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, linkerdQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/nginx.go b/pkg/metrics/nginx.go deleted file mode 100644 index 593d2306..00000000 --- a/pkg/metrics/nginx.go +++ /dev/null @@ -1,80 +0,0 @@ -package metrics - -import ( - "time" -) - -var nginxQueries = map[string]string{ - "request-success-rate": ` - sum( - rate( - nginx_ingress_controller_requests{ - namespace="{{ .Namespace }}", - ingress="{{ .Name }}", - status!~"5.*" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - nginx_ingress_controller_requests{ - namespace="{{ .Namespace }}", - ingress="{{ .Name }}" - }[{{ .Interval 
}}] - ) - ) - * 100`, - "request-duration": ` - sum( - rate( - nginx_ingress_controller_ingress_upstream_latency_seconds_sum{ - namespace="{{ .Namespace }}", - ingress="{{ .Name }}" - }[{{ .Interval }}] - ) - ) - / - sum( - rate( - nginx_ingress_controller_ingress_upstream_latency_seconds_count{ - namespace="{{ .Namespace }}", - ingress="{{ .Name }}" - }[{{ .Interval }}] - ) - ) - * 1000`, -} - -type NginxObserver struct { - client *PrometheusClient -} - -func (ob *NginxObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, nginxQueries["request-success-rate"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - return value, nil -} - -func (ob *NginxObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, nginxQueries["request-duration"]) - if err != nil { - return 0, err - } - - value, err := ob.client.RunQuery(query) - if err != nil { - return 0, err - } - - ms := time.Duration(int64(value)) * time.Millisecond - return ms, nil -} diff --git a/pkg/metrics/observer.go b/pkg/metrics/observer.go deleted file mode 100644 index 906285e7..00000000 --- a/pkg/metrics/observer.go +++ /dev/null @@ -1,10 +0,0 @@ -package metrics - -import ( - "time" -) - -type Interface interface { - GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) - GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) -} diff --git a/pkg/metrics/observers/appmesh.go b/pkg/metrics/observers/appmesh.go new file mode 100644 index 00000000..9bd3de22 --- /dev/null +++ b/pkg/metrics/observers/appmesh.go @@ -0,0 +1,76 @@ +package observers + +import ( + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + 
"github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var appMeshQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ namespace }}", + kubernetes_pod_name=~"{{ target }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)", + envoy_response_code!~"5.*" + }[{{ interval }}] + ) + ) + / + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ namespace }}", + kubernetes_pod_name=~"{{ target }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + envoy_cluster_upstream_rq_time_bucket{ + kubernetes_namespace="{{ namespace }}", + kubernetes_pod_name=~"{{ target }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[{{ interval }}] + ) + ) by (le) + )`, +} + +type AppMeshObserver struct { + client providers.Interface +} + +func (ob *AppMeshObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(appMeshQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *AppMeshObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(appMeshQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/appmesh_test.go b/pkg/metrics/observers/appmesh_test.go similarity index 69% rename from pkg/metrics/appmesh_test.go rename to pkg/metrics/observers/appmesh_test.go index 471be5c4..a3114f91 100644 --- a/pkg/metrics/appmesh_test.go +++ b/pkg/metrics/observers/appmesh_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 
"github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestAppMeshObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestAppMeshObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestAppMeshObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestAppMeshObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestAppMeshObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/contour.go b/pkg/metrics/observers/contour.go similarity index 51% rename from pkg/metrics/contour.go rename to pkg/metrics/observers/contour.go index bf5345ee..408a84c1 100644 --- a/pkg/metrics/contour.go +++ b/pkg/metrics/observers/contour.go @@ -1,7 +1,10 @@ -package metrics +package observers import ( "time" + + 
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) //envoy_cluster_name="test_podinfo-canary_9898" @@ -11,17 +14,17 @@ var contourQueries = map[string]string{ sum( rate( envoy_cluster_upstream_rq{ - envoy_cluster_name=~"{{ .Namespace }}_{{ .Name }}-canary_[0-9a-zA-Z-]+", + envoy_cluster_name=~"{{ namespace }}_{{ target }}-canary_[0-9a-zA-Z-]+", envoy_response_code!~"5.*" - }[{{ .Interval }}] + }[{{ interval }}] ) ) / sum( rate( envoy_cluster_upstream_rq{ - envoy_cluster_name=~"{{ .Namespace }}_{{ .Name }}-canary_[0-9a-zA-Z-]+", - }[{{ .Interval }}] + envoy_cluster_name=~"{{ namespace }}_{{ target }}-canary_[0-9a-zA-Z-]+", + }[{{ interval }}] ) ) * 100`, @@ -31,19 +34,19 @@ var contourQueries = map[string]string{ sum( rate( envoy_cluster_upstream_rq_time_bucket{ - envoy_cluster_name=~"{{ .Namespace }}_{{ .Name }}-canary_[0-9a-zA-Z-]+", - }[{{ .Interval }}] + envoy_cluster_name=~"{{ namespace }}_{{ target }}-canary_[0-9a-zA-Z-]+", + }[{{ interval }}] ) ) by (le) )`, } type ContourObserver struct { - client *PrometheusClient + client providers.Interface } -func (ob *ContourObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, contourQueries["request-success-rate"]) +func (ob *ContourObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(contourQueries["request-success-rate"], model) if err != nil { return 0, err } @@ -56,8 +59,8 @@ func (ob *ContourObserver) GetRequestSuccessRate(name string, namespace string, return value, nil } -func (ob *ContourObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, contourQueries["request-duration"]) +func (ob *ContourObserver) GetRequestDuration(model 
flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(contourQueries["request-duration"], model) if err != nil { return 0, err } diff --git a/pkg/metrics/contour_test.go b/pkg/metrics/observers/contour_test.go similarity index 68% rename from pkg/metrics/contour_test.go rename to pkg/metrics/observers/contour_test.go index 77a4c495..7b743da2 100644 --- a/pkg/metrics/contour_test.go +++ b/pkg/metrics/observers/contour_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestContourObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestContourObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestContourObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestContourObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestContourObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", 
"default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/observers/crossover.go b/pkg/metrics/observers/crossover.go new file mode 100644 index 00000000..f0cee774 --- /dev/null +++ b/pkg/metrics/observers/crossover.go @@ -0,0 +1,76 @@ +package observers + +import ( + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var crossoverQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ namespace }}", + envoy_cluster_name=~"{{ target }}-canary", + envoy_response_code!~"5.*" + }[{{ interval }}] + ) + ) + / + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ namespace }}", + envoy_cluster_name=~"{{ target }}-canary" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + envoy_cluster_upstream_rq_time_bucket{ + kubernetes_namespace="{{ namespace }}", + envoy_cluster_name=~"{{ target }}-canary" + }[{{ interval }}] + ) + ) by (le) + )`, +} + +type CrossoverObserver struct { + client providers.Interface +} + +func (ob *CrossoverObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(crossoverQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *CrossoverObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(crossoverQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms 
:= time.Duration(int64(value)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/observers/crossover_service.go b/pkg/metrics/observers/crossover_service.go new file mode 100644 index 00000000..35c0412e --- /dev/null +++ b/pkg/metrics/observers/crossover_service.go @@ -0,0 +1,76 @@ +package observers + +import ( + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var crossoverServiceQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ namespace }}", + envoy_cluster_name="{{ target }}-canary", + envoy_response_code!~"5.*" + }[{{ interval }}] + ) + ) + / + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ namespace }}", + envoy_cluster_name="{{ target }}-canary" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + envoy_cluster_upstream_rq_time_bucket{ + kubernetes_namespace="{{ namespace }}", + envoy_cluster_name="{{ target }}-canary" + }[{{ interval }}] + ) + ) by (le) + )`, +} + +type CrossoverServiceObserver struct { + client providers.Interface +} + +func (ob *CrossoverServiceObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(crossoverServiceQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *CrossoverServiceObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(crossoverServiceQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value)) * time.Millisecond + return ms, nil +} diff --git 
a/pkg/metrics/crossover_service_test.go b/pkg/metrics/observers/crossover_service_test.go similarity index 68% rename from pkg/metrics/crossover_service_test.go rename to pkg/metrics/observers/crossover_service_test.go index 8d65bbab..d1503506 100644 --- a/pkg/metrics/crossover_service_test.go +++ b/pkg/metrics/observers/crossover_service_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestCrossoverServiceObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestCrossoverServiceObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestCrossoverServiceObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestCrossoverServiceObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestCrossoverServiceObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := 
observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/crossover_test.go b/pkg/metrics/observers/crossover_test.go similarity index 68% rename from pkg/metrics/crossover_test.go rename to pkg/metrics/observers/crossover_test.go index dd788a6f..0ab2cbee 100644 --- a/pkg/metrics/crossover_test.go +++ b/pkg/metrics/observers/crossover_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestCrossoverObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestCrossoverObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestCrossoverObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestCrossoverObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestCrossoverObserver_GetRequestDuration(t *testing.T) { 
client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/factory.go b/pkg/metrics/observers/factory.go similarity index 74% rename from pkg/metrics/factory.go rename to pkg/metrics/observers/factory.go index 0edc7386..b6f02902 100644 --- a/pkg/metrics/factory.go +++ b/pkg/metrics/observers/factory.go @@ -1,16 +1,22 @@ -package metrics +package observers import ( "strings" - "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) type Factory struct { - Client *PrometheusClient + Client providers.Interface } -func NewFactory(metricsServer string, timeout time.Duration) (*Factory, error) { - client, err := NewPrometheusClient(metricsServer, timeout) +func NewFactory(metricsServer string) (*Factory, error) { + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: metricsServer, + SecretRef: nil, + }, nil) if err != nil { return nil, err } diff --git a/pkg/metrics/gloo.go b/pkg/metrics/observers/gloo.go similarity index 52% rename from pkg/metrics/gloo.go rename to pkg/metrics/observers/gloo.go index 67afd661..cbde1239 100644 --- a/pkg/metrics/gloo.go +++ b/pkg/metrics/observers/gloo.go @@ -1,7 +1,10 @@ -package metrics +package observers import ( "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) //envoy_cluster_name="test-podinfo-primary-9898_gloo-system" @@ -11,17 +14,17 @@ var glooQueries = map[string]string{ sum( rate( envoy_cluster_upstream_rq{ - envoy_cluster_name=~"{{ .Namespace }}-{{ .Name }}-canary-[0-9a-zA-Z-]+_[0-9a-zA-Z-]+", + envoy_cluster_name=~"{{ namespace 
}}-{{ target }}-canary-[0-9a-zA-Z-]+_[0-9a-zA-Z-]+", envoy_response_code!~"5.*" - }[{{ .Interval }}] + }[{{ interval }}] ) ) / sum( rate( envoy_cluster_upstream_rq{ - envoy_cluster_name=~"{{ .Namespace }}-{{ .Name }}-canary-[0-9a-zA-Z-]+_[0-9a-zA-Z-]+", - }[{{ .Interval }}] + envoy_cluster_name=~"{{ namespace }}-{{ target }}-canary-[0-9a-zA-Z-]+_[0-9a-zA-Z-]+", + }[{{ interval }}] ) ) * 100`, @@ -31,19 +34,19 @@ var glooQueries = map[string]string{ sum( rate( envoy_cluster_upstream_rq_time_bucket{ - envoy_cluster_name=~"{{ .Namespace }}-{{ .Name }}-canary-[0-9a-zA-Z-]+_[0-9a-zA-Z-]+", - }[{{ .Interval }}] + envoy_cluster_name=~"{{ namespace }}-{{ target }}-canary-[0-9a-zA-Z-]+_[0-9a-zA-Z-]+", + }[{{ interval }}] ) ) by (le) )`, } type GlooObserver struct { - client *PrometheusClient + client providers.Interface } -func (ob *GlooObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, glooQueries["request-success-rate"]) +func (ob *GlooObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(glooQueries["request-success-rate"], model) if err != nil { return 0, err } @@ -56,8 +59,8 @@ func (ob *GlooObserver) GetRequestSuccessRate(name string, namespace string, int return value, nil } -func (ob *GlooObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { - query, err := ob.client.RenderQuery(name, namespace, interval, glooQueries["request-duration"]) +func (ob *GlooObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(glooQueries["request-duration"], model) if err != nil { return 0, err } diff --git a/pkg/metrics/gloo_test.go b/pkg/metrics/observers/gloo_test.go similarity index 68% rename from pkg/metrics/gloo_test.go rename to pkg/metrics/observers/gloo_test.go index 0244e694..52ad4512 100644 
--- a/pkg/metrics/gloo_test.go +++ b/pkg/metrics/observers/gloo_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestGlooObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestGlooObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestGlooObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestGlooObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestGlooObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/observers/http.go b/pkg/metrics/observers/http.go new file mode 100644 index 00000000..476ae3ba --- /dev/null +++ 
b/pkg/metrics/observers/http.go @@ -0,0 +1,76 @@ +package observers + +import ( + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var httpQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + http_request_duration_seconds_count{ + kubernetes_namespace="{{ namespace }}", + kubernetes_pod_name=~"{{ target }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)", + status!~"5.*" + }[{{ interval }}] + ) + ) + / + sum( + rate( + http_request_duration_seconds_count{ + kubernetes_namespace="{{ namespace }}", + kubernetes_pod_name=~"{{ target }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + http_request_duration_seconds_bucket{ + kubernetes_namespace="{{ namespace }}", + kubernetes_pod_name=~"{{ target }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[{{ interval }}] + ) + ) by (le) + )`, +} + +type HttpObserver struct { + client providers.Interface +} + +func (ob *HttpObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(httpQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *HttpObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(httpQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value*1000)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/http_test.go b/pkg/metrics/observers/http_test.go similarity index 69% rename from pkg/metrics/http_test.go rename to pkg/metrics/observers/http_test.go index 6b11c04b..3daf8dbd 100644 --- a/pkg/metrics/http_test.go +++ 
b/pkg/metrics/observers/http_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestHttpObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestHttpObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestHttpObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestHttpObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestHttpObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/observers/istio.go b/pkg/metrics/observers/istio.go new file mode 100644 index 00000000..9c9df838 --- /dev/null +++ b/pkg/metrics/observers/istio.go @@ -0,0 +1,79 @@ +package 
observers + +import ( + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var istioQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + istio_requests_total{ + reporter="destination", + destination_workload_namespace="{{ namespace }}", + destination_workload=~"{{ target }}", + response_code!~"5.*" + }[{{ interval }}] + ) + ) + / + sum( + rate( + istio_requests_total{ + reporter="destination", + destination_workload_namespace="{{ namespace }}", + destination_workload=~"{{ target }}" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + istio_request_duration_seconds_bucket{ + reporter="destination", + destination_workload_namespace="{{ namespace }}", + destination_workload=~"{{ target }}" + }[{{ interval }}] + ) + ) by (le) + )`, +} + +type IstioObserver struct { + client providers.Interface +} + +func (ob *IstioObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(istioQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *IstioObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(istioQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value*1000)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/istio_test.go b/pkg/metrics/observers/istio_test.go similarity index 69% rename from pkg/metrics/istio_test.go rename to pkg/metrics/observers/istio_test.go index 76fff3d3..f3dd62a9 100644 --- a/pkg/metrics/istio_test.go +++ b/pkg/metrics/observers/istio_test.go @@ -1,10 +1,13 @@ -package metrics 
+package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestIstioObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestIstioObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestIstioObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestIstioObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestIstioObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/observers/linkerd.go b/pkg/metrics/observers/linkerd.go new file mode 100644 index 00000000..e54fd844 --- /dev/null +++ b/pkg/metrics/observers/linkerd.go @@ -0,0 +1,79 @@ +package observers + +import ( + "time" + + flaggerv1 
"github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var linkerdQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + response_total{ + namespace="{{ namespace }}", + deployment=~"{{ target }}", + classification!="failure", + direction="inbound" + }[{{ interval }}] + ) + ) + / + sum( + rate( + response_total{ + namespace="{{ namespace }}", + deployment=~"{{ target }}", + direction="inbound" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + response_latency_ms_bucket{ + namespace="{{ namespace }}", + deployment=~"{{ target }}", + direction="inbound" + }[{{ interval }}] + ) + ) by (le) + )`, +} + +type LinkerdObserver struct { + client providers.Interface +} + +func (ob *LinkerdObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(linkerdQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *LinkerdObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(linkerdQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/linkerd_test.go b/pkg/metrics/observers/linkerd_test.go similarity index 67% rename from pkg/metrics/linkerd_test.go rename to pkg/metrics/observers/linkerd_test.go index 920d0cac..94d54c4c 100644 --- a/pkg/metrics/linkerd_test.go +++ b/pkg/metrics/observers/linkerd_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 
"github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestLinkerdObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestLinkerdObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestLinkerdObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestLinkerdObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestLinkerdObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "default", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "default", + Target: "podinfo", + Service: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/observers/nginx.go b/pkg/metrics/observers/nginx.go new file mode 100644 index 00000000..f74a41df --- /dev/null +++ b/pkg/metrics/observers/nginx.go @@ -0,0 +1,83 @@ +package observers + +import ( + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + 
"github.com/weaveworks/flagger/pkg/metrics/providers" +) + +var nginxQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + nginx_ingress_controller_requests{ + namespace="{{ namespace }}", + ingress="{{ ingress }}", + status!~"5.*" + }[{{ interval }}] + ) + ) + / + sum( + rate( + nginx_ingress_controller_requests{ + namespace="{{ namespace }}", + ingress="{{ ingress }}" + }[{{ interval }}] + ) + ) + * 100`, + "request-duration": ` + sum( + rate( + nginx_ingress_controller_ingress_upstream_latency_seconds_sum{ + namespace="{{ namespace }}", + ingress="{{ ingress }}" + }[{{ interval }}] + ) + ) + / + sum( + rate( + nginx_ingress_controller_ingress_upstream_latency_seconds_count{ + namespace="{{ namespace }}", + ingress="{{ ingress }}" + }[{{ interval }}] + ) + ) + * 1000`, +} + +type NginxObserver struct { + client providers.Interface +} + +func (ob *NginxObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) { + query, err := RenderQuery(nginxQueries["request-success-rate"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + return value, nil +} + +func (ob *NginxObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) { + query, err := RenderQuery(nginxQueries["request-duration"], model) + if err != nil { + return 0, err + } + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err + } + + ms := time.Duration(int64(value)) * time.Millisecond + return ms, nil +} diff --git a/pkg/metrics/nginx_test.go b/pkg/metrics/observers/nginx_test.go similarity index 68% rename from pkg/metrics/nginx_test.go rename to pkg/metrics/observers/nginx_test.go index de3613d0..b51fcded 100644 --- a/pkg/metrics/nginx_test.go +++ b/pkg/metrics/observers/nginx_test.go @@ -1,10 +1,13 @@ -package metrics +package observers import ( "net/http" "net/http/httptest" "testing" "time" + + flaggerv1 
"github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "github.com/weaveworks/flagger/pkg/metrics/providers" ) func TestNginxObserver_GetRequestSuccessRate(t *testing.T) { @@ -21,7 +24,11 @@ func TestNginxObserver_GetRequestSuccessRate(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -30,7 +37,13 @@ func TestNginxObserver_GetRequestSuccessRate(t *testing.T) { client: client, } - val, err := observer.GetRequestSuccessRate("podinfo", "nginx", "1m") + val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "nginx", + Target: "podinfo", + Ingress: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +67,11 @@ func TestNginxObserver_GetRequestDuration(t *testing.T) { })) defer ts.Close() - client, err := NewPrometheusClient(ts.URL, time.Second) + client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{ + Type: "prometheus", + Address: ts.URL, + SecretRef: nil, + }, nil) if err != nil { t.Fatal(err) } @@ -63,7 +80,13 @@ func TestNginxObserver_GetRequestDuration(t *testing.T) { client: client, } - val, err := observer.GetRequestDuration("podinfo", "nginx", "1m") + val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{ + Name: "podinfo", + Namespace: "nginx", + Target: "podinfo", + Ingress: "podinfo", + Interval: "1m", + }) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/metrics/observers/observer.go b/pkg/metrics/observers/observer.go new file mode 100644 index 00000000..59cb0b70 --- /dev/null +++ b/pkg/metrics/observers/observer.go @@ -0,0 +1,11 @@ +package observers + +import ( + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha1" + "time" +) + +type Interface interface { + 
GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error)
	GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error)
}

// RenderQuery renders the given query template using the model's template
// functions (namespace, target, ingress, interval, etc.) and returns the
// resulting query string.
//
// Returns an error if the template fails to parse or to execute.
func RenderQuery(queryTemplate string, model flaggerv1.MetricTemplateModel) (string, error) {
	t, err := template.New("tmpl").Funcs(model.TemplateFunctions()).Parse(queryTemplate)
	if err != nil {
		return "", err
	}

	// Execute straight into the buffer; the original wrapped the buffer in a
	// bufio.Writer, which only added an allocation and a Flush error path —
	// bytes.Buffer already implements io.Writer and never fails to write.
	var data bytes.Buffer
	if err := t.Execute(&data, nil); err != nil {
		return "", err
	}

	return data.String(), nil
}