Refactor rollout advancement

This commit is contained in:
Stefan Prodan
2018-09-25 13:30:07 +03:00
parent da2b6c85aa
commit b9309703e0
3 changed files with 137 additions and 66 deletions

View File

@@ -5,6 +5,10 @@
Steerer is a Kubernetes operator that automates the promotion of canary deployments
using Istio routing for traffic shifting and Prometheus metrics for canary analysis.
Steerer requires two Kubernetes deployments: one for the version you want to upgrade called primary
and one for the canary. Each deployment has a corresponding ClusterIP service.
These services are used as destinations in a Istio virtual service.
Gated rollout stages:
* scan for deployments marked for rollout
@@ -28,7 +32,36 @@ Gated rollout stages:
* mark rollout as finished
* wait for the canary deployment to be updated (revision bump) and start over
A rollout can be defined using steerer's custom resource:
Steerer requires two Kubernetes deployments: one for the version you want to upgrade called primary
and one for the canary. Each deployment has a corresponding ClusterIP service.
These services are used as destinations in an Istio virtual service.
Assuming my primary deployment is named podinfo and my canary one podinfo-canary, steerer will require
a virtual service configured with weight-based routing:
```yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: podinfo
spec:
hosts:
- podinfo
http:
- route:
- destination:
host: podinfo
port:
number: 9898
weight: 100
- destination:
host: podinfo-canary
port:
number: 9898
weight: 0
```
Based on the two deployments, services and virtual service, a rollout can be defined using steerer's custom resource:
```yaml
apiVersion: apps.weave.works/v1beta1

View File

@@ -125,7 +125,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
for {
select {
case <-tickChan:
c.doRollout()
c.doRollouts()
case <-stopCh:
c.logger.Info("Shutting down workers")
return nil

View File

@@ -14,7 +14,7 @@ const (
statusAnnotation = "apps.weave.works/status"
)
func (c *Controller) doRollout() {
func (c *Controller) doRollouts() {
c.rollouts.Range(func(key interface{}, value interface{}) bool {
r := value.(*rolloutv1.Rollout)
if r.Spec.TargetKind == "Deployment" {
@@ -25,31 +25,38 @@ func (c *Controller) doRollout() {
}
func (c *Controller) advanceDeploymentRollout(name string, namespace string) {
// gate stage: check if the rollout exists
r, ok := c.getRollout(name, namespace)
if !ok {
return
}
// gate stage: check if primary deployment exists and is healthy
primary, ok := c.getDeployment(r.Spec.Primary.Name, r.Namespace)
if !ok {
return
}
// gate stage: check if canary deployment exists and is healthy
canary, ok := c.getDeployment(r.Spec.Canary.Name, r.Namespace)
if !ok {
return
}
// gate stage: check if virtual service exists
// and if it contains weighted destination routes to the primary and canary services
vs, primaryRoute, canaryRoute, ok := c.getVirtualService(r)
if !ok {
return
}
if ok := c.updateRolloutAnnotations(r, canary.ResourceVersion); !ok {
// gate stage: check if rollout should start (canary revision has changes) or continue
if ok := c.checkRolloutStatus(r, canary.ResourceVersion); !ok {
return
}
// skip HTTP error rate check when no traffic is routed to canary
// gate stage: check if the canary success rate is above the threshold
// skip check if no traffic is routed to canary
if canaryRoute.Weight == 0 {
c.recordEventInfof(r, "Starting rollout for %s.%s", r.Name, r.Namespace)
} else {
@@ -58,88 +65,64 @@ func (c *Controller) advanceDeploymentRollout(name string, namespace string) {
}
}
// routing stage: increase canary traffic percentage
if canaryRoute.Weight != 100 {
primaryRoute.Weight -= 10
canaryRoute.Weight += 10
vs.Spec.Http = []istiov1alpha3.HTTPRoute{
{
Route: []istiov1alpha3.DestinationWeight{primaryRoute, canaryRoute},
},
primaryRoute.Weight -= r.Spec.VirtualService.Weight
if primaryRoute.Weight > 100 {
primaryRoute.Weight = 100
}
canaryRoute.Weight += r.Spec.VirtualService.Weight
if primaryRoute.Weight < 0 {
primaryRoute.Weight = 0
}
_, err := c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Update(vs)
if err != nil {
c.recordEventErrorf(r, "VirtualService %s.%s update failed: %v", r.Spec.VirtualService.Name, r.Namespace, err)
if ok := c.updateVirtualServiceRoutes(r, vs, primaryRoute, canaryRoute); !ok {
return
} else {
c.recordEventInfof(r, "Advance rollout %s.%s weight %v", r.Name, r.Namespace, canaryRoute.Weight)
}
c.recordEventInfof(r, "Advance rollout %s.%s weight %v", r.Name, r.Namespace, canaryRoute.Weight)
// promotion stage: override primary.template.spec with the canary spec
if canaryRoute.Weight == 100 {
c.recordEventInfof(r, "Copying %s.%s template spec to %s.%s",
canary.GetName(), canary.Namespace, primary.GetName(), primary.Namespace)
primary.Spec.Template.Spec = canary.Spec.Template.Spec
_, err = c.kubeClient.AppsV1().Deployments(primary.Namespace).Update(primary)
_, err := c.kubeClient.AppsV1().Deployments(primary.Namespace).Update(primary)
if err != nil {
c.recordEventErrorf(r, "Deployment %s.%s promotion failed: %v", primary.GetName(), primary.Namespace, err)
return
}
}
} else {
// final stage: route all traffic back to primary
primaryRoute.Weight = 100
canaryRoute.Weight = 0
vs.Spec.Http = []istiov1alpha3.HTTPRoute{
{
Route: []istiov1alpha3.DestinationWeight{primaryRoute, canaryRoute},
},
}
vs.Annotations[statusAnnotation] = "finished"
_, err := c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Update(vs)
if err != nil {
c.recordEventErrorf(r, "VirtualService %s.%s annotations update failed: %v", r.Spec.VirtualService.Name, r.Namespace, err)
if ok := c.updateVirtualServiceRoutes(r, vs, primaryRoute, canaryRoute); !ok {
return
}
// final stage: mark rollout as finished and scale canary to zero replicas
c.checkRolloutStatus(r, "finished")
c.recordEventInfof(r, "%s.%s promotion complete! Scaling down %s.%s",
r.Name, r.Namespace, canary.GetName(), canary.Namespace)
c.scaleToZeroCanary(r)
}
}
func (c *Controller) scaleToZeroCanary(r *rolloutv1.Rollout) {
canary, err := c.kubeClient.AppsV1().Deployments(r.Namespace).Get(r.Spec.Canary.Name, v1.GetOptions{})
func (c *Controller) getRollout(name string, namespace string) (*rolloutv1.Rollout, bool) {
r, err := c.rolloutClient.AppsV1beta1().Rollouts(namespace).Get(name, v1.GetOptions{})
if err != nil {
c.recordEventErrorf(r, "Deployment %s.%s not found", r.Spec.Canary.Name, r.Namespace)
return
}
//HPA https://github.com/kubernetes/kubernetes/pull/29212
canary.Spec.Replicas = int32p(0)
_, err = c.kubeClient.AppsV1().Deployments(canary.Namespace).Update(canary)
if err != nil {
c.recordEventErrorf(r, "Scaling down %s.%s failed: %v", canary.GetName(), canary.Namespace, err)
return
c.logger.Errorf("Rollout %s.%s not found", name, namespace)
return nil, false
}
return r, true
}
func (c *Controller) checkDeploymentSuccessRate(r *rolloutv1.Rollout) bool {
val, err := c.getDeploymentMetric(r.Spec.Canary.Name, r.Namespace, r.Spec.Metric.Name, r.Spec.Metric.Interval)
if err != nil {
c.recordEventErrorf(r, "Metric query error: %v", err)
return false
}
if float64(r.Spec.Metric.Threshold) > val {
c.recordEventErrorf(r, "Halt rollout %s.%s success rate %.2f%% < %v%%",
r.Name, r.Namespace, val, r.Spec.Metric.Threshold)
return false
}
return true
}
func (c *Controller) updateRolloutAnnotations(r *rolloutv1.Rollout, canaryVersion string) bool {
func (c *Controller) checkRolloutStatus(r *rolloutv1.Rollout, canaryVersion string) bool {
var err error
if val, ok := r.Annotations[revisionAnnotation]; !ok {
var err error
r.Annotations[revisionAnnotation] = canaryVersion
r.Annotations[statusAnnotation] = "running"
r, err = c.rolloutClient.AppsV1beta1().Rollouts(r.Namespace).Update(r)
@@ -153,7 +136,6 @@ func (c *Controller) updateRolloutAnnotations(r *rolloutv1.Rollout, canaryVersio
return true
}
if val != canaryVersion {
var err error
r.Annotations[revisionAnnotation] = canaryVersion
r.Annotations[statusAnnotation] = "running"
r, err = c.rolloutClient.AppsV1beta1().Rollouts(r.Namespace).Update(r)
@@ -164,18 +146,19 @@ func (c *Controller) updateRolloutAnnotations(r *rolloutv1.Rollout, canaryVersio
return true
}
}
return false
}
func (c *Controller) getRollout(name string, namespace string) (*rolloutv1.Rollout, bool) {
r, err := c.rolloutClient.AppsV1beta1().Rollouts(namespace).Get(name, v1.GetOptions{})
func (c *Controller) updateRolloutStatus(r *rolloutv1.Rollout, status string) bool {
var err error
r.Annotations[statusAnnotation] = "running"
r, err = c.rolloutClient.AppsV1beta1().Rollouts(r.Namespace).Update(r)
if err != nil {
c.logger.Errorf("Rollout %s.%s not found", name, namespace)
return nil, false
c.recordEventErrorf(r, "Rollout %s.%s status update failed: %v", r.Name, r.Namespace, err)
return false
}
return true
return r, true
}
func (c *Controller) getDeployment(name string, namespace string) (*appsv1.Deployment, bool) {
@@ -197,6 +180,37 @@ func (c *Controller) getDeployment(name string, namespace string) (*appsv1.Deplo
return dep, true
}
func (c *Controller) checkDeploymentSuccessRate(r *rolloutv1.Rollout) bool {
val, err := c.getDeploymentMetric(r.Spec.Canary.Name, r.Namespace, r.Spec.Metric.Name, r.Spec.Metric.Interval)
if err != nil {
c.recordEventErrorf(r, "Metric query error: %v", err)
return false
}
if float64(r.Spec.Metric.Threshold) > val {
c.recordEventErrorf(r, "Halt rollout %s.%s success rate %.2f%% < %v%%",
r.Name, r.Namespace, val, r.Spec.Metric.Threshold)
return false
}
return true
}
func (c *Controller) scaleToZeroCanary(r *rolloutv1.Rollout) {
canary, err := c.kubeClient.AppsV1().Deployments(r.Namespace).Get(r.Spec.Canary.Name, v1.GetOptions{})
if err != nil {
c.recordEventErrorf(r, "Deployment %s.%s not found", r.Spec.Canary.Name, r.Namespace)
return
}
//HPA https://github.com/kubernetes/kubernetes/pull/29212
canary.Spec.Replicas = int32p(0)
_, err = c.kubeClient.AppsV1().Deployments(canary.Namespace).Update(canary)
if err != nil {
c.recordEventErrorf(r, "Scaling down %s.%s failed: %v", canary.GetName(), canary.Namespace, err)
return
}
}
func (c *Controller) getVirtualService(r *rolloutv1.Rollout) (
vs *istiov1alpha3.VirtualService,
primary istiov1alpha3.DestinationWeight,
@@ -206,7 +220,7 @@ func (c *Controller) getVirtualService(r *rolloutv1.Rollout) (
var err error
vs, err = c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Get(r.Spec.VirtualService.Name, v1.GetOptions{})
if err != nil {
c.logger.Errorf("VirtualService %s.%s not found", r.Spec.VirtualService.Name, r.Namespace)
c.recordEventErrorf(r, "VirtualService %s.%s not found", r.Spec.VirtualService.Name, r.Namespace)
return
}
@@ -222,7 +236,7 @@ func (c *Controller) getVirtualService(r *rolloutv1.Rollout) (
}
if primary.Weight == 0 && canary.Weight == 0 {
c.logger.Errorf("VirtualService %s.%s does not contain routes for %s and %s",
c.recordEventErrorf(r, "VirtualService %s.%s does not contain routes for %s and %s",
r.Spec.VirtualService.Name, r.Namespace, r.Spec.Primary.Host, r.Spec.Canary.Host)
return
}
@@ -231,6 +245,27 @@ func (c *Controller) getVirtualService(r *rolloutv1.Rollout) (
return
}
func (c *Controller) updateVirtualServiceRoutes(
r *rolloutv1.Rollout,
vs *istiov1alpha3.VirtualService,
primary istiov1alpha3.DestinationWeight,
canary istiov1alpha3.DestinationWeight,
) bool {
vs.Spec.Http = []istiov1alpha3.HTTPRoute{
{
Route: []istiov1alpha3.DestinationWeight{primary, canary},
},
}
var err error
vs, err = c.istioClient.NetworkingV1alpha3().VirtualServices(r.Namespace).Update(vs)
if err != nil {
c.recordEventErrorf(r, "VirtualService %s.%s update failed: %v", r.Spec.VirtualService.Name, r.Namespace, err)
return false
}
return true
}
func getDeploymentStatus(deployment *appsv1.Deployment) (string, bool) {
if deployment.Generation <= deployment.Status.ObservedGeneration {
cond := getDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing)
@@ -253,10 +288,13 @@ func getDeploymentStatus(deployment *appsv1.Deployment) (string, bool) {
return "ready", true
}
func getDeploymentCondition(status appsv1.DeploymentStatus, condType appsv1.DeploymentConditionType) *appsv1.DeploymentCondition {
func getDeploymentCondition(
status appsv1.DeploymentStatus,
conditionType appsv1.DeploymentConditionType,
) *appsv1.DeploymentCondition {
for i := range status.Conditions {
c := status.Conditions[i]
if c.Type == condType {
if c.Type == conditionType {
return &c
}
}