Files
flagger/pkg/canary/deployer.go

461 lines
14 KiB
Go

package canary
import (
"crypto/rand"
"fmt"
"io"
"github.com/google/go-cmp/cmp"
"github.com/mitchellh/hashstructure"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
hpav1 "k8s.io/api/autoscaling/v2beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
)
// Deployer is managing the operations for Kubernetes deployment kind
type Deployer struct {
KubeClient kubernetes.Interface
FlaggerClient clientset.Interface
Logger *zap.SugaredLogger
ConfigTracker ConfigTracker
Labels []string
}
// Initialize creates the primary deployment, hpa,
// scales to zero the canary deployment and returns the pod selector label and container ports
func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
label, ports, err = c.createPrimaryDeployment(cd)
if err != nil {
return "", ports, fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err)
}
if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing {
if !skipLivenessChecks {
_, readyErr := c.IsPrimaryReady(cd)
if readyErr != nil {
return "", ports, readyErr
}
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
if err := c.Scale(cd, 0); err != nil {
return "", ports, err
}
}
if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" {
if err := c.reconcilePrimaryHpa(cd, true); err != nil {
return "", ports, fmt.Errorf("creating HorizontalPodAutoscaler %s.%s failed: %v", primaryName, cd.Namespace, err)
}
}
return label, ports, nil
}
// Promote copies the pod spec, secrets and config maps from canary to primary
func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)
canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
}
return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err)
}
label, err := c.getSelectorLabel(canary)
if err != nil {
return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'",
targetName, cd.Namespace, targetName)
}
primary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace)
}
return fmt.Errorf("deployment %s.%s query error %v", primaryName, cd.Namespace, err)
}
// promote secrets and config maps
configRefs, err := c.ConfigTracker.GetTargetConfigs(cd)
if err != nil {
return err
}
if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return err
}
primaryCopy := primary.DeepCopy()
primaryCopy.Spec.ProgressDeadlineSeconds = canary.Spec.ProgressDeadlineSeconds
primaryCopy.Spec.MinReadySeconds = canary.Spec.MinReadySeconds
primaryCopy.Spec.RevisionHistoryLimit = canary.Spec.RevisionHistoryLimit
primaryCopy.Spec.Strategy = canary.Spec.Strategy
// update spec with primary secrets and config maps
primaryCopy.Spec.Template.Spec = c.ConfigTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs)
// update pod annotations to ensure a rolling update
annotations, err := c.makeAnnotations(canary.Spec.Template.Annotations)
if err != nil {
return err
}
primaryCopy.Spec.Template.Annotations = annotations
primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName, label)
// apply update
_, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy)
if err != nil {
return fmt.Errorf("updating deployment %s.%s template spec failed: %v",
primaryCopy.GetName(), primaryCopy.Namespace, err)
}
// update HPA
if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" {
if err := c.reconcilePrimaryHpa(cd, false); err != nil {
return fmt.Errorf("updating HorizontalPodAutoscaler %s.%s failed: %v", primaryName, cd.Namespace, err)
}
}
return nil
}
// HasDeploymentChanged returns true if the canary deployment pod spec has changed
func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
}
return false, fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err)
}
if cd.Status.LastAppliedSpec == "" {
return true, nil
}
newHash, err := hashstructure.Hash(canary.Spec.Template, nil)
if err != nil {
return false, fmt.Errorf("hash error %v", err)
}
// do not trigger a canary deployment on manual rollback
if cd.Status.LastPromotedSpec == fmt.Sprintf("%d", newHash) {
return false, nil
}
if cd.Status.LastAppliedSpec != fmt.Sprintf("%d", newHash) {
return true, nil
}
return false, nil
}
// Scale sets the canary deployment replicas
func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
}
return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err)
}
depCopy := dep.DeepCopy()
depCopy.Spec.Replicas = int32p(replicas)
_, err = c.KubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
if err != nil {
return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err)
}
return nil
}
func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
canaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return "", nil, fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace)
}
return "", nil, err
}
label, err := c.getSelectorLabel(canaryDep)
if err != nil {
return "", nil, fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'",
targetName, cd.Namespace, targetName)
}
var ports map[string]int32
if cd.Spec.Service.PortDiscovery {
p, err := c.getPorts(cd, canaryDep)
if err != nil {
return "", nil, fmt.Errorf("port discovery failed with error: %v", err)
}
ports = p
}
primaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// create primary secrets and config maps
configRefs, err := c.ConfigTracker.GetTargetConfigs(cd)
if err != nil {
return "", nil, err
}
if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return "", nil, err
}
annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations)
if err != nil {
return "", nil, err
}
replicas := int32(1)
if canaryDep.Spec.Replicas != nil && *canaryDep.Spec.Replicas > 0 {
replicas = *canaryDep.Spec.Replicas
}
// create primary deployment
primaryDep = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: primaryName,
Namespace: cd.Namespace,
Labels: map[string]string{
label: primaryName,
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
Group: flaggerv1.SchemeGroupVersion.Group,
Version: flaggerv1.SchemeGroupVersion.Version,
Kind: flaggerv1.CanaryKind,
}),
},
},
Spec: appsv1.DeploymentSpec{
ProgressDeadlineSeconds: canaryDep.Spec.ProgressDeadlineSeconds,
MinReadySeconds: canaryDep.Spec.MinReadySeconds,
RevisionHistoryLimit: canaryDep.Spec.RevisionHistoryLimit,
Replicas: int32p(replicas),
Strategy: canaryDep.Spec.Strategy,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
label: primaryName,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: makePrimaryLabels(canaryDep.Spec.Template.Labels, primaryName, label),
Annotations: annotations,
},
// update spec with the primary secrets and config maps
Spec: c.ConfigTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs),
},
},
}
_, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
if err != nil {
return "", nil, err
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
}
return label, ports, nil
}
func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
hpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("HorizontalPodAutoscaler %s.%s not found, retrying",
cd.Spec.AutoscalerRef.Name, cd.Namespace)
}
return err
}
hpaSpec := hpav1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: hpav1.CrossVersionObjectReference{
Name: primaryName,
Kind: hpa.Spec.ScaleTargetRef.Kind,
APIVersion: hpa.Spec.ScaleTargetRef.APIVersion,
},
MinReplicas: hpa.Spec.MinReplicas,
MaxReplicas: hpa.Spec.MaxReplicas,
Metrics: hpa.Spec.Metrics,
}
primaryHpaName := fmt.Sprintf("%s-primary", cd.Spec.AutoscalerRef.Name)
primaryHpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{})
// create HPA
if errors.IsNotFound(err) {
primaryHpa = &hpav1.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: primaryHpaName,
Namespace: cd.Namespace,
Labels: hpa.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cd, schema.GroupVersionKind{
Group: flaggerv1.SchemeGroupVersion.Group,
Version: flaggerv1.SchemeGroupVersion.Version,
Kind: flaggerv1.CanaryKind,
}),
},
},
Spec: hpaSpec,
}
_, err = c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa)
if err != nil {
return err
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace)
return nil
}
if err != nil {
return err
}
// update HPA
if !init && primaryHpa != nil {
diff := cmp.Diff(hpaSpec.Metrics, primaryHpa.Spec.Metrics)
if diff != "" || int32Default(hpaSpec.MinReplicas) != int32Default(primaryHpa.Spec.MinReplicas) || hpaSpec.MaxReplicas != primaryHpa.Spec.MaxReplicas {
fmt.Println(diff, hpaSpec.MinReplicas, primaryHpa.Spec.MinReplicas, hpaSpec.MaxReplicas, primaryHpa.Spec.MaxReplicas)
hpaClone := primaryHpa.DeepCopy()
hpaClone.Spec.MaxReplicas = hpaSpec.MaxReplicas
hpaClone.Spec.MinReplicas = hpaSpec.MinReplicas
hpaClone.Spec.Metrics = hpaSpec.Metrics
_, upErr := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Update(hpaClone)
if upErr != nil {
return upErr
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s updated", primaryHpa.GetName(), cd.Namespace)
}
}
return nil
}
// makeAnnotations appends an unique ID to annotations map
func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]string, error) {
idKey := "flagger-id"
res := make(map[string]string)
uuid := make([]byte, 16)
n, err := io.ReadFull(rand.Reader, uuid)
if n != len(uuid) || err != nil {
return res, err
}
uuid[8] = uuid[8]&^0xc0 | 0x80
uuid[6] = uuid[6]&^0xf0 | 0x40
id := fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:])
for k, v := range annotations {
if k != idKey {
res[k] = v
}
}
res[idKey] = id
return res, nil
}
// getSelectorLabel returns the selector match label
func (c *Deployer) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
for _, l := range c.Labels {
if _, ok := deployment.Spec.Selector.MatchLabels[l]; ok {
return l, nil
}
}
return "", fmt.Errorf("selector not found")
}
var sidecars = map[string]bool{
"istio-proxy": true,
"envoy": true,
}
// getPorts returns a list of all container ports
func (c *Deployer) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
ports := make(map[string]int32)
for _, container := range deployment.Spec.Template.Spec.Containers {
// exclude service mesh proxies based on container name
if _, ok := sidecars[container.Name]; ok {
continue
}
for i, p := range container.Ports {
// exclude canary.service.port or canary.service.targetPort
if cd.Spec.Service.TargetPort.String() == "0" {
if p.ContainerPort == cd.Spec.Service.Port {
continue
}
} else {
if cd.Spec.Service.TargetPort.Type == intstr.Int {
if p.ContainerPort == cd.Spec.Service.TargetPort.IntVal {
continue
}
}
if cd.Spec.Service.TargetPort.Type == intstr.String {
if p.Name == cd.Spec.Service.TargetPort.StrVal {
continue
}
}
}
name := fmt.Sprintf("tcp-%s-%v", container.Name, i)
if p.Name != "" {
name = p.Name
}
ports[name] = p.ContainerPort
}
}
return ports, nil
}
func makePrimaryLabels(labels map[string]string, primaryName string, label string) map[string]string {
res := make(map[string]string)
for k, v := range labels {
if k != label {
res[k] = v
}
}
res[label] = primaryName
return res
}
func int32p(i int32) *int32 {
return &i
}
func int32Default(i *int32) int32 {
if i == nil {
return 1
}
return *i
}