mirror of
https://github.com/fluxcd/flagger.git
synced 2026-03-02 17:51:00 +00:00
Canary CRD refactoring
- set canaries.flagger.app version to v1alpha1 - replace old Canary spec with CanaryDeployment
This commit is contained in:
@@ -7,11 +7,11 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
istioclientset "github.com/knative/pkg/client/clientset/versioned"
|
||||
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1beta1"
|
||||
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1alpha1"
|
||||
clientset "github.com/stefanprodan/flagger/pkg/client/clientset/versioned"
|
||||
flaggerscheme "github.com/stefanprodan/flagger/pkg/client/clientset/versioned/scheme"
|
||||
flaggerinformers "github.com/stefanprodan/flagger/pkg/client/informers/externalversions/flagger/v1beta1"
|
||||
flaggerlisters "github.com/stefanprodan/flagger/pkg/client/listers/flagger/v1beta1"
|
||||
flaggerinformers "github.com/stefanprodan/flagger/pkg/client/informers/externalversions/flagger/v1alpha1"
|
||||
flaggerlisters "github.com/stefanprodan/flagger/pkg/client/listers/flagger/v1alpha1"
|
||||
"go.uber.org/zap"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -76,7 +76,7 @@ func NewController(
|
||||
}
|
||||
|
||||
rolloutInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: ctrl.enqueueRollout,
|
||||
AddFunc: ctrl.enqueue,
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
oldRoll, ok := checkCustomResourceType(old, logger)
|
||||
if !ok {
|
||||
@@ -89,7 +89,7 @@ func NewController(
|
||||
|
||||
if diff := cmp.Diff(newRoll.Spec, oldRoll.Spec); diff != "" {
|
||||
ctrl.logger.Debugf("Diff detected %s.%s %s", oldRoll.Name, oldRoll.Namespace, diff)
|
||||
ctrl.enqueueRollout(new)
|
||||
ctrl.enqueue(new)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(old interface{}) {
|
||||
@@ -108,7 +108,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.workqueue.ShutDown()
|
||||
|
||||
c.logger.Info("Starting controller")
|
||||
c.logger.Info("Starting operator")
|
||||
|
||||
for i := 0; i < threadiness; i++ {
|
||||
go wait.Until(func() {
|
||||
@@ -117,7 +117,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
}, time.Second, stopCh)
|
||||
}
|
||||
|
||||
c.logger.Info("Started workers")
|
||||
c.logger.Info("Started operator workers")
|
||||
|
||||
tickChan := time.NewTicker(c.rolloutWindow).C
|
||||
for {
|
||||
@@ -125,7 +125,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||
case <-tickChan:
|
||||
c.doRollouts()
|
||||
case <-stopCh:
|
||||
c.logger.Info("Shutting down workers")
|
||||
c.logger.Info("Shutting down operator workers")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -174,19 +174,26 @@ func (c *Controller) syncHandler(key string) error {
|
||||
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
|
||||
return nil
|
||||
}
|
||||
rollout, err := c.rolloutLister.Canaries(namespace).Get(name)
|
||||
cd, err := c.rolloutLister.Canaries(namespace).Get(name)
|
||||
if errors.IsNotFound(err) {
|
||||
utilruntime.HandleError(fmt.Errorf("rollout '%s' in work queue no longer exists", key))
|
||||
utilruntime.HandleError(fmt.Errorf("'%s' in work queue no longer exists", key))
|
||||
return nil
|
||||
}
|
||||
|
||||
c.rollouts.Store(fmt.Sprintf("%s.%s", rollout.Name, rollout.Namespace), rollout)
|
||||
c.rollouts.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
|
||||
|
||||
err = c.bootstrapDeployment(cd)
|
||||
if err != nil {
|
||||
c.logger.Warnf("%s.%s bootstrap error %v", cd.Name, cd.Namespace, err)
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Infof("Synced %s", key)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) enqueueRollout(obj interface{}) {
|
||||
func (c *Controller) enqueue(obj interface{}) {
|
||||
var key string
|
||||
var err error
|
||||
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
|
||||
@@ -196,6 +203,16 @@ func (c *Controller) enqueueRollout(obj interface{}) {
|
||||
c.workqueue.AddRateLimited(key)
|
||||
}
|
||||
|
||||
func checkCustomResourceType(obj interface{}, logger *zap.SugaredLogger) (flaggerv1.Canary, bool) {
|
||||
var roll *flaggerv1.Canary
|
||||
var ok bool
|
||||
if roll, ok = obj.(*flaggerv1.Canary); !ok {
|
||||
logger.Errorf("Event Watch received an invalid object: %#v", obj)
|
||||
return flaggerv1.Canary{}, false
|
||||
}
|
||||
return *roll, true
|
||||
}
|
||||
|
||||
func (c *Controller) recordEventInfof(r *flaggerv1.Canary, template string, args ...interface{}) {
|
||||
c.logger.Infof(template, args...)
|
||||
c.recorder.Event(r, corev1.EventTypeNormal, "Synced", fmt.Sprintf(template, args...))
|
||||
@@ -210,13 +227,3 @@ func (c *Controller) recordEventWarningf(r *flaggerv1.Canary, template string, a
|
||||
c.logger.Infof(template, args...)
|
||||
c.recorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
|
||||
}
|
||||
|
||||
func checkCustomResourceType(obj interface{}, logger *zap.SugaredLogger) (flaggerv1.Canary, bool) {
|
||||
var roll *flaggerv1.Canary
|
||||
var ok bool
|
||||
if roll, ok = obj.(*flaggerv1.Canary); !ok {
|
||||
logger.Errorf("Event Watch received an invalid object: %#v", obj)
|
||||
return flaggerv1.Canary{}, false
|
||||
}
|
||||
return *roll, true
|
||||
}
|
||||
|
||||
246
pkg/controller/deployer.go
Normal file
246
pkg/controller/deployer.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
|
||||
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1alpha1"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
)
|
||||
|
||||
func (c *Controller) bootstrapDeployment(cd *flaggerv1.Canary) error {
|
||||
|
||||
canaryName := cd.Spec.TargetRef.Name
|
||||
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
|
||||
|
||||
canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(canaryName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return fmt.Errorf("deployment %s.%s not found, retrying in %v",
|
||||
canaryName, cd.Namespace, c.rolloutWindow)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
primaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
primaryDep = &appsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: primaryName,
|
||||
Annotations: canaryDep.Annotations,
|
||||
Namespace: cd.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
|
||||
Group: flaggerv1.SchemeGroupVersion.Group,
|
||||
Version: flaggerv1.SchemeGroupVersion.Version,
|
||||
Kind: flaggerv1.CanaryKind,
|
||||
}),
|
||||
},
|
||||
},
|
||||
Spec: appsv1.DeploymentSpec{
|
||||
Replicas: canaryDep.Spec.Replicas,
|
||||
Strategy: canaryDep.Spec.Strategy,
|
||||
Selector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{
|
||||
"app": primaryName,
|
||||
},
|
||||
},
|
||||
Template: corev1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{"app": primaryName},
|
||||
Annotations: canaryDep.Spec.Template.Annotations,
|
||||
},
|
||||
Spec: canaryDep.Spec.Template.Spec,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.recordEventInfof(cd, "Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
|
||||
}
|
||||
|
||||
if cd.Status.State == "" {
|
||||
c.scaleToZeroCanary(cd)
|
||||
}
|
||||
|
||||
canaryService, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(canaryName, metav1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
canaryService = &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: canaryName,
|
||||
Namespace: cd.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
|
||||
Group: flaggerv1.SchemeGroupVersion.Group,
|
||||
Version: flaggerv1.SchemeGroupVersion.Version,
|
||||
Kind: flaggerv1.CanaryKind,
|
||||
}),
|
||||
},
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Type: corev1.ServiceTypeClusterIP,
|
||||
Selector: map[string]string{"app": canaryName},
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
Port: cd.Spec.Service.Port,
|
||||
TargetPort: intstr.IntOrString{
|
||||
Type: intstr.Int,
|
||||
IntVal: cd.Spec.Service.Port,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = c.kubeClient.CoreV1().Services(cd.Namespace).Create(canaryService)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.recordEventInfof(cd, "Service %s.%s created", canaryService.GetName(), cd.Namespace)
|
||||
}
|
||||
|
||||
canaryTestServiceName := fmt.Sprintf("%s-canary", cd.Spec.TargetRef.Name)
|
||||
canaryTestService, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(canaryTestServiceName, metav1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
canaryTestService = &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: canaryTestServiceName,
|
||||
Namespace: cd.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
|
||||
Group: flaggerv1.SchemeGroupVersion.Group,
|
||||
Version: flaggerv1.SchemeGroupVersion.Version,
|
||||
Kind: flaggerv1.CanaryKind,
|
||||
}),
|
||||
},
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Type: corev1.ServiceTypeClusterIP,
|
||||
Selector: map[string]string{"app": canaryName},
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
Port: cd.Spec.Service.Port,
|
||||
TargetPort: intstr.IntOrString{
|
||||
Type: intstr.Int,
|
||||
IntVal: cd.Spec.Service.Port,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = c.kubeClient.CoreV1().Services(cd.Namespace).Create(canaryTestService)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.recordEventInfof(cd, "Service %s.%s created", canaryTestService.GetName(), cd.Namespace)
|
||||
}
|
||||
|
||||
primaryService, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(primaryName, metav1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
primaryService = &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: primaryName,
|
||||
Namespace: cd.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
|
||||
Group: flaggerv1.SchemeGroupVersion.Group,
|
||||
Version: flaggerv1.SchemeGroupVersion.Version,
|
||||
Kind: flaggerv1.CanaryKind,
|
||||
}),
|
||||
},
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Type: corev1.ServiceTypeClusterIP,
|
||||
Selector: map[string]string{"app": primaryName},
|
||||
Ports: []corev1.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
Port: cd.Spec.Service.Port,
|
||||
TargetPort: intstr.IntOrString{
|
||||
Type: intstr.Int,
|
||||
IntVal: cd.Spec.Service.Port,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = c.kubeClient.CoreV1().Services(cd.Namespace).Create(primaryService)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.recordEventInfof(cd, "Service %s.%s created", primaryService.GetName(), cd.Namespace)
|
||||
}
|
||||
|
||||
hosts := append(cd.Spec.Service.Hosts, canaryName)
|
||||
gateways := append(cd.Spec.Service.Gateways, "mesh")
|
||||
virtualService, err := c.istioClient.NetworkingV1alpha3().VirtualServices(cd.Namespace).Get(canaryName, metav1.GetOptions{})
|
||||
if errors.IsNotFound(err) {
|
||||
virtualService = &istiov1alpha3.VirtualService{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: canaryName,
|
||||
Namespace: cd.Namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
|
||||
Group: flaggerv1.SchemeGroupVersion.Group,
|
||||
Version: flaggerv1.SchemeGroupVersion.Version,
|
||||
Kind: flaggerv1.CanaryKind,
|
||||
}),
|
||||
},
|
||||
},
|
||||
Spec: istiov1alpha3.VirtualServiceSpec{
|
||||
Hosts: hosts,
|
||||
Gateways: gateways,
|
||||
Http: []istiov1alpha3.HTTPRoute{
|
||||
{
|
||||
Route: []istiov1alpha3.DestinationWeight{
|
||||
{
|
||||
Destination: istiov1alpha3.Destination{
|
||||
Host: primaryName,
|
||||
Port: istiov1alpha3.PortSelector{
|
||||
Number: uint32(cd.Spec.Service.Port),
|
||||
},
|
||||
},
|
||||
Weight: 100,
|
||||
},
|
||||
{
|
||||
Destination: istiov1alpha3.Destination{
|
||||
Host: canaryName,
|
||||
Port: istiov1alpha3.PortSelector{
|
||||
Number: uint32(cd.Spec.Service.Port),
|
||||
},
|
||||
},
|
||||
Weight: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = c.istioClient.NetworkingV1alpha3().VirtualServices(cd.Namespace).Create(virtualService)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.recordEventInfof(cd, "VirtualService %s.%s created", virtualService.GetName(), cd.Namespace)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
|
||||
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1beta1"
|
||||
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1alpha1"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
func (c *Controller) doRollouts() {
|
||||
c.rollouts.Range(func(key interface{}, value interface{}) bool {
|
||||
r := value.(*flaggerv1.Canary)
|
||||
if r.Spec.TargetKind == "Deployment" {
|
||||
if r.Spec.TargetRef.Kind == "Deployment" {
|
||||
go c.advanceDeploymentRollout(r.Name, r.Namespace)
|
||||
}
|
||||
return true
|
||||
@@ -27,6 +27,12 @@ func (c *Controller) advanceDeploymentRollout(name string, namespace string) {
|
||||
return
|
||||
}
|
||||
|
||||
err := c.bootstrapDeployment(r)
|
||||
if err != nil {
|
||||
c.recordEventWarningf(r, "%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// set max weight default value to 100%
|
||||
maxWeight := 100
|
||||
if r.Spec.CanaryAnalysis.MaxWeight > 0 {
|
||||
@@ -34,13 +40,13 @@ func (c *Controller) advanceDeploymentRollout(name string, namespace string) {
|
||||
}
|
||||
|
||||
// gate stage: check if canary deployment exists and is healthy
|
||||
canary, ok := c.getCanaryDeployment(r, r.Spec.Canary.Name, r.Namespace)
|
||||
canary, ok := c.getCanaryDeployment(r, r.Spec.TargetRef.Name, r.Namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// gate stage: check if primary deployment exists and is healthy
|
||||
primary, ok := c.getDeployment(r, r.Spec.Primary.Name, r.Namespace)
|
||||
primary, ok := c.getDeployment(r, fmt.Sprintf("%s-primary", r.Spec.TargetRef.Name), r.Namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
@@ -60,7 +66,7 @@ func (c *Controller) advanceDeploymentRollout(name string, namespace string) {
|
||||
// gate stage: check if the number of failed checks reached the threshold
|
||||
if r.Status.State == "running" && r.Status.FailedChecks >= r.Spec.CanaryAnalysis.Threshold {
|
||||
c.recordEventWarningf(r, "Rolling back %s.%s failed checks threshold reached %v",
|
||||
r.Spec.Canary.Name, r.Namespace, r.Status.FailedChecks)
|
||||
r.Name, r.Namespace, r.Status.FailedChecks)
|
||||
|
||||
// route all traffic back to primary
|
||||
primaryRoute.Weight = 100
|
||||
@@ -136,7 +142,7 @@ func (c *Controller) advanceDeploymentRollout(name string, namespace string) {
|
||||
}
|
||||
|
||||
func (c *Controller) getRollout(name string, namespace string) (*flaggerv1.Canary, bool) {
|
||||
r, err := c.rolloutClient.FlaggerV1beta1().Canaries(namespace).Get(name, v1.GetOptions{})
|
||||
r, err := c.rolloutClient.FlaggerV1alpha1().Canaries(namespace).Get(name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s not found", name, namespace)
|
||||
return nil, false
|
||||
@@ -146,19 +152,26 @@ func (c *Controller) getRollout(name string, namespace string) (*flaggerv1.Canar
|
||||
}
|
||||
|
||||
func (c *Controller) checkRolloutStatus(r *flaggerv1.Canary, canary *appsv1.Deployment) bool {
|
||||
var err error
|
||||
canaryRevision, err := c.getDeploymentSpecEnc(canary)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s not found: %v", r.Name, r.Namespace, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Status.State == "" {
|
||||
r.Status = flaggerv1.CanaryStatus{
|
||||
State: "running",
|
||||
CanaryRevision: canary.ResourceVersion,
|
||||
State: "initialized",
|
||||
CanaryRevision: canaryRevision,
|
||||
FailedChecks: 0,
|
||||
}
|
||||
r, err = c.rolloutClient.FlaggerV1beta1().Canaries(r.Namespace).Update(r)
|
||||
r, err = c.rolloutClient.FlaggerV1alpha1().Canaries(r.Namespace).Update(r)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s status update failed: %v", r.Name, r.Namespace, err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
c.recordEventInfof(r, "Initialization done! %s.%s", canary.GetName(), canary.Namespace)
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Status.State == "running" {
|
||||
@@ -166,38 +179,31 @@ func (c *Controller) checkRolloutStatus(r *flaggerv1.Canary, canary *appsv1.Depl
|
||||
}
|
||||
|
||||
if r.Status.State == "promotion-finished" {
|
||||
c.setCanaryRevision(r, "finished")
|
||||
c.logger.Infof("Promotion completed! %s.%s revision %s", r.Spec.Canary.Name, r.Namespace,
|
||||
c.getDeploymentRevision(r.Spec.Canary.Name, r.Namespace))
|
||||
c.setCanaryRevision(r, canary, "finished")
|
||||
c.logger.Infof("Promotion completed! %s.%s", r.Name, r.Namespace)
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Status.State == "promotion-failed" {
|
||||
c.setCanaryRevision(r, "failed")
|
||||
c.logger.Infof("Promotion failed! %s.%s revision %s", r.Spec.Canary.Name, r.Namespace,
|
||||
c.getDeploymentRevision(r.Spec.Canary.Name, r.Namespace))
|
||||
c.setCanaryRevision(r, canary, "failed")
|
||||
c.logger.Infof("Promotion failed! %s.%s", r.Name, r.Namespace)
|
||||
return false
|
||||
}
|
||||
|
||||
if r.Status.CanaryRevision != canary.ResourceVersion {
|
||||
c.recordEventInfof(r, "New revision detected %s.%s old %s new %s",
|
||||
canary.GetName(), canary.Namespace, r.Status.CanaryRevision, canary.ResourceVersion)
|
||||
if diff, err := c.diffDeploymentSpec(r, canary); diff {
|
||||
c.recordEventInfof(r, "New revision detected %s.%s",
|
||||
canary.GetName(), canary.Namespace)
|
||||
canary.Spec.Replicas = int32p(1)
|
||||
canary, err = c.kubeClient.AppsV1().Deployments(canary.Namespace).Update(canary)
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "Scaling up %s.%s failed: %v", canary.GetName(), canary.Namespace, err)
|
||||
return false
|
||||
}
|
||||
|
||||
r.Status = flaggerv1.CanaryStatus{
|
||||
State: "running",
|
||||
CanaryRevision: canary.ResourceVersion,
|
||||
FailedChecks: 0,
|
||||
}
|
||||
r, err = c.rolloutClient.FlaggerV1beta1().Canaries(r.Namespace).Update(r)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s status update failed: %v", r.Name, r.Namespace, err)
|
||||
return false
|
||||
FailedChecks: 0,
|
||||
}
|
||||
c.setCanaryRevision(r, canary, "running")
|
||||
c.recordEventInfof(r, "Scaling up %s.%s", canary.GetName(), canary.Namespace)
|
||||
|
||||
return false
|
||||
@@ -209,7 +215,7 @@ func (c *Controller) checkRolloutStatus(r *flaggerv1.Canary, canary *appsv1.Depl
|
||||
func (c *Controller) updateRolloutStatus(r *flaggerv1.Canary, status string) bool {
|
||||
var err error
|
||||
r.Status.State = status
|
||||
r, err = c.rolloutClient.FlaggerV1beta1().Canaries(r.Namespace).Update(r)
|
||||
r, err = c.rolloutClient.FlaggerV1alpha1().Canaries(r.Namespace).Update(r)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s status update failed: %v", r.Name, r.Namespace, err)
|
||||
return false
|
||||
@@ -220,7 +226,7 @@ func (c *Controller) updateRolloutStatus(r *flaggerv1.Canary, status string) boo
|
||||
func (c *Controller) updateRolloutFailedChecks(r *flaggerv1.Canary, val int) bool {
|
||||
var err error
|
||||
r.Status.FailedChecks = val
|
||||
r, err = c.rolloutClient.FlaggerV1beta1().Canaries(r.Namespace).Update(r)
|
||||
r, err = c.rolloutClient.FlaggerV1alpha1().Canaries(r.Namespace).Update(r)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s status update failed: %v", r.Name, r.Namespace, err)
|
||||
return false
|
||||
@@ -262,20 +268,10 @@ func (c *Controller) getCanaryDeployment(r *flaggerv1.Canary, name string, names
|
||||
return dep, true
|
||||
}
|
||||
|
||||
func (c *Controller) getDeploymentRevision(name string, namespace string) string {
|
||||
dep, err := c.kubeClient.AppsV1().Deployments(namespace).Get(name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
c.logger.Errorf("Deployment %s.%s not found", name, namespace)
|
||||
return ""
|
||||
}
|
||||
|
||||
return dep.ResourceVersion
|
||||
}
|
||||
|
||||
func (c *Controller) checkDeploymentMetrics(r *flaggerv1.Canary) bool {
|
||||
for _, metric := range r.Spec.CanaryAnalysis.Metrics {
|
||||
if metric.Name == "istio_requests_total" {
|
||||
val, err := c.getDeploymentCounter(r.Spec.Canary.Name, r.Namespace, metric.Name, metric.Interval)
|
||||
val, err := c.getDeploymentCounter(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval)
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.metricsServer, err)
|
||||
return false
|
||||
@@ -288,7 +284,7 @@ func (c *Controller) checkDeploymentMetrics(r *flaggerv1.Canary) bool {
|
||||
}
|
||||
|
||||
if metric.Name == "istio_request_duration_seconds_bucket" {
|
||||
val, err := c.GetDeploymentHistogram(r.Spec.Canary.Name, r.Namespace, metric.Name, metric.Interval)
|
||||
val, err := c.GetDeploymentHistogram(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval)
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.metricsServer, err)
|
||||
return false
|
||||
@@ -306,9 +302,9 @@ func (c *Controller) checkDeploymentMetrics(r *flaggerv1.Canary) bool {
|
||||
}
|
||||
|
||||
func (c *Controller) scaleToZeroCanary(r *flaggerv1.Canary) {
|
||||
canary, err := c.kubeClient.AppsV1().Deployments(r.Namespace).Get(r.Spec.Canary.Name, v1.GetOptions{})
|
||||
canary, err := c.kubeClient.AppsV1().Deployments(r.Namespace).Get(r.Spec.TargetRef.Name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "Deployment %s.%s not found", r.Spec.Canary.Name, r.Namespace)
|
||||
c.recordEventErrorf(r, "Deployment %s.%s not found", r.Spec.TargetRef.Name, r.Namespace)
|
||||
return
|
||||
}
|
||||
//HPA https://github.com/kubernetes/kubernetes/pull/29212
|
||||
@@ -320,22 +316,15 @@ func (c *Controller) scaleToZeroCanary(r *flaggerv1.Canary) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) setCanaryRevision(r *flaggerv1.Canary, status string) {
|
||||
canaryRevision := c.getDeploymentRevision(r.Spec.Canary.Name, r.Namespace)
|
||||
r, ok := c.getRollout(r.Name, r.Namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
func (c *Controller) setCanaryRevision(r *flaggerv1.Canary, canary *appsv1.Deployment, status string) {
|
||||
r.Status = flaggerv1.CanaryStatus{
|
||||
State: status,
|
||||
CanaryRevision: canaryRevision,
|
||||
FailedChecks: r.Status.FailedChecks,
|
||||
State: status,
|
||||
FailedChecks: r.Status.FailedChecks,
|
||||
}
|
||||
r, err := c.rolloutClient.FlaggerV1beta1().Canaries(r.Namespace).Update(r)
|
||||
err := c.saveDeploymentSpec(r, canary)
|
||||
if err != nil {
|
||||
c.logger.Errorf("Canary %s.%s status update failed: %v", r.Name, r.Namespace, err)
|
||||
}
|
||||
//c.logger.Infof("Canary %s.%s status %+v", r.Spec.Canary.Name, r.Namespace, r.Status)
|
||||
}
|
||||
|
||||
func (c *Controller) getVirtualService(r *flaggerv1.Canary) (
|
||||
@@ -345,18 +334,18 @@ func (c *Controller) getVirtualService(r *flaggerv1.Canary) (
|
||||
ok bool,
|
||||
) {
|
||||
var err error
|
||||
vs, err = c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Get(r.Spec.VirtualService.Name, v1.GetOptions{})
|
||||
vs, err = c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Get(r.Name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "VirtualService %s.%s not found", r.Spec.VirtualService.Name, r.Namespace)
|
||||
c.recordEventErrorf(r, "VirtualService %s.%s not found", r.Name, r.Namespace)
|
||||
return
|
||||
}
|
||||
|
||||
for _, http := range vs.Spec.Http {
|
||||
for _, route := range http.Route {
|
||||
if route.Destination.Host == r.Spec.Primary.Host {
|
||||
if route.Destination.Host == fmt.Sprintf("%s-primary", r.Spec.TargetRef.Name) {
|
||||
primary = route
|
||||
}
|
||||
if route.Destination.Host == r.Spec.Canary.Host {
|
||||
if route.Destination.Host == r.Spec.TargetRef.Name {
|
||||
canary = route
|
||||
}
|
||||
}
|
||||
@@ -364,7 +353,7 @@ func (c *Controller) getVirtualService(r *flaggerv1.Canary) (
|
||||
|
||||
if primary.Weight == 0 && canary.Weight == 0 {
|
||||
c.recordEventErrorf(r, "VirtualService %s.%s does not contain routes for %s and %s",
|
||||
r.Spec.VirtualService.Name, r.Namespace, r.Spec.Primary.Host, r.Spec.Canary.Host)
|
||||
r.Name, r.Namespace, fmt.Sprintf("%s-primary", r.Spec.TargetRef.Name), r.Spec.TargetRef.Name)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -387,7 +376,7 @@ func (c *Controller) updateVirtualServiceRoutes(
|
||||
var err error
|
||||
vs, err = c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Update(vs)
|
||||
if err != nil {
|
||||
c.recordEventErrorf(r, "VirtualService %s.%s update failed: %v", r.Spec.VirtualService.Name, r.Namespace, err)
|
||||
c.recordEventErrorf(r, "VirtualService %s.%s update failed: %v", r.Name, r.Namespace, err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
79
pkg/controller/utils.go
Normal file
79
pkg/controller/utils.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
flaggerv1 "github.com/stefanprodan/flagger/pkg/apis/flagger/v1alpha1"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func (c *Controller) saveDeploymentSpec(cd *flaggerv1.Canary, dep *appsv1.Deployment) error {
|
||||
specJson, err := json.Marshal(dep.Spec.Template.Spec)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
specEnc := base64.StdEncoding.EncodeToString(specJson)
|
||||
cd.Status.CanaryRevision = specEnc
|
||||
cd, err = c.rolloutClient.FlaggerV1alpha1().Canaries(cd.Namespace).Update(cd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) diffDeploymentSpec(cd *flaggerv1.Canary, dep *appsv1.Deployment) (bool, error) {
|
||||
if cd.Status.CanaryRevision == "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
newSpec := &dep.Spec.Template.Spec
|
||||
oldSpecJson, err := base64.StdEncoding.DecodeString(cd.Status.CanaryRevision)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
oldSpec := &corev1.PodSpec{}
|
||||
err = json.Unmarshal(oldSpecJson, oldSpec)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(*newSpec, *oldSpec, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" {
|
||||
fmt.Println(diff)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *Controller) getDeploymentSpec(name string, namespace string) (string, error) {
|
||||
dep, err := c.kubeClient.AppsV1().Deployments(namespace).Get(name, v1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
specJson, err := json.Marshal(dep.Spec.Template.Spec)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
specEnc := base64.StdEncoding.EncodeToString(specJson)
|
||||
return specEnc, nil
|
||||
}
|
||||
|
||||
func (c *Controller) getDeploymentSpecEnc(dep *appsv1.Deployment) (string, error) {
|
||||
specJson, err := json.Marshal(dep.Spec.Template.Spec)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
specEnc := base64.StdEncoding.EncodeToString(specJson)
|
||||
return specEnc, nil
|
||||
}
|
||||
Reference in New Issue
Block a user