Controller refactoring part two

- share components between loops
This commit is contained in:
Stefan Prodan
2018-10-11 20:51:12 +03:00
parent baeee62a26
commit 663dc82574
5 changed files with 112 additions and 130 deletions

View File

@@ -4,7 +4,7 @@ VERSION_MINOR:=$(shell grep 'VERSION' pkg/version/version.go | awk '{ print $$4
PATCH:=$(shell grep 'VERSION' pkg/version/version.go | awk '{ print $$4 }' | tr -d '"' | awk -F. '{print $$NF}')
SOURCE_DIRS = cmd pkg/apis pkg/controller pkg/server pkg/logging pkg/version
run:
go run cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -metrics-server=https://prometheus.istio.weavedx.com
go run -race cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -metrics-server=https://prometheus.istio.weavedx.com
build:
docker build -t stefanprodan/flagger:$(TAG) . -f Dockerfile

View File

@@ -64,13 +64,13 @@ func main() {
logger.Fatalf("Error building shared clientset: %v", err)
}
rolloutClient, err := clientset.NewForConfig(cfg)
flaggerClient, err := clientset.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building example clientset: %s", err.Error())
}
canaryInformerFactory := informers.NewSharedInformerFactory(rolloutClient, time.Second*30)
canaryInformer := canaryInformerFactory.Flagger().V1alpha1().Canaries()
flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, time.Second*30)
canaryInformer := flaggerInformerFactory.Flagger().V1alpha1().Canaries()
logger.Infof("Starting flagger version %s revision %s", version.VERSION, version.REVISION)
@@ -94,14 +94,14 @@ func main() {
c := controller.NewController(
kubeClient,
sharedClient,
rolloutClient,
flaggerClient,
canaryInformer,
controlLoopInterval,
metricsServer,
logger,
)
canaryInformerFactory.Start(stopCh)
flaggerInformerFactory.Start(stopCh)
logger.Info("Waiting for informer caches to sync")
for _, synced := range []cache.InformerSynced{

View File

@@ -30,23 +30,25 @@ const controllerAgentName = "flagger"
type Controller struct {
kubeClient kubernetes.Interface
istioClient istioclientset.Interface
rolloutClient clientset.Interface
rolloutLister flaggerlisters.CanaryLister
rolloutSynced cache.InformerSynced
rolloutWindow time.Duration
flaggerClient clientset.Interface
flaggerLister flaggerlisters.CanaryLister
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
recorder record.EventRecorder
logger *zap.SugaredLogger
metricsServer string
rollouts *sync.Map
canaries *sync.Map
deployer CanaryDeployer
router CanaryRouter
observer CanaryObserver
}
func NewController(
kubeClient kubernetes.Interface,
istioClient istioclientset.Interface,
rolloutClient clientset.Interface,
rolloutInformer flaggerinformers.CanaryInformer,
rolloutWindow time.Duration,
flaggerClient clientset.Interface,
flaggerInformer flaggerinformers.CanaryInformer,
flaggerWindow time.Duration,
metricServer string,
logger *zap.SugaredLogger,
@@ -61,21 +63,41 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
deployer := CanaryDeployer{
logger: logger,
kubeClient: kubeClient,
istioClient: istioClient,
flaggerClient: flaggerClient,
}
router := CanaryRouter{
logger: logger,
kubeClient: kubeClient,
istioClient: istioClient,
flaggerClient: flaggerClient,
}
observer := CanaryObserver{
metricsServer: metricServer,
}
ctrl := &Controller{
kubeClient: kubeClient,
istioClient: istioClient,
rolloutClient: rolloutClient,
rolloutLister: rolloutInformer.Lister(),
rolloutSynced: rolloutInformer.Informer().HasSynced,
flaggerClient: flaggerClient,
flaggerLister: flaggerInformer.Lister(),
flaggerSynced: flaggerInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
recorder: recorder,
logger: logger,
rollouts: new(sync.Map),
metricsServer: metricServer,
rolloutWindow: rolloutWindow,
canaries: new(sync.Map),
flaggerWindow: flaggerWindow,
deployer: deployer,
router: router,
observer: observer,
}
rolloutInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
flaggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.enqueue,
UpdateFunc: func(old, new interface{}) {
oldRoll, ok := checkCustomResourceType(old, logger)
@@ -96,7 +118,7 @@ func NewController(
r, ok := checkCustomResourceType(old, logger)
if ok {
ctrl.logger.Infof("Deleting %s.%s from cache", r.Name, r.Namespace)
ctrl.rollouts.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace))
ctrl.canaries.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace))
}
},
})
@@ -119,7 +141,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
c.logger.Info("Started operator workers")
tickChan := time.NewTicker(c.rolloutWindow).C
tickChan := time.NewTicker(c.flaggerWindow).C
for {
select {
case <-tickChan:
@@ -174,13 +196,13 @@ func (c *Controller) syncHandler(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
cd, err := c.rolloutLister.Canaries(namespace).Get(name)
cd, err := c.flaggerLister.Canaries(namespace).Get(name)
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("'%s' in work queue no longer exists", key))
return nil
}
c.rollouts.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
//if cd.Spec.TargetRef.Kind == "Deployment" {
// err = c.bootstrapDeployment(cd)

View File

@@ -57,9 +57,9 @@ func (c *CanaryDeployer) Promote(cd *flaggerv1.Canary) error {
return nil
}
// IsDeploymentHealthy checks the primary and canary deployment status and returns an error if
// the deployment is in the middle of a rolling update or if the pods are unhealthy
func (c *CanaryDeployer) IsDeploymentHealthy(cd *flaggerv1.Canary) error {
// IsReady checks the primary and canary deployment status and returns an error if
// the deployments are in the middle of a rolling update or if the pods are unhealthy
func (c *CanaryDeployer) IsReady(cd *flaggerv1.Canary) error {
canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(cd.Spec.TargetRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
@@ -182,25 +182,28 @@ func (c *CanaryDeployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
canary, err = c.kubeClient.AppsV1().Deployments(canary.Namespace).Update(canary)
if err != nil {
return fmt.Errorf("scaling %s.%s to %v failed: %v", canary.GetName(), canary.Namespace, replicas, err)
}
return nil
}
// Sync creates the primary deployment and HPA
// and scales the canary deployment to zero
func (c *CanaryDeployer) Sync(cd *flaggerv1.Canary) error {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
if err := c.createPrimaryDeployment(cd); err != nil {
return err
return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err)
}
if cd.Status.State == "" {
c.Scale(cd, 0)
c.logger.Infof("Scaling down %s.%s", primaryName, cd.Namespace)
if err := c.Scale(cd, 0); err != nil {
return err
}
}
if cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" {
if err := c.createPrimaryHpa(cd); err != nil {
return err
return fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err)
}
}
return nil

View File

@@ -9,7 +9,7 @@ import (
)
func (c *Controller) scheduleCanaries() {
c.rollouts.Range(func(key interface{}, value interface{}) bool {
c.canaries.Range(func(key interface{}, value interface{}) bool {
r := value.(*flaggerv1.Canary)
if r.Spec.TargetRef.Kind == "Deployment" {
go c.advanceCanary(r.Name, r.Namespace)
@@ -19,93 +19,74 @@ func (c *Controller) scheduleCanaries() {
}
func (c *Controller) advanceCanary(name string, namespace string) {
// check if the rollout exists
r, err := c.rolloutClient.FlaggerV1alpha1().Canaries(namespace).Get(name, v1.GetOptions{})
// check if the canary exists
cd, err := c.flaggerClient.FlaggerV1alpha1().Canaries(namespace).Get(name, v1.GetOptions{})
if err != nil {
c.logger.Errorf("Canary %s.%s not found", name, namespace)
return
}
deployer := CanaryDeployer{
logger: c.logger,
kubeClient: c.kubeClient,
istioClient: c.istioClient,
flaggerClient: c.rolloutClient,
}
// create primary deployment and hpa if needed
err = deployer.Sync(r)
if err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.deployer.Sync(cd); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
router := CanaryRouter{
logger: c.logger,
kubeClient: c.kubeClient,
istioClient: c.istioClient,
flaggerClient: c.rolloutClient,
}
// create ClusterIP services and virtual service if needed
err = router.Sync(r)
if err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.router.Sync(cd); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
// set max weight default value to 100%
maxWeight := 100
if r.Spec.CanaryAnalysis.MaxWeight > 0 {
maxWeight = r.Spec.CanaryAnalysis.MaxWeight
if cd.Spec.CanaryAnalysis.MaxWeight > 0 {
maxWeight = cd.Spec.CanaryAnalysis.MaxWeight
}
// check primary and canary deployments status
err = deployer.IsDeploymentHealthy(r)
if err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.deployer.IsReady(cd); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
// check if virtual service exists
// and if it contains weighted destination routes to the primary and canary services
primaryRoute, canaryRoute, err := router.GetRoutes(r)
primaryRoute, canaryRoute, err := c.router.GetRoutes(cd)
if err != nil {
c.recordEventWarningf(r, "%v", err)
c.recordEventWarningf(cd, "%v", err)
return
}
// check if canary analysis should start (canary revision has changes) or continue
if ok := c.checkCanaryStatus(r, deployer); !ok {
if ok := c.checkCanaryStatus(cd, c.deployer); !ok {
return
}
// check if the number of failed checks reached the threshold
if r.Status.State == "running" && r.Status.FailedChecks >= r.Spec.CanaryAnalysis.Threshold {
c.recordEventWarningf(r, "Rolling back %s.%s failed checks threshold reached %v",
r.Name, r.Namespace, r.Status.FailedChecks)
if cd.Status.State == "running" && cd.Status.FailedChecks >= cd.Spec.CanaryAnalysis.Threshold {
c.recordEventWarningf(cd, "Rolling back %s.%s failed checks threshold reached %v",
cd.Name, cd.Namespace, cd.Status.FailedChecks)
// route all traffic back to primary
primaryRoute.Weight = 100
canaryRoute.Weight = 0
if err := router.SetRoutes(r, primaryRoute, canaryRoute); err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
c.recordEventWarningf(r, "Canary failed! Scaling down %s.%s",
r.Spec.TargetRef.Name, r.Namespace)
c.recordEventWarningf(cd, "Canary failed! Scaling down %s.%s",
cd.Spec.TargetRef.Name, cd.Namespace)
// shutdown canary
err = deployer.Scale(r, 0)
if err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.deployer.Scale(cd, 0); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
// mark canary as failed
err := deployer.SetState(r, "failed")
if err != nil {
if err := c.deployer.SetState(cd, "failed"); err != nil {
c.logger.Errorf("%v", err)
return
}
@@ -115,11 +96,11 @@ func (c *Controller) advanceCanary(name string, namespace string) {
// check if the canary success rate is above the threshold
// skip check if no traffic is routed to canary
if canaryRoute.Weight == 0 {
c.recordEventInfof(r, "Starting canary deployment for %s.%s", r.Name, r.Namespace)
c.recordEventInfof(cd, "Starting canary deployment for %s.%s", cd.Name, cd.Namespace)
} else {
if ok := c.checkCanaryMetrics(r); !ok {
if err = deployer.SetFailedChecks(r, r.Status.FailedChecks+1); err != nil {
c.recordEventWarningf(r, "%v", err)
if ok := c.checkCanaryMetrics(cd); !ok {
if err := c.deployer.SetFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
return
@@ -128,31 +109,29 @@ func (c *Controller) advanceCanary(name string, namespace string) {
// increase canary traffic percentage
if canaryRoute.Weight < maxWeight {
primaryRoute.Weight -= r.Spec.CanaryAnalysis.StepWeight
primaryRoute.Weight -= cd.Spec.CanaryAnalysis.StepWeight
if primaryRoute.Weight < 0 {
primaryRoute.Weight = 0
}
canaryRoute.Weight += r.Spec.CanaryAnalysis.StepWeight
canaryRoute.Weight += cd.Spec.CanaryAnalysis.StepWeight
if primaryRoute.Weight > 100 {
primaryRoute.Weight = 100
}
if err = router.SetRoutes(r, primaryRoute, canaryRoute); err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
c.recordEventInfof(r, "Advance %s.%s canary weight %v", r.Name, r.Namespace, canaryRoute.Weight)
c.recordEventInfof(cd, "Advance %s.%s canary weight %v", cd.Name, cd.Namespace, canaryRoute.Weight)
// promote canary
primaryName := fmt.Sprintf("%s-primary", r.Spec.TargetRef.Name)
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
if canaryRoute.Weight == maxWeight {
c.recordEventInfof(r, "Copying %s.%s template spec to %s.%s",
r.Spec.TargetRef.Name, r.Namespace, primaryName, r.Namespace)
err := deployer.Promote(r)
if err != nil {
c.recordEventWarningf(r, "%v", err)
c.recordEventInfof(cd, "Copying %s.%s template spec to %s.%s",
cd.Spec.TargetRef.Name, cd.Namespace, primaryName, cd.Namespace)
if err := c.deployer.Promote(cd); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
}
@@ -160,24 +139,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
// route all traffic back to primary
primaryRoute.Weight = 100
canaryRoute.Weight = 0
if err = router.SetRoutes(r, primaryRoute, canaryRoute); err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
c.recordEventInfof(r, "Promotion completed! Scaling down %s.%s", r.Spec.TargetRef.Name, r.Namespace)
c.recordEventInfof(cd, "Promotion completed! Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
// shutdown canary
err = deployer.Scale(r, 0)
if err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.deployer.Scale(cd, 0); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
// update status
err = deployer.SetState(r, "finished")
if err != nil {
c.recordEventWarningf(r, "%v", err)
if err := c.deployer.SetState(cd, "finished"); err != nil {
c.recordEventWarningf(cd, "%v", err)
return
}
}
@@ -189,55 +166,35 @@ func (c *Controller) checkCanaryStatus(r *flaggerv1.Canary, deployer CanaryDeplo
}
if r.Status.State == "" {
status := flaggerv1.CanaryStatus{
State: "initialized",
FailedChecks: 0,
}
err := deployer.SyncStatus(r, status)
if err != nil {
if err := deployer.SyncStatus(r, flaggerv1.CanaryStatus{State: "initialized"}); err != nil {
c.logger.Errorf("%v", err)
return false
}
c.recordEventInfof(r, "Initialization done! %s.%s", r.Name, r.Namespace)
return false
}
if diff, err := deployer.IsNewSpec(r); diff {
c.recordEventInfof(r, "New revision detected %s.%s", r.Spec.TargetRef.Name, r.Namespace)
err = deployer.Scale(r, 1)
if err != nil {
c.recordEventInfof(r, "New revision detected! Scaling up %s.%s", r.Spec.TargetRef.Name, r.Namespace)
if err = deployer.Scale(r, 1); err != nil {
c.recordEventErrorf(r, "%v", err)
return false
}
status := flaggerv1.CanaryStatus{
State: "running",
FailedChecks: 0,
}
err := deployer.SyncStatus(r, status)
if err != nil {
if err := deployer.SyncStatus(r, flaggerv1.CanaryStatus{State: "running"}); err != nil {
c.logger.Errorf("%v", err)
return false
}
c.recordEventInfof(r, "Scaling up %s.%s", r.Spec.TargetRef.Name, r.Namespace)
return false
}
return false
}
func (c *Controller) checkCanaryMetrics(r *flaggerv1.Canary) bool {
observer := &CanaryObserver{
metricsServer: c.metricsServer,
}
for _, metric := range r.Spec.CanaryAnalysis.Metrics {
if metric.Name == "istio_requests_total" {
val, err := observer.GetDeploymentCounter(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval)
val, err := c.observer.GetDeploymentCounter(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval)
if err != nil {
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.metricsServer, err)
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.metricsServer, err)
return false
}
if float64(metric.Threshold) > val {
@@ -248,9 +205,9 @@ func (c *Controller) checkCanaryMetrics(r *flaggerv1.Canary) bool {
}
if metric.Name == "istio_request_duration_seconds_bucket" {
val, err := observer.GetDeploymentHistogram(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval)
val, err := c.observer.GetDeploymentHistogram(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval)
if err != nil {
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.metricsServer, err)
c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.metricsServer, err)
return false
}
t := time.Duration(metric.Threshold) * time.Millisecond