Add gated roll out based on Istio HTTP success rate metric

- add Prometheus observer component
- halt rollover when success rate drops under the threshold
- scale to zero for canary deployment after promotion has succeeded
This commit is contained in:
Stefan Prodan
2018-09-22 15:37:30 +03:00
parent 19e7ae2fef
commit c9ba2f6cb3
5 changed files with 211 additions and 41 deletions

View File

@@ -2,8 +2,8 @@ apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
annotations:
apps.weave.works/canary-revision: ""
apps.weave.works/canary-status: ""
apps.weave.works/progressive-revision: ""
apps.weave.works/progressive-status: ""
labels:
app: podinfo
name: podinfo

View File

@@ -16,18 +16,24 @@ import (
)
var (
namespace string
masterURL string
kubeconfig string
window time.Duration
namespace string
masterURL string
kubeconfig string
window time.Duration
promURL string
threshold float64
thresholdWindow string
)
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&namespace, "namespace", "", "Kubernetes namespace.")
flag.StringVar(&namespace, "namespace", "", "Kubernetes namespace")
flag.StringVar(&promURL, "prometheus", "https://prometheus.istio.weavedx.com", "Prometheus URL")
flag.DurationVar(&window, "window", 10*time.Second, "wait interval between deployment rollouts")
flag.Float64Var(&threshold, "threshold", 99, "HTTP request success rate threshold (1-99) to halt the rollout")
flag.StringVar(&thresholdWindow, "interval", "1m", "HTTP request success rate query interval 30s 1m")
}
func main() {
@@ -62,10 +68,14 @@ func main() {
stopCh := signals.SetupSignalHandler()
deployer := pd.NewDeployer(kubeClient, istioClient, logger)
obs, err := pd.NewObserver(promURL, thresholdWindow)
if err != nil {
logger.Fatalf("Error parsing Prometheus URL: %v", err)
}
deployer := pd.NewDeployer(kubeClient, istioClient, obs, threshold, logger)
deployer.Run(namespace)
tickChan := time.NewTicker(window).C
for {
select {
case <-tickChan:
@@ -75,4 +85,3 @@ func main() {
}
}
}

View File

@@ -16,11 +16,19 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
const (
enabledAnnotation = "apps.weave.works/progressive"
revisionAnnotation = "apps.weave.works/progressive-revision"
statusAnnotation = "apps.weave.works/progressive-status"
)
type Deployer struct {
KubeClientSet kubernetes.Interface
IstioClientSet istioclientset.Interface
Logger *zap.SugaredLogger
ProgressiveDeployments *sync.Map
Observer *Observer
Threshold float64
}
type ProgressiveDeployment struct {
@@ -32,18 +40,26 @@ type ProgressiveDeployment struct {
VirtualService string
}
func NewDeployer(kubeClient kubernetes.Interface, istioClient istioclientset.Interface, logger *zap.SugaredLogger) *Deployer {
func NewDeployer(
kubeClient kubernetes.Interface,
istioClient istioclientset.Interface,
observer *Observer,
threshold float64,
logger *zap.SugaredLogger,
) *Deployer {
d := &Deployer{
KubeClientSet: kubeClient,
IstioClientSet: istioClient,
Logger: logger,
ProgressiveDeployments: new(sync.Map),
Observer: observer,
Threshold: threshold,
}
return d
}
func (d *Deployer) Run(ns string) {
d.scanForDeployments(ns)
func (d *Deployer) Run(namespace string) {
d.scanForDeployments(namespace)
d.ProgressiveDeployments.Range(func(key interface{}, value interface{}) bool {
pd := value.(*ProgressiveDeployment)
go d.advanceDeployment(pd)
@@ -52,7 +68,6 @@ func (d *Deployer) Run(ns string) {
}
func (d *Deployer) scanForDeployments(namespace string) {
annotation := "apps.weave.works/progressive"
// scan for healthy deployments marked for progressive delivery
deployments := make(map[string]appsv1.Deployment)
@@ -62,15 +77,12 @@ func (d *Deployer) scanForDeployments(namespace string) {
return
}
for _, dep := range depList.Items {
if val, ok := dep.Annotations[annotation]; ok {
if val, ok := dep.Annotations[enabledAnnotation]; ok {
if val == "true" && !strings.Contains(dep.GetName(), "-canary") {
if msg, healthy := getDeploymentStatus(&dep); healthy {
deployments[dep.GetName()] = dep
} else {
d.Logger.Warnw(
"Ignoring deployment",
zap.String("name", dep.GetName()),
zap.String("reason", msg))
d.Logger.Infof("Ignoring deployment %s.%s %s", dep.GetName(), dep.Namespace, msg)
}
}
}
@@ -132,11 +144,11 @@ func (d *Deployer) makeProgressiveDeployment(dep *appsv1.Deployment) *Progressiv
func (d *Deployer) advanceDeployment(pd *ProgressiveDeployment) {
dep, err := d.KubeClientSet.AppsV1().Deployments(pd.Namespace).Get(pd.Deployment, v1.GetOptions{})
if err != nil {
d.Logger.Errorf("Deployment %s not found", pd.Deployment)
d.Logger.Errorf("Deployment %s.%s not found", pd.Deployment, pd.Namespace)
return
}
if msg, healthy := getDeploymentStatus(dep); !healthy {
d.Logger.Infof("Ignoring deployment %s %s", dep.GetName(), msg)
d.Logger.Infof("Ignoring deployment %s.%s %s", dep.GetName(), dep.Namespace, msg)
return
}
@@ -145,26 +157,29 @@ func (d *Deployer) advanceDeployment(pd *ProgressiveDeployment) {
d.Logger.Errorf("Canary deployment %s not found", pd.DeploymentCanary)
return
}
// ignore deployment if canary relicas are zero or deployment is not healthy
if canary.Spec.Replicas == nil || *canary.Spec.Replicas == 0 {
return
}
if msg, healthy := getDeploymentStatus(canary); !healthy {
d.Logger.Infof("Ignoring deployment %s %s", canary.GetName(), msg)
d.Logger.Infof("Ignoring deployment %s.%s %s", canary.GetName(), canary.Namespace, msg)
return
}
vsvc, err := d.IstioClientSet.NetworkingV1alpha3().VirtualServices(pd.Namespace).Get(pd.VirtualService, v1.GetOptions{})
if err != nil {
d.Logger.Errorf("Virtual Service %s not found", pd.VirtualService)
d.Logger.Errorf("Virtual Service %s.%s not found", pd.VirtualService, pd.Namespace)
return
}
shouldAdvance := false
crAnnotation := "apps.weave.works/canary-revision"
statusAnnotation := "apps.weave.works/canary-status"
if val, ok := vsvc.Annotations[crAnnotation]; !ok {
vsvc.Annotations[crAnnotation] = canary.ResourceVersion
if val, ok := vsvc.Annotations[revisionAnnotation]; !ok {
vsvc.Annotations[revisionAnnotation] = canary.ResourceVersion
vsvc.Annotations[statusAnnotation] = "running"
_, err := d.IstioClientSet.NetworkingV1alpha3().VirtualServices(pd.Namespace).Update(vsvc)
if err != nil {
d.Logger.Errorf("Virtual Service %s annotations update failed: %v", pd.VirtualService, err)
d.Logger.Errorf("Virtual Service %s.%s annotations update failed: %v", pd.VirtualService, pd.Namespace, err)
return
}
shouldAdvance = true
@@ -173,11 +188,11 @@ func (d *Deployer) advanceDeployment(pd *ProgressiveDeployment) {
shouldAdvance = true
}
if val != canary.ResourceVersion {
vsvc.Annotations[crAnnotation] = canary.ResourceVersion
vsvc.Annotations[revisionAnnotation] = canary.ResourceVersion
vsvc.Annotations[statusAnnotation] = "running"
_, err := d.IstioClientSet.NetworkingV1alpha3().VirtualServices(pd.Namespace).Update(vsvc)
if err != nil {
d.Logger.Errorf("Virtual Service %s annotations update failed: %v", pd.VirtualService, err)
d.Logger.Errorf("Virtual Service %s.%s annotations update failed: %v", pd.VirtualService, pd.Namespace, err)
return
}
shouldAdvance = true
@@ -202,18 +217,23 @@ func (d *Deployer) advanceDeployment(pd *ProgressiveDeployment) {
vsvc, err = d.IstioClientSet.NetworkingV1alpha3().VirtualServices(pd.Namespace).Get(pd.VirtualService, v1.GetOptions{})
if err != nil {
d.Logger.Errorf("Virtual Service %s not found", pd.VirtualService)
d.Logger.Errorf("Virtual Service %s.%s not found", pd.VirtualService, pd.Namespace)
return
}
if svcRoute.Weight == 0 && canaryRoute.Weight == 0 {
d.Logger.Errorf("Virtual Service %s does not contain routes for %s and %s",
pd.VirtualService, pd.Service, pd.ServiceCanary)
d.Logger.Errorf("Virtual Service %s.%s does not contain routes for %s and %s",
pd.VirtualService, pd.Namespace, pd.Service, pd.ServiceCanary)
return
}
// skip HTTP error rate check when there is no traffic
if canaryRoute.Weight == 0 {
d.Logger.Infof("Stating progressive deployment for %s", pd.Deployment)
d.Logger.Infof("Stating progressive deployment for %s.%s", pd.Deployment, pd.Namespace)
} else {
if !d.checkSuccessRate(pd.Namespace, pd.DeploymentCanary) {
return
}
}
if canaryRoute.Weight != 100 {
@@ -227,20 +247,20 @@ func (d *Deployer) advanceDeployment(pd *ProgressiveDeployment) {
_, err := d.IstioClientSet.NetworkingV1alpha3().VirtualServices(pd.Namespace).Update(vsvc)
if err != nil {
d.Logger.Errorf("Virtual Service %s update failed: %v", pd.VirtualService, err)
d.Logger.Errorf("Virtual Service %s.%s update failed: %v", pd.VirtualService, pd.Namespace, err)
return
} else {
d.Logger.Infof("Advance deployment %s canary weight %v", pd.Deployment, canaryRoute.Weight)
d.Logger.Infof("Advance deployment %s.%s weight %v", pd.DeploymentCanary, pd.Namespace, canaryRoute.Weight)
}
if canaryRoute.Weight == 100 {
d.Logger.Infof("Copying %s.%s template spec to %s.%s",
canary.GetName(), canary.Namespace, dep.GetName(), dep.Namespace)
dep.Spec.Template.Spec = canary.Spec.Template.Spec
_, err = d.KubeClientSet.AppsV1().Deployments(dep.Namespace).Update(dep)
if err != nil {
d.Logger.Errorf("Deployment %s promotion failed: %v", dep.GetName(), err)
d.Logger.Errorf("Deployment %s.%s promotion failed: %v", dep.GetName(), dep.Namespace, err)
return
} else {
d.Logger.Infof("Promote %s template spec to %s", canary.GetName(), dep.GetName())
}
}
} else {
@@ -254,11 +274,56 @@ func (d *Deployer) advanceDeployment(pd *ProgressiveDeployment) {
vsvc.Annotations[statusAnnotation] = "finished"
_, err = d.IstioClientSet.NetworkingV1alpha3().VirtualServices(pd.Namespace).Update(vsvc)
if err != nil {
d.Logger.Errorf("Virtual Service %s annotations update failed: %v", pd.VirtualService, err)
d.Logger.Errorf("Virtual Service %s.%s annotations update failed: %v", pd.VirtualService, pd.Namespace, err)
return
}
d.Logger.Infof("Deployment %s promotion complete", dep.GetName())
d.Logger.Infof("%s.%s promotion complete! Scaling down %s.%s",
dep.GetName(), dep.Namespace, canary.GetName(), canary.Namespace)
d.scaleToZeroCanary(pd)
}
}
func (d *Deployer) checkSuccessRate(namespace string, name string) (pass bool) {
val, err := d.Observer.GetDeploymentSuccessRate(namespace, name)
if err != nil {
d.Logger.Errorf("Observer Prometheus query error: %v", err)
return false
}
pass = val > d.Threshold
if !pass {
d.Logger.Warnf("%s.%s rollout halted due to low HTTP success rate %v threshold %v", name, namespace, val, d.Threshold)
}
return pass
}
func (d *Deployer) scaleToZeroCanary(pd *ProgressiveDeployment) {
canary, err := d.KubeClientSet.AppsV1().Deployments(pd.Namespace).Get(pd.DeploymentCanary, v1.GetOptions{})
if err != nil {
d.Logger.Errorf("Deployment %s.%s not found", pd.DeploymentCanary, pd.Namespace)
return
}
//HPA https://github.com/kubernetes/kubernetes/pull/29212
canary.Spec.Replicas = int32p(0)
_, err = d.KubeClientSet.AppsV1().Deployments(canary.Namespace).Update(canary)
if err != nil {
d.Logger.Errorf("Scaling down %s.%s failed: %v", canary.GetName(), canary.Namespace, err)
return
}
}
func (d *Deployer) deleteCanary(pd *ProgressiveDeployment) {
err := d.KubeClientSet.AppsV1().Deployments(pd.Namespace).Delete(pd.DeploymentCanary, &v1.DeleteOptions{})
if err != nil {
d.Logger.Errorf("Deleting deployment %s.%s failed: %v", pd.DeploymentCanary, pd.Namespace, err)
return
}
err = d.KubeClientSet.CoreV1().Services(pd.Namespace).Delete(pd.ServiceCanary, &v1.DeleteOptions{})
if err != nil {
d.Logger.Errorf("Deleting service %s.%s failed: %v", pd.ServiceCanary, pd.Namespace, err)
return
}
}
@@ -337,3 +402,7 @@ func getDeploymentCondition(status appsv1.DeploymentStatus, condType appsv1.Depl
}
return nil
}
func int32p(i int32) *int32 {
return &i
}

92
pkg/deployer/observer.go Normal file
View File

@@ -0,0 +1,92 @@
package deployer
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
)
type Observer struct {
URL *url.URL
Interval string
}
func NewObserver(addr string, interval string) (*Observer, error) {
promURL, err := url.Parse(addr)
if err != nil {
return nil, err
}
return &Observer{
URL: promURL,
Interval: interval,
}, nil
}
func (c *Observer) queryRange(query string) (*VectorQueryResponse, error) {
u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query))
if err != nil {
return nil, err
}
u = c.URL.ResolveReference(u)
r, err := http.Get(u.String())
if err != nil {
return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("error reading body: %s", err.Error())
}
if 400 <= r.StatusCode {
return nil, fmt.Errorf("error response: %s", string(b))
}
var values VectorQueryResponse
err = json.Unmarshal(b, &values)
if err != nil {
return nil, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b))
}
return &values, nil
}
type VectorQueryResponse struct {
Data struct {
Result []struct {
Metric struct {
Code string `json:"code"`
FunctionName string `json:"function_name"`
}
Value []interface{} `json:"value"`
}
}
}
func (c *Observer) GetDeploymentSuccessRate(namespace string, name string) (float64, error) {
var rate float64
querySt := url.QueryEscape(`sum(rate(istio_requests_total{reporter="destination",destination_workload_namespace=~"` + namespace + `",destination_workload=~"` + name + `",response_code!~"5.*"}[1m])) / sum(rate(istio_requests_total{reporter="destination",destination_workload_namespace=~"` + namespace + `",destination_workload=~"` + name + `"}[` + c.Interval + `])) * 100 `)
result, err := c.queryRange(querySt)
if err != nil {
return rate, err
}
for _, v := range result.Data.Result {
metricValue := v.Value[1]
switch metricValue.(type) {
case string:
f, err := strconv.ParseFloat(metricValue.(string), 64)
if err != nil {
return rate, err
}
rate = f
}
}
return rate, nil
}