package controller

import (
	"context"
	"fmt"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"

	flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
	"github.com/weaveworks/flagger/pkg/canary"
	"github.com/weaveworks/flagger/pkg/router"
)

// min returns the smaller of a and b.
func (c *Controller) min(a int, b int) int {
	if a < b {
		return a
	}
	return b
}

// maxWeight returns the canary weight at which promotion is triggered.
//
// Precedence: the last entry of StepWeights (capped at the total weight),
// then an explicit MaxWeight, then the total weight.
func (c *Controller) maxWeight(canary *flaggerv1.Canary) int {
	analysis := canary.GetAnalysis()

	if n := len(analysis.StepWeights); n > 0 {
		return c.min(c.totalWeight(canary), analysis.StepWeights[n-1])
	}
	if analysis.MaxWeight > 0 {
		return analysis.MaxWeight
	}

	// set max weight default value to total weight
	return c.totalWeight(canary)
}

// totalWeight returns the sum of the primary and canary weights.
// It is currently a constant 100 (percent); the canary argument is unused.
func (c *Controller) totalWeight(canary *flaggerv1.Canary) int {
	// set total weight default value to 100%
	return 100
}

// nextStepWeight returns the weight increment to apply to the canary given
// its current weight.
//
// When a fixed StepWeight is configured, or no StepWeights list is present,
// StepWeight is returned unchanged (which may be zero). Otherwise the
// increment is derived from the StepWeights list and never exceeds the
// weight still available below the total.
func (c *Controller) nextStepWeight(canary *flaggerv1.Canary, canaryWeight int) int {
	analysis := canary.GetAnalysis()
	stepWeightsLen := len(analysis.StepWeights)
	if analysis.StepWeight > 0 || stepWeightsLen == 0 {
		return analysis.StepWeight
	}

	maxStep := c.totalWeight(canary) - canaryWeight

	// If maxStep is zero we need to promote, so any non zero step weight will move the canary to promotion.
	// This is the same use case as the last step via StepWeight.
	if maxStep == 0 {
		return 1
	}

	// return min of maxStep and the calculated step to avoid going above totalWeight

	// initial step
	if canaryWeight == 0 {
		return c.min(maxStep, analysis.StepWeights[0])
	}

	// find the current step and return the difference in weight
	for i := 0; i < stepWeightsLen-1; i++ {
		if analysis.StepWeights[i] == canaryWeight {
			return c.min(maxStep, analysis.StepWeights[i+1]-canaryWeight)
		}
	}

	// current weight is not one of the configured steps (or is the last one):
	// jump straight to the total weight
	return maxStep
}

// scheduleCanaries synchronises the canary map with the jobs map,
// for new canaries new jobs are created and started
// for the removed canaries the jobs are stopped and deleted
func (c *Controller) scheduleCanaries() {
	current := make(map[string]string) // canary key -> "<target>.<namespace>"
	stats := make(map[string]int)      // namespace -> number of canaries

	c.canaries.Range(func(key interface{}, value interface{}) bool {
		cn := value.(*flaggerv1.Canary)

		// the map key identifies the canary and doubles as the job key
		// NOTE(review): presumably formatted as <name>.<namespace> — confirm against the writer of c.canaries
		name := key.(string)
		current[name] = fmt.Sprintf("%s.%s", cn.Spec.TargetRef.Name, cn.Namespace)

		interval := cn.GetAnalysisInterval()
		job, exists := c.jobs[name]

		// schedule new job for existing job with different analysis interval or non-existing job
		if !exists || job.GetCanaryAnalysisInterval() != interval {
			if exists {
				job.Stop()
			}

			newJob := CanaryJob{
				Name:             cn.Name,
				Namespace:        cn.Namespace,
				function:         c.advanceCanary,
				done:             make(chan bool),
				ticker:           time.NewTicker(interval),
				analysisInterval: interval,
			}

			c.jobs[name] = newJob
			newJob.Start()
		}

		// compute canaries per namespace total
		stats[cn.Namespace]++
		return true
	})

	// cleanup deleted jobs
	for name, job := range c.jobs {
		if _, exists := current[name]; !exists {
			job.Stop()
			delete(c.jobs, name)
		}
	}

	// check if multiple canaries have the same target
	for canaryName, targetName := range current {
		for name, target := range current {
			if name != canaryName && target == targetName {
				c.logger.With("canary", canaryName).
					Errorf("Bad things will happen! Found more than one canary with the same target %s", targetName)
			}
		}
	}

	// set total canaries per namespace metric
	for k, v := range stats {
		c.recorder.SetTotal(k, v)
	}
}

// advanceCanary runs one reconciliation/analysis tick for the canary
// identified by name and namespace. It is the function installed on every
// CanaryJob by scheduleCanaries.
//
// Each tick fetches the canary, reconciles services and routing, and then,
// depending on the status phase, starts, continues, promotes, finalises or
// rolls back the analysis. Most failures are recorded as warning events and
// end the tick early; the next tick starts again from the top.
func (c *Controller) advanceCanary(name string, namespace string) {
	begin := time.Now()

	// check if the canary exists
	cd, err := c.flaggerClient.FlaggerV1beta1().Canaries(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		// the lookup can fail for reasons other than a missing object,
		// so surface the underlying error
		c.logger.With("canary", fmt.Sprintf("%s.%s", name, namespace)).
			Errorf("Canary %s.%s not found: %v", name, namespace, err)
		return
	}

	// override the global provider if one is specified in the canary spec
	provider := c.meshProvider
	if cd.Spec.Provider != "" {
		provider = cd.Spec.Provider
	}

	// init controller based on target kind
	canaryController := c.canaryFactory.Controller(cd.Spec.TargetRef.Kind)
	labelSelector, labelValue, ports, err := canaryController.GetMetadata(cd)
	if err != nil {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	// init Kubernetes router
	kubeRouter := c.routerFactory.KubernetesRouter(cd.Spec.TargetRef.Kind, labelSelector, labelValue, ports)

	// reconcile the canary/primary services
	if err := kubeRouter.Initialize(cd); err != nil {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	// check metric servers' availability
	// (only before initialization has completed; a failure is reported but does not stop the tick)
	if !cd.SkipAnalysis() && (cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing) {
		if err := c.checkMetricProviderAvailability(cd); err != nil {
			c.recordEventErrorf(cd, "Error checking metric providers: %v", err)
		}
	}

	// init mesh router
	meshRouter := c.routerFactory.MeshRouter(provider, labelSelector)

	// register the AppMesh VirtualNodes before creating the primary deployment
	// otherwise the pods will not be injected with the Envoy proxy
	if strings.HasPrefix(provider, flaggerv1.AppMeshProvider) {
		if err := meshRouter.Reconcile(cd); err != nil {
			c.recordEventWarningf(cd, "%v", err)
			return
		}
	}

	// create primary workload
	err = canaryController.Initialize(cd)
	if err != nil {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	// change the apex service pod selector to primary
	if err := kubeRouter.Reconcile(cd); err != nil {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	// take over an existing virtual service or ingress
	// runs after the primary is ready to ensure zero downtime
	if !strings.HasPrefix(provider, flaggerv1.AppMeshProvider) {
		if err := meshRouter.Reconcile(cd); err != nil {
			c.recordEventWarningf(cd, "%v", err)
			return
		}
	}

	// check for changes
	shouldAdvance, err := c.shouldAdvance(cd, canaryController)
	if err != nil {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	if !shouldAdvance {
		c.recorder.SetStatus(cd, cd.Status.Phase)
		return
	}

	// check gates
	if isApproved := c.runConfirmRolloutHooks(cd, canaryController); !isApproved {
		return
	}

	maxWeight := c.maxWeight(cd)

	// check primary status
	if !cd.SkipAnalysis() {
		if err := canaryController.IsPrimaryReady(cd); err != nil {
			c.recordEventWarningf(cd, "%v", err)
			return
		}
	}

	// get the routing settings
	primaryWeight, canaryWeight, mirrored, err := meshRouter.GetRoutes(cd)
	if err != nil {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	c.recorder.SetWeight(cd, primaryWeight, canaryWeight)

	// check if canary analysis should start (canary revision has changes) or continue
	if ok := c.checkCanaryStatus(cd, canaryController, shouldAdvance); !ok {
		return
	}

	// check if canary revision changed during analysis
	if restart := c.hasCanaryRevisionChanged(cd, canaryController); restart {
		c.recordEventInfof(cd, "New revision detected! Restarting analysis for %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

		// route all traffic back to primary
		primaryWeight = c.totalWeight(cd)
		canaryWeight = 0
		if err := meshRouter.SetRoutes(cd, primaryWeight, canaryWeight, false); err != nil {
			c.recordEventWarningf(cd, "%v", err)
			return
		}

		// reset status
		status := flaggerv1.CanaryStatus{
			Phase:        flaggerv1.CanaryPhaseProgressing,
			CanaryWeight: 0,
			FailedChecks: 0,
			Iterations:   0,
		}
		if err := canaryController.SyncStatus(cd, status); err != nil {
			c.recordEventWarningf(cd, "%v", err)
		}
		return
	}

	// check canary status
	// a retriable error just delays the tick; a non-retriable one is carried
	// forward (err, retriable) and triggers a rollback further down
	retriable, err := canaryController.IsCanaryReady(cd)
	if err != nil && retriable {
		c.recordEventWarningf(cd, "%v", err)
		return
	}

	// check if analysis should be skipped
	if skip := c.shouldSkipAnalysis(cd, canaryController, meshRouter, err, retriable); skip {
		return
	}

	// check if we should rollback
	if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing ||
		cd.Status.Phase == flaggerv1.CanaryPhaseWaiting {
		if ok := c.runRollbackHooks(cd, cd.Status.Phase); ok {
			c.recordEventWarningf(cd, "Rolling back %s.%s manual webhook invoked", cd.Name, cd.Namespace)
			c.alert(cd, "Rolling back manual webhook invoked", false, flaggerv1.SeverityWarn)
			c.rollback(cd, canaryController, meshRouter)
			return
		}
	}

	// route traffic back to primary if analysis has succeeded
	if cd.Status.Phase == flaggerv1.CanaryPhasePromoting {
		c.runPromotionTrafficShift(cd, canaryController, meshRouter, provider, canaryWeight, primaryWeight)
		return
	}

	// scale canary to zero if promotion has finished
	if cd.Status.Phase == flaggerv1.CanaryPhaseFinalising {
		if err := canaryController.ScaleToZero(cd); err != nil {
			c.recordEventWarningf(cd, "%v", err)
			return
		}

		// set status to succeeded
		if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseSucceeded); err != nil {
			c.recordEventWarningf(cd, "%v", err)
			return
		}
		c.recorder.SetStatus(cd, flaggerv1.CanaryPhaseSucceeded)
		c.runPostRolloutHooks(cd, flaggerv1.CanaryPhaseSucceeded)
		c.recordEventInfof(cd, "Promotion completed! Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
		c.alert(cd, "Canary analysis completed successfully, promotion finished.",
			false, flaggerv1.SeverityInfo)
		return
	}

	// check if the number of failed checks reached the threshold
	if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing &&
		(!retriable || cd.Status.FailedChecks >= cd.GetAnalysisThreshold()) {
		if !retriable {
			c.recordEventWarningf(cd, "Rolling back %s.%s progress deadline exceeded %v", cd.Name, cd.Namespace, err)
			c.alert(cd, fmt.Sprintf("Progress deadline exceeded %v", err), false, flaggerv1.SeverityError)
		}
		c.rollback(cd, canaryController, meshRouter)
		return
	}

	// record analysis duration
	// (registered only here, so ticks that returned earlier are not measured)
	defer func() {
		c.recorder.SetDuration(cd, time.Since(begin))
	}()

	// check if the canary success rate is above the threshold
	// skip check if no traffic is routed or mirrored to canary
	if canaryWeight == 0 && cd.Status.Iterations == 0 &&
		!(cd.GetAnalysis().Mirror && mirrored) {
		c.recordEventInfof(cd, "Starting canary analysis for %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

		// run pre-rollout web hooks
		if ok := c.runPreRolloutHooks(cd); !ok {
			if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {
				c.recordEventWarningf(cd, "%v", err)
			}
			return
		}
	} else {
		if ok := c.runAnalysis(cd); !ok {
			if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {
				c.recordEventWarningf(cd, "%v", err)
			}
			return
		}
	}

	// use blue/green strategy for kubernetes provider
	if provider == flaggerv1.KubernetesProvider {
		if len(cd.GetAnalysis().Match) > 0 {
			c.recordEventWarningf(cd, "A/B testing is not supported when using the kubernetes provider")
			cd.GetAnalysis().Match = nil
		}
		if cd.GetAnalysis().Iterations < 1 {
			c.recordEventWarningf(cd, "Progressive traffic is not supported when using the kubernetes provider")
			c.recordEventWarningf(cd, "Setting canaryAnalysis.iterations: 10")
			cd.GetAnalysis().Iterations = 10
		}
	}

	// strategy: A/B testing
	if len(cd.GetAnalysis().Match) > 0 && cd.GetAnalysis().Iterations > 0 {
		c.runAB(cd, canaryController, meshRouter)
		return
	}

	// strategy: Blue/Green
	if cd.GetAnalysis().Iterations > 0 {
		c.runBlueGreen(cd, canaryController, meshRouter, provider, mirrored)
		return
	}

	// strategy: Canary progressive traffic increase
	if c.nextStepWeight(cd, canaryWeight) > 0 {
		c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
	}
}

// runPromotionTrafficShift moves traffic from the canary back to the primary
// while the canary is in the Promoting phase, and switches the phase to
// Finalising once the primary carries the total weight.
//
// With the Kubernetes provider no traffic is shifted. Without a
// StepWeightPromotion all traffic is routed to the primary at once;
// otherwise the primary weight grows by StepWeightPromotion per call.
func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
	meshRouter router.Interface, provider string, canaryWeight int, primaryWeight int) {
	// finalize promotion since no traffic shifting is possible for Kubernetes CNI
	if provider == flaggerv1.KubernetesProvider {
		if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseFinalising); err != nil {
			c.recordEventWarningf(canary, "%v", err)
		}
		return
	}

	// Read the setting through GetAnalysis, like every other access in this
	// file, instead of dereferencing Spec.Analysis directly.
	// NOTE(review): Spec.Analysis is presumably a pointer that can be unset — confirm.
	stepWeightPromotion := canary.GetAnalysis().StepWeightPromotion

	// route all traffic to primary in one go when promotion step weight is not set
	if stepWeightPromotion == 0 {
		c.recordEventInfof(canary, "Routing all traffic to primary")
		if err := meshRouter.SetRoutes(canary, c.totalWeight(canary), 0, false); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
		c.recorder.SetWeight(canary, c.totalWeight(canary), 0)
		if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseFinalising); err != nil {
			c.recordEventWarningf(canary, "%v", err)
		}
		return
	}

	// increment the primary traffic weight until it reaches total weight
	if canaryWeight > 0 {
		primaryWeight += stepWeightPromotion
		if primaryWeight > c.totalWeight(canary) {
			primaryWeight = c.totalWeight(canary)
		}
		canaryWeight -= stepWeightPromotion
		if canaryWeight < 0 {
			canaryWeight = 0
		}
		if err := meshRouter.SetRoutes(canary, primaryWeight, canaryWeight, false); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
		c.recorder.SetWeight(canary, primaryWeight, canaryWeight)
		c.recordEventInfof(canary, "Advance %s.%s primary weight %v", canary.Name, canary.Namespace, primaryWeight)

		// finalize promotion
		if primaryWeight == c.totalWeight(canary) {
			if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseFinalising); err != nil {
				c.recordEventWarningf(canary, "%v", err)
			}
		} else {
			if err := canaryController.SetStatusWeight(canary, canaryWeight); err != nil {
				c.recordEventWarningf(canary, "%v", err)
			}
		}
	}
}

// runCanary implements the progressive traffic strategy: while the canary
// weight is below maxWeight it shifts one more step of traffic to the canary,
// and once maxWeight is reached it promotes the canary (subject to the
// confirm-promotion hooks) and sets the phase to Promoting.
func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
	meshRouter router.Interface, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) {
	primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)

	// increase traffic weight
	if canaryWeight < maxWeight {
		nextStepWeight := c.nextStepWeight(canary, canaryWeight)

		// If in "mirror" mode, do one step of mirroring before shifting traffic to canary.
		// When mirroring, all requests go to primary and canary, but only responses from
		// primary go back to the user.
		if canary.GetAnalysis().Mirror && canaryWeight == 0 {
			if !mirrored {
				mirrored = true
				primaryWeight = c.totalWeight(canary)
				canaryWeight = 0
			} else {
				mirrored = false
				primaryWeight = c.totalWeight(canary) - nextStepWeight
				canaryWeight = nextStepWeight
			}
			c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
				Infof("Running mirror step %d/%d/%t", primaryWeight, canaryWeight, mirrored)
		} else {
			primaryWeight -= nextStepWeight
			if primaryWeight < 0 {
				primaryWeight = 0
			}
			canaryWeight += nextStepWeight
			if canaryWeight > c.totalWeight(canary) {
				canaryWeight = c.totalWeight(canary)
			}
		}

		if err := meshRouter.SetRoutes(canary, primaryWeight, canaryWeight, mirrored); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}

		if err := canaryController.SetStatusWeight(canary, canaryWeight); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}

		c.recorder.SetWeight(canary, primaryWeight, canaryWeight)
		c.recordEventInfof(canary, "Advance %s.%s canary weight %v", canary.Name, canary.Namespace, canaryWeight)
		return
	}

	// promote canary - max weight reached
	// (the branch above always returns, so canaryWeight >= maxWeight holds here)

	// check promotion gate
	if promote := c.runConfirmPromotionHooks(canary); !promote {
		return
	}

	// update primary spec
	c.recordEventInfof(canary, "Copying %s.%s template spec to %s.%s",
		canary.Spec.TargetRef.Name, canary.Namespace, primaryName, canary.Namespace)
	if err := canaryController.Promote(canary); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return
	}

	// update status phase
	if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return
	}
}

// runAB implements the A/B testing strategy: all traffic is routed to the
// canary for the configured number of iterations, after which the canary is
// promoted (subject to the confirm-promotion hooks) and the phase is set to
// Promoting.
func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller,
	meshRouter router.Interface) {
	primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)

	// route traffic to canary and increment iterations
	if canary.GetAnalysis().Iterations > canary.Status.Iterations {
		if err := meshRouter.SetRoutes(canary, 0, c.totalWeight(canary), false); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
		c.recorder.SetWeight(canary, 0, c.totalWeight(canary))

		if err := canaryController.SetStatusIterations(canary, canary.Status.Iterations+1); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
		c.recordEventInfof(canary, "Advance %s.%s canary iteration %v/%v",
			canary.Name, canary.Namespace, canary.Status.Iterations+1, canary.GetAnalysis().Iterations)
		return
	}

	// check promotion gate
	if promote := c.runConfirmPromotionHooks(canary); !promote {
		return
	}

	// promote canary - max iterations reached
	if canary.GetAnalysis().Iterations == canary.Status.Iterations {
		c.recordEventInfof(canary, "Copying %s.%s template spec to %s.%s",
			canary.Spec.TargetRef.Name, canary.Namespace, primaryName, canary.Namespace)
		if err := canaryController.Promote(canary); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}

		// update status phase
		if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
	}
}

// runBlueGreen implements the blue/green strategy. It counts iterations
// (optionally mirroring traffic while doing so), then routes all traffic to
// the canary when the iteration count is reached, and on the following call
// promotes the canary and sets the phase to Promoting.
//
// With the Kubernetes provider no routes are changed; only the iteration
// counter and the promotion are driven.
func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller,
	meshRouter router.Interface, provider string, mirrored bool) {
	primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)

	// increment iterations
	if canary.GetAnalysis().Iterations > canary.Status.Iterations {
		// If in "mirror" mode, mirror requests during the entire B/G canary test
		if provider != flaggerv1.KubernetesProvider &&
			canary.GetAnalysis().Mirror && !mirrored {
			// a mirroring failure is reported but does not stop the iteration
			if err := meshRouter.SetRoutes(canary, c.totalWeight(canary), 0, true); err != nil {
				c.recordEventWarningf(canary, "%v", err)
			}
			c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
				Infof("Start traffic mirroring")
		}
		if err := canaryController.SetStatusIterations(canary, canary.Status.Iterations+1); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
		c.recordEventInfof(canary, "Advance %s.%s canary iteration %v/%v",
			canary.Name, canary.Namespace, canary.Status.Iterations+1, canary.GetAnalysis().Iterations)
		return
	}

	// check promotion gate
	if promote := c.runConfirmPromotionHooks(canary); !promote {
		return
	}

	// route all traffic to canary - max iterations reached
	if canary.GetAnalysis().Iterations == canary.Status.Iterations {
		if provider != flaggerv1.KubernetesProvider {
			if canary.GetAnalysis().Mirror {
				c.recordEventInfof(canary, "Stop traffic mirroring and route all traffic to canary")
			} else {
				c.recordEventInfof(canary, "Routing all traffic to canary")
			}
			if err := meshRouter.SetRoutes(canary, 0, c.totalWeight(canary), false); err != nil {
				c.recordEventWarningf(canary, "%v", err)
				return
			}
			c.recorder.SetWeight(canary, 0, c.totalWeight(canary))
		}

		// increment iterations
		// (pushes the counter past the target so the next call takes the promotion branch)
		if err := canaryController.SetStatusIterations(canary, canary.Status.Iterations+1); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
		return
	}

	// promote canary - max iterations reached
	if canary.GetAnalysis().Iterations < canary.Status.Iterations {
		c.recordEventInfof(canary, "Copying %s.%s template spec to %s.%s",
			canary.Spec.TargetRef.Name, canary.Namespace, primaryName, canary.Namespace)
		if err := canaryController.Promote(canary); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}

		// update status phase
		if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil {
			c.recordEventWarningf(canary, "%v", err)
			return
		}
	}
}

// runAnalysis runs the rollout webhooks, then the builtin metric checks, then
// the metric checks, stopping at the first failure. It reports whether every
// check passed.
func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
	// run external checks
	for _, webhook := range canary.GetAnalysis().Webhooks {
		// webhooks without a type are treated as rollout hooks
		if webhook.Type == "" || webhook.Type == flaggerv1.RolloutHook {
			err := CallWebhook(canary.Name, canary.Namespace, flaggerv1.CanaryPhaseProgressing, webhook)
			if err != nil {
				c.recordEventWarningf(canary, "Halt %s.%s advancement external check %s failed %v",
					canary.Name, canary.Namespace, webhook.Name, err)
				return false
			}
		}
	}

	if ok := c.runBuiltinMetricChecks(canary); !ok {
		return false
	}

	return c.runMetricChecks(canary)
}

// shouldSkipAnalysis handles canaries that have analysis skipping enabled.
//
// It returns false when skipping is disabled or when one of the promotion
// steps fails, in which case the caller continues with the regular flow. It
// returns true when it has fully handled the tick: either the canary was
// rolled back (not retriable, or failed-checks threshold reached) or it was
// promoted directly without analysis.
//
// err and retriable are the results of the caller's canary readiness check.
func (c *Controller) shouldSkipAnalysis(canary *flaggerv1.Canary, canaryController canary.Controller,
	meshRouter router.Interface, err error, retriable bool) bool {
	if !canary.SkipAnalysis() {
		return false
	}

	// regardless if analysis is being skipped, rollback if canary failed to progress
	if !retriable || canary.Status.FailedChecks >= canary.GetAnalysisThreshold() {
		c.recordEventWarningf(canary, "Rolling back %s.%s progress deadline exceeded %v", canary.Name, canary.Namespace, err)
		c.alert(canary, fmt.Sprintf("Progress deadline exceeded %v", err), false, flaggerv1.SeverityError)
		c.rollback(canary, canaryController, meshRouter)

		return true
	}

	// route all traffic to primary
	primaryWeight := c.totalWeight(canary)
	canaryWeight := 0
	if err := meshRouter.SetRoutes(canary, primaryWeight, canaryWeight, false); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return false
	}
	c.recorder.SetWeight(canary, primaryWeight, canaryWeight)

	// copy spec and configs from canary to primary
	c.recordEventInfof(canary, "Copying %s.%s template spec to %s-primary.%s",
		canary.Spec.TargetRef.Name, canary.Namespace, canary.Spec.TargetRef.Name, canary.Namespace)
	if err := canaryController.Promote(canary); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return false
	}

	// shutdown canary
	if err := canaryController.ScaleToZero(canary); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return false
	}

	// update status phase
	if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseSucceeded); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return false
	}

	// notify
	c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseSucceeded)
	c.recordEventInfof(canary, "Promotion completed! Canary analysis was skipped for %s.%s",
		canary.Spec.TargetRef.Name, canary.Namespace)
	c.alert(canary, "Canary analysis was skipped, promotion finished.",
		false, flaggerv1.SeverityInfo)

	return true
}

// shouldAdvance reports whether the current tick has work to do.
//
// It is true when no spec has been applied yet or when the canary is in one
// of the in-flight phases (Initializing, Progressing, Waiting, Promoting,
// Finalising). Otherwise it is true only if the target or one of its
// dependencies has changed; an error from either check is returned as is.
func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController canary.Controller) (bool, error) {
	if canary.Status.LastAppliedSpec == "" {
		return true, nil
	}

	switch canary.Status.Phase {
	case flaggerv1.CanaryPhaseInitializing,
		flaggerv1.CanaryPhaseProgressing,
		flaggerv1.CanaryPhaseWaiting,
		flaggerv1.CanaryPhasePromoting,
		flaggerv1.CanaryPhaseFinalising:
		return true, nil
	}

	newTarget, err := canaryController.HasTargetChanged(canary)
	if err != nil {
		return false, err
	}
	if newTarget {
		return newTarget, nil
	}

	newCfg, err := canaryController.HaveDependenciesChanged(canary)
	if err != nil {
		return false, err
	}

	return newCfg, nil
}

// checkCanaryStatus publishes the current phase and decides whether the
// caller may continue with the analysis in this tick.
//
// It returns true only when the canary is already Progressing, Promoting or
// Finalising. In every other case it returns false, after possibly moving the
// canary forward: an uninitialized canary is marked Initialized, and when
// shouldAdvance is set the canary is scaled up and marked Progressing so that
// the analysis starts on a later tick.
func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, shouldAdvance bool) bool {
	c.recorder.SetStatus(canary, canary.Status.Phase)
	if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing ||
		canary.Status.Phase == flaggerv1.CanaryPhasePromoting ||
		canary.Status.Phase == flaggerv1.CanaryPhaseFinalising {
		return true
	}

	// Refetch the canary before acting on its phase. Keep the identity in
	// locals and only replace canary on success, so a failed lookup is logged
	// against the right object instead of whatever Get returned.
	name, namespace := canary.Name, canary.Namespace
	latest, err := c.flaggerClient.FlaggerV1beta1().Canaries(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		c.logger.With("canary", fmt.Sprintf("%s.%s", name, namespace)).Errorf("%v", err)
		return false
	}
	canary = latest

	if canary.Status.Phase == "" || canary.Status.Phase == flaggerv1.CanaryPhaseInitializing {
		if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseInitialized}); err != nil {
			c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err)
			return false
		}
		c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseInitialized)
		c.recordEventInfof(canary, "Initialization done! %s.%s", canary.Name, canary.Namespace)
		c.alert(canary, fmt.Sprintf("New %s detected, initialization completed.", canary.Spec.TargetRef.Kind),
			true, flaggerv1.SeverityInfo)
		return false
	}

	if shouldAdvance {
		// the event and alert are emitted on a copy that already carries the
		// Progressing phase; the stored status is only updated further down
		canaryPhaseProgressing := canary.DeepCopy()
		canaryPhaseProgressing.Status.Phase = flaggerv1.CanaryPhaseProgressing
		c.recordEventInfof(canaryPhaseProgressing, "New revision detected! Scaling up %s.%s", canaryPhaseProgressing.Spec.TargetRef.Name, canaryPhaseProgressing.Namespace)
		c.alert(canaryPhaseProgressing, "New revision detected, progressing canary analysis.",
			true, flaggerv1.SeverityInfo)

		if err := canaryController.ScaleFromZero(canary); err != nil {
			c.recordEventErrorf(canary, "%v", err)
			return false
		}
		if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseProgressing}); err != nil {
			c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err)
			return false
		}
		c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseProgressing)
		return false
	}
	return false
}

// hasCanaryRevisionChanged reports whether the target or one of its
// dependencies changed while the canary is in the Progressing phase. It is
// always false in any other phase.
//
// Errors from the change checks do not alter the result; they are logged
// rather than silently dropped.
func (c *Controller) hasCanaryRevisionChanged(canary *flaggerv1.Canary, canaryController canary.Controller) bool {
	if canary.Status.Phase != flaggerv1.CanaryPhaseProgressing {
		return false
	}

	diff, err := canaryController.HasTargetChanged(canary)
	if err != nil {
		c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
			Errorf("checking target changes: %v", err)
	}
	if diff {
		return true
	}

	diff, err = canaryController.HaveDependenciesChanged(canary)
	if err != nil {
		c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
			Errorf("checking dependency changes: %v", err)
	}
	return diff
}

// rollback aborts the analysis: it routes all traffic back to the primary,
// scales the canary to zero, marks the canary as Failed and runs the
// post-rollout hooks. It stops at the first step that fails, leaving the
// remaining steps undone.
func (c *Controller) rollback(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface) {
	if canary.Status.FailedChecks >= canary.GetAnalysisThreshold() {
		c.recordEventWarningf(canary, "Rolling back %s.%s failed checks threshold reached %v",
			canary.Name, canary.Namespace, canary.Status.FailedChecks)
		c.alert(canary, fmt.Sprintf("Failed checks threshold reached %v", canary.Status.FailedChecks),
			false, flaggerv1.SeverityError)
	}

	// route all traffic back to primary
	primaryWeight := c.totalWeight(canary)
	canaryWeight := 0
	if err := meshRouter.SetRoutes(canary, primaryWeight, canaryWeight, false); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return
	}

	// the event is emitted on a copy that already carries the Failed phase;
	// the stored status is only updated further down
	canaryPhaseFailed := canary.DeepCopy()
	canaryPhaseFailed.Status.Phase = flaggerv1.CanaryPhaseFailed
	c.recordEventWarningf(canaryPhaseFailed, "Canary failed! Scaling down %s.%s",
		canaryPhaseFailed.Name, canaryPhaseFailed.Namespace)

	c.recorder.SetWeight(canary, primaryWeight, canaryWeight)

	// shutdown canary
	if err := canaryController.ScaleToZero(canary); err != nil {
		c.recordEventWarningf(canary, "%v", err)
		return
	}

	// mark canary as failed
	if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseFailed, CanaryWeight: 0}); err != nil {
		c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err)
		return
	}

	c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseFailed)
	c.runPostRolloutHooks(canary, flaggerv1.CanaryPhaseFailed)
}

// setPhaseInitializing sets the canary status phase to Initializing together
// with the matching conditions and transition time, retrying on update
// conflicts. The first attempt uses cd as passed in; each retry works on a
// freshly fetched copy. The status is only written when MakeStatusConditions
// reports that there is something to update.
func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
	phase := flaggerv1.CanaryPhaseInitializing
	firstTry := true
	name, ns := cd.GetName(), cd.GetNamespace()

	err := retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
		if !firstTry {
			// a previous attempt failed, so reload the object before trying again
			cd, err = c.flaggerClient.FlaggerV1beta1().Canaries(ns).Get(context.TODO(), name, metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("canary %s.%s get query failed: %w", name, ns, err)
			}
		}

		if ok, conditions := canary.MakeStatusConditions(cd, phase); ok {
			cdCopy := cd.DeepCopy()
			cdCopy.Status.Conditions = conditions
			cdCopy.Status.LastTransitionTime = metav1.Now()
			cdCopy.Status.Phase = phase
			_, err = c.flaggerClient.FlaggerV1beta1().Canaries(cd.Namespace).UpdateStatus(context.TODO(), cdCopy, metav1.UpdateOptions{})
		}
		firstTry = false
		return
	})

	if err != nil {
		return fmt.Errorf("failed after retries: %w", err)
	}
	return nil
}