mirror of
https://github.com/fluxcd/flagger.git
synced 2026-02-14 18:10:00 +00:00
feat: add knative integration
Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com> Co-authored-by: Thomas Banks
This commit is contained in:
committed by
Sanskar Jaiswal
parent
8276bfa5a5
commit
f1c8807c0d
@@ -463,6 +463,14 @@ type LocalObjectReference struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
func (l *LocalObjectReference) IsKnativeService() bool {
|
||||
if l.Kind == "Service" && l.APIVersion == "serving.knative.dev/v1" {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
type AutoscalerRefernce struct {
|
||||
// API version of the scaler
|
||||
// +required
|
||||
|
||||
@@ -7,6 +7,7 @@ const (
|
||||
IstioProvider string = "istio"
|
||||
SMIProvider string = "smi"
|
||||
ContourProvider string = "contour"
|
||||
KnativeProvider string = "knative"
|
||||
GlooProvider string = "gloo"
|
||||
NGINXProvider string = "nginx"
|
||||
KubernetesProvider string = "kubernetes"
|
||||
|
||||
@@ -20,12 +20,15 @@ import (
|
||||
"go.uber.org/zap"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
)
|
||||
|
||||
type Factory struct {
|
||||
kubeClient kubernetes.Interface
|
||||
flaggerClient clientset.Interface
|
||||
knativeClient knative.Interface
|
||||
logger *zap.SugaredLogger
|
||||
configTracker Tracker
|
||||
labels []string
|
||||
@@ -34,6 +37,7 @@ type Factory struct {
|
||||
|
||||
func NewFactory(kubeClient kubernetes.Interface,
|
||||
flaggerClient clientset.Interface,
|
||||
knativeClient knative.Interface,
|
||||
configTracker Tracker,
|
||||
labels []string,
|
||||
includeLabelPrefix []string,
|
||||
@@ -41,6 +45,7 @@ func NewFactory(kubeClient kubernetes.Interface,
|
||||
return &Factory{
|
||||
kubeClient: kubeClient,
|
||||
flaggerClient: flaggerClient,
|
||||
knativeClient: knativeClient,
|
||||
logger: logger,
|
||||
configTracker: configTracker,
|
||||
labels: labels,
|
||||
@@ -48,7 +53,7 @@ func NewFactory(kubeClient kubernetes.Interface,
|
||||
}
|
||||
}
|
||||
|
||||
func (factory *Factory) Controller(kind string) Controller {
|
||||
func (factory *Factory) Controller(obj v1beta1.LocalObjectReference) Controller {
|
||||
deploymentCtrl := &DeploymentController{
|
||||
logger: factory.logger,
|
||||
kubeClient: factory.kubeClient,
|
||||
@@ -71,14 +76,22 @@ func (factory *Factory) Controller(kind string) Controller {
|
||||
flaggerClient: factory.flaggerClient,
|
||||
includeLabelPrefix: factory.includeLabelPrefix,
|
||||
}
|
||||
knativeCtrl := &KnativeController{
|
||||
flaggerClient: factory.flaggerClient,
|
||||
knativeClient: factory.knativeClient,
|
||||
}
|
||||
|
||||
switch kind {
|
||||
switch obj.Kind {
|
||||
case "DaemonSet":
|
||||
return daemonSetCtrl
|
||||
case "Deployment":
|
||||
return deploymentCtrl
|
||||
case "Service":
|
||||
return serviceCtrl
|
||||
if obj.IsKnativeService() {
|
||||
return knativeCtrl
|
||||
} else {
|
||||
return serviceCtrl
|
||||
}
|
||||
default:
|
||||
return deploymentCtrl
|
||||
}
|
||||
|
||||
161
pkg/canary/knative_controller.go
Normal file
161
pkg/canary/knative_controller.go
Normal file
@@ -0,0 +1,161 @@
|
||||
package canary
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned"
|
||||
"go.uber.org/zap"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
serving "knative.dev/serving/pkg/apis/serving/v1"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
)
|
||||
|
||||
type KnativeController struct {
|
||||
flaggerClient clientset.Interface
|
||||
knativeClient knative.Interface
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
// IsPrimaryReady checks if the primary revision is ready
|
||||
func (kc *KnativeController) IsPrimaryReady(cd *flaggerv1.Canary) (bool, error) {
|
||||
service, err := kc.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
revisionName, exists := service.Annotations["flagger.app/primary-revision"]
|
||||
if !exists {
|
||||
return true, fmt.Errorf("Knative Service %s.%s primary revision annotation not found", cd.Spec.TargetRef.Name, cd.Namespace)
|
||||
}
|
||||
revision, err := kc.knativeClient.ServingV1().Revisions(cd.Namespace).Get(context.TODO(), revisionName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Revision %s.%s get query error: %w", revisionName, cd.Namespace, err)
|
||||
}
|
||||
if !revision.IsReady() {
|
||||
return true, fmt.Errorf("Knative Revision %s.%s is not ready", revision.Name, cd.Namespace)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// IsCanaryReady checks if the canary revision is ready
|
||||
func (kc *KnativeController) IsCanaryReady(cd *flaggerv1.Canary) (bool, error) {
|
||||
service, err := kc.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
revision, err := kc.knativeClient.ServingV1().Revisions(cd.Namespace).Get(context.TODO(), service.Status.LatestCreatedRevisionName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Revision %s.%s get query error: %w", service.Status.LatestCreatedRevisionName, cd.Namespace, err)
|
||||
}
|
||||
if !revision.IsReady() {
|
||||
return true, fmt.Errorf("Knative Revision %s.%s is not ready", revision.Name, cd.Namespace)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (kc *KnativeController) GetMetadata(canary *flaggerv1.Canary) (string, string, map[string]int32, error) {
|
||||
return "", "", make(map[string]int32), nil
|
||||
}
|
||||
|
||||
// SyncStatus encodes list of revisions and updates the canary status
|
||||
func (kc *KnativeController) SyncStatus(cd *flaggerv1.Canary, status flaggerv1.CanaryStatus) error {
|
||||
service, err := kc.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
return syncCanaryStatus(kc.flaggerClient, cd, status, service.Status.LatestCreatedRevisionName, func(copy *flaggerv1.Canary) {})
|
||||
}
|
||||
|
||||
// SetStatusFailedChecks updates the canary failed checks counter
|
||||
func (kc *KnativeController) SetStatusFailedChecks(cd *flaggerv1.Canary, val int) error {
|
||||
return setStatusFailedChecks(kc.flaggerClient, cd, val)
|
||||
}
|
||||
|
||||
// SetStatusWeight updates the canary status weight value
|
||||
func (kc *KnativeController) SetStatusWeight(cd *flaggerv1.Canary, val int) error {
|
||||
return setStatusWeight(kc.flaggerClient, cd, val)
|
||||
}
|
||||
|
||||
// SetStatusIterations updates the canary status iterations value
|
||||
func (kc *KnativeController) SetStatusIterations(cd *flaggerv1.Canary, val int) error {
|
||||
return setStatusIterations(kc.flaggerClient, cd, val)
|
||||
}
|
||||
|
||||
// SetStatusPhase updates the canary status phase
|
||||
func (kc *KnativeController) SetStatusPhase(cd *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error {
|
||||
return setStatusPhase(kc.flaggerClient, cd, phase)
|
||||
}
|
||||
|
||||
// Initialize configures the Knative Service to be used for canary rollouts.
|
||||
func (kc *KnativeController) Initialize(cd *flaggerv1.Canary) (bool, error) {
|
||||
if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing {
|
||||
service, err := kc.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
|
||||
if service.Annotations == nil {
|
||||
service.Annotations = make(map[string]string, 0)
|
||||
}
|
||||
service.Annotations["flagger.app/primary-revision"] = service.Status.LatestCreatedRevisionName
|
||||
|
||||
canaryPercent := int64(0)
|
||||
primaryPercent := int64(100)
|
||||
latestRevision := true
|
||||
traffic := []serving.TrafficTarget{
|
||||
{
|
||||
LatestRevision: &latestRevision,
|
||||
Percent: &canaryPercent,
|
||||
},
|
||||
{
|
||||
RevisionName: service.Status.LatestCreatedRevisionName,
|
||||
Percent: &primaryPercent,
|
||||
},
|
||||
}
|
||||
service.Spec.Traffic = traffic
|
||||
|
||||
_, err = kc.knativeClient.ServingV1().Services(cd.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Service %s.%s update query error %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (kc *KnativeController) Promote(cd *flaggerv1.Canary) error {
|
||||
service, err := kc.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
service.Annotations["flagger.app/primary-revision"] = service.Status.LatestCreatedRevisionName
|
||||
_, err = kc.knativeClient.ServingV1().Services(cd.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s update query error %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kc *KnativeController) HasTargetChanged(cd *flaggerv1.Canary) (bool, error) {
|
||||
service, err := kc.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
return hasSpecChanged(cd, service.Status.LatestCreatedRevisionName)
|
||||
}
|
||||
|
||||
func (kc *KnativeController) HaveDependenciesChanged(canary *flaggerv1.Canary) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (kc *KnativeController) ScaleToZero(canary *flaggerv1.Canary) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kc *KnativeController) ScaleFromZero(canary *flaggerv1.Canary) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kc *KnativeController) Finalize(canary *flaggerv1.Canary) error {
|
||||
return nil
|
||||
}
|
||||
79
pkg/canary/knative_controller_test.go
Normal file
79
pkg/canary/knative_controller_test.go
Normal file
@@ -0,0 +1,79 @@
|
||||
package canary
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func TestKnativeController_Promote(t *testing.T) {
|
||||
mocks := newKnativeServiceFixture("podinfo")
|
||||
_, err := mocks.controller.Initialize(mocks.canary)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
service.Status.LatestCreatedRevisionName = "latest-revision"
|
||||
_, err = mocks.knativeClient.ServingV1().Services("default").UpdateStatus(context.TODO(), service, metav1.UpdateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = mocks.controller.Promote(mocks.canary)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err = mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "latest-revision", service.Annotations["flagger.app/primary-revision"])
|
||||
}
|
||||
|
||||
func TestKnativeController_Initialize(t *testing.T) {
|
||||
mocks := newKnativeServiceFixture("podinfo")
|
||||
|
||||
mocks.canary.Status.Phase = v1beta1.CanaryPhasePromoting
|
||||
ok, err := mocks.controller.Initialize(mocks.canary)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, true, ok)
|
||||
|
||||
service, err := mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, service.Annotations, 0)
|
||||
assert.Len(t, service.Spec.Traffic, 0)
|
||||
|
||||
mocks.canary.Status.Phase = v1beta1.CanaryPhaseInitializing
|
||||
|
||||
ok, err = mocks.controller.Initialize(mocks.canary)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, true, ok)
|
||||
|
||||
service, err = mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "podinfo-00001", service.Annotations["flagger.app/primary-revision"])
|
||||
assert.Len(t, service.Spec.Traffic, 2)
|
||||
assert.Equal(t, *service.Spec.Traffic[0].Percent, int64(0))
|
||||
assert.True(t, *service.Spec.Traffic[0].LatestRevision)
|
||||
assert.Equal(t, *service.Spec.Traffic[1].Percent, int64(100))
|
||||
assert.Equal(t, service.Spec.Traffic[1].RevisionName, "podinfo-00001")
|
||||
}
|
||||
|
||||
func TestKnativeController_HasTargetChanged(t *testing.T) {
|
||||
mocks := newKnativeServiceFixture("podinfo")
|
||||
_, err := mocks.controller.Initialize(mocks.canary)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
mocks.canary.Status.LastAppliedSpec = ComputeHash(service.Status.LatestCreatedRevisionName)
|
||||
|
||||
service.Status.LatestCreatedRevisionName = "latest-revision"
|
||||
_, err = mocks.knativeClient.ServingV1().Services("default").UpdateStatus(context.TODO(), service, metav1.UpdateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, err := mocks.controller.HasTargetChanged(mocks.canary)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
99
pkg/canary/knative_fixture_test.go
Normal file
99
pkg/canary/knative_fixture_test.go
Normal file
@@ -0,0 +1,99 @@
|
||||
package canary
|
||||
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned"
|
||||
fakeFlagger "github.com/fluxcd/flagger/pkg/client/clientset/versioned/fake"
|
||||
"github.com/fluxcd/flagger/pkg/logger"
|
||||
serving "knative.dev/serving/pkg/apis/serving/v1"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
fakeKnative "knative.dev/serving/pkg/client/clientset/versioned/fake"
|
||||
)
|
||||
|
||||
type knativeControllerFixture struct {
|
||||
canary *flaggerv1.Canary
|
||||
flaggerClient clientset.Interface
|
||||
knativeClient knative.Interface
|
||||
controller KnativeController
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func newKnativeServiceFixture(name string) knativeControllerFixture {
|
||||
canary := newKnativeControllerTestCanary(name)
|
||||
flaggerClient := fakeFlagger.NewSimpleClientset(canary)
|
||||
|
||||
knativeClient := fakeKnative.NewSimpleClientset(newKnativeControllerTestService(name))
|
||||
|
||||
logger, _ := logger.NewLogger("debug")
|
||||
|
||||
ctrl := KnativeController{
|
||||
flaggerClient: flaggerClient,
|
||||
knativeClient: knativeClient,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
return knativeControllerFixture{
|
||||
canary: canary,
|
||||
controller: ctrl,
|
||||
logger: logger,
|
||||
flaggerClient: flaggerClient,
|
||||
knativeClient: knativeClient,
|
||||
}
|
||||
}
|
||||
|
||||
func newKnativeControllerTestService(name string) *serving.Service {
|
||||
s := &serving.Service{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: serving.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: serving.ServiceSpec{
|
||||
ConfigurationSpec: serving.ConfigurationSpec{
|
||||
Template: serving.RevisionTemplateSpec{
|
||||
Spec: serving.RevisionSpec{
|
||||
PodSpec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "podinfo",
|
||||
Image: "quay.io/stefanprodan/podinfo:1.2.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: serving.ServiceStatus{
|
||||
ConfigurationStatusFields: serving.ConfigurationStatusFields{
|
||||
LatestCreatedRevisionName: "podinfo-00001",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func newKnativeControllerTestCanary(name string) *flaggerv1.Canary {
|
||||
cd := &flaggerv1.Canary{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: flaggerv1.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "podinfo",
|
||||
},
|
||||
Spec: flaggerv1.CanarySpec{
|
||||
Provider: "knative",
|
||||
TargetRef: flaggerv1.LocalObjectReference{
|
||||
Name: name,
|
||||
APIVersion: "serving.knative.dev/v1",
|
||||
Kind: "Service",
|
||||
},
|
||||
Analysis: &flaggerv1.CanaryAnalysis{},
|
||||
},
|
||||
}
|
||||
return cd
|
||||
}
|
||||
@@ -45,6 +45,7 @@ import (
|
||||
"github.com/fluxcd/flagger/pkg/metrics/observers"
|
||||
"github.com/fluxcd/flagger/pkg/notifier"
|
||||
"github.com/fluxcd/flagger/pkg/router"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
)
|
||||
|
||||
const controllerAgentName = "flagger"
|
||||
@@ -53,6 +54,7 @@ const controllerAgentName = "flagger"
|
||||
type Controller struct {
|
||||
kubeConfig *rest.Config
|
||||
kubeClient kubernetes.Interface
|
||||
knativeClient knative.Interface
|
||||
flaggerClient clientset.Interface
|
||||
flaggerInformers Informers
|
||||
flaggerSynced cache.InformerSynced
|
||||
@@ -81,6 +83,7 @@ type Informers struct {
|
||||
|
||||
func NewController(
|
||||
kubeClient kubernetes.Interface,
|
||||
knativeClient knative.Interface,
|
||||
flaggerClient clientset.Interface,
|
||||
flaggerInformers Informers,
|
||||
flaggerWindow time.Duration,
|
||||
@@ -111,6 +114,7 @@ func NewController(
|
||||
ctrl := &Controller{
|
||||
kubeConfig: kubeConfig,
|
||||
kubeClient: kubeClient,
|
||||
knativeClient: knativeClient,
|
||||
flaggerClient: flaggerClient,
|
||||
flaggerInformers: flaggerInformers,
|
||||
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
|
||||
@@ -330,6 +334,10 @@ func (c *Controller) verifyCanary(canary *flaggerv1.Canary) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := verifyKnativeCanary(canary); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -352,6 +360,24 @@ func verifyNoCrossNamespaceRefs(canary *flaggerv1.Canary) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyKnativeCanary(canary *flaggerv1.Canary) error {
|
||||
if canary.Spec.TargetRef.IsKnativeService() != (canary.Spec.Provider == flaggerv1.KnativeProvider) {
|
||||
if canary.Spec.TargetRef.IsKnativeService() {
|
||||
return fmt.Errorf("can't use %s provider with Knative Service as target", canary.Spec.Provider)
|
||||
}
|
||||
return fmt.Errorf("can't use %s/%s as target if provider is set to knative",
|
||||
canary.Spec.TargetRef.APIVersion, canary.Spec.TargetRef.Kind)
|
||||
}
|
||||
|
||||
if canary.Spec.TargetRef.IsKnativeService() {
|
||||
if canary.Spec.AutoscalerRef != nil {
|
||||
return fmt.Errorf("can't use autoscaler with Knative Service")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkCustomResourceType(obj interface{}, logger *zap.SugaredLogger) (flaggerv1.Canary, bool) {
|
||||
var roll *flaggerv1.Canary
|
||||
var ok bool
|
||||
|
||||
@@ -90,6 +90,78 @@ func TestController_verifyCanary(t *testing.T) {
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "knative provider with non-knative service should return an error",
|
||||
canary: flaggerv1.Canary{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cd-1",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: flaggerv1.CanarySpec{
|
||||
Provider: "knative",
|
||||
TargetRef: flaggerv1.LocalObjectReference{
|
||||
Kind: "Deployment",
|
||||
Name: "podinfo",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "knative service with non-knative provider should return an error",
|
||||
canary: flaggerv1.Canary{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cd-1",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: flaggerv1.CanarySpec{
|
||||
Provider: "istio",
|
||||
TargetRef: flaggerv1.LocalObjectReference{
|
||||
Kind: "Service",
|
||||
APIVersion: "serving.knative.dev/v1",
|
||||
Name: "podinfo",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "knative service with autoscaler ref should return an error",
|
||||
canary: flaggerv1.Canary{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cd-1",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: flaggerv1.CanarySpec{
|
||||
Provider: "knative",
|
||||
AutoscalerRef: &flaggerv1.AutoscalerRefernce{},
|
||||
TargetRef: flaggerv1.LocalObjectReference{
|
||||
Kind: "Service",
|
||||
APIVersion: "serving.knative.dev/v1",
|
||||
Name: "podinfo",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "knative service with knative provider is okay",
|
||||
canary: flaggerv1.Canary{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cd-1",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: flaggerv1.CanarySpec{
|
||||
Provider: "knative",
|
||||
TargetRef: flaggerv1.LocalObjectReference{
|
||||
Kind: "Service",
|
||||
APIVersion: "serving.knative.dev/v1",
|
||||
Name: "podinfo",
|
||||
},
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
ctrl := &Controller{
|
||||
|
||||
@@ -43,7 +43,7 @@ func (c *Controller) finalize(old interface{}) error {
|
||||
}
|
||||
|
||||
// Retrieve a controller
|
||||
canaryController := c.canaryFactory.Controller(canary.Spec.TargetRef.Kind)
|
||||
canaryController := c.canaryFactory.Controller(canary.Spec.TargetRef)
|
||||
|
||||
// Set the status to terminating if not already in that state
|
||||
if canary.Status.Phase != flaggerv1.CanaryPhaseTerminating {
|
||||
|
||||
@@ -179,7 +179,7 @@ func (c *Controller) advanceCanary(name string, namespace string) {
|
||||
}
|
||||
|
||||
// init controller based on target kind
|
||||
canaryController := c.canaryFactory.Controller(cd.Spec.TargetRef.Kind)
|
||||
canaryController := c.canaryFactory.Controller(cd.Spec.TargetRef)
|
||||
|
||||
labelSelector, labelValue, ports, err := canaryController.GetMetadata(cd)
|
||||
if err != nil {
|
||||
|
||||
@@ -92,7 +92,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
|
||||
}
|
||||
|
||||
// init router
|
||||
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient, true)
|
||||
rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true)
|
||||
|
||||
// init observer
|
||||
observerFactory, _ := observers.NewFactory(testMetricsServerURL)
|
||||
@@ -103,7 +103,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
|
||||
KubeClient: kubeClient,
|
||||
FlaggerClient: flaggerClient,
|
||||
}
|
||||
canaryFactory := canary.NewFactory(kubeClient, flaggerClient, configTracker, []string{"app", "name"}, []string{""}, logger)
|
||||
canaryFactory := canary.NewFactory(kubeClient, flaggerClient, nil, configTracker, []string{"app", "name"}, []string{""}, logger)
|
||||
|
||||
ctrl := &Controller{
|
||||
kubeClient: kubeClient,
|
||||
@@ -129,8 +129,10 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
|
||||
meshRouter := rf.MeshRouter("istio", "")
|
||||
|
||||
return daemonSetFixture{
|
||||
canary: c,
|
||||
deployer: canaryFactory.Controller("DaemonSet"),
|
||||
canary: c,
|
||||
deployer: canaryFactory.Controller(flaggerv1.LocalObjectReference{
|
||||
Kind: "DaemonSet",
|
||||
}),
|
||||
logger: logger,
|
||||
flaggerClient: flaggerClient,
|
||||
meshClient: flaggerClient,
|
||||
|
||||
@@ -121,7 +121,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
|
||||
}
|
||||
|
||||
// init router
|
||||
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient, true)
|
||||
rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true)
|
||||
|
||||
// init observer
|
||||
observerFactory, _ := observers.NewFactory(testMetricsServerURL)
|
||||
@@ -132,7 +132,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
|
||||
KubeClient: kubeClient,
|
||||
FlaggerClient: flaggerClient,
|
||||
}
|
||||
canaryFactory := canary.NewFactory(kubeClient, flaggerClient, configTracker, []string{"app", "name"}, []string{""}, logger)
|
||||
canaryFactory := canary.NewFactory(kubeClient, flaggerClient, nil, configTracker, []string{"app", "name"}, []string{""}, logger)
|
||||
|
||||
ctrl := &Controller{
|
||||
kubeClient: kubeClient,
|
||||
@@ -159,8 +159,10 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
|
||||
meshRouter := rf.MeshRouter("istio", "")
|
||||
|
||||
return fixture{
|
||||
canary: c,
|
||||
deployer: canaryFactory.Controller("Deployment"),
|
||||
canary: c,
|
||||
deployer: canaryFactory.Controller(flaggerv1.LocalObjectReference{
|
||||
Kind: "Deployment",
|
||||
}),
|
||||
logger: logger,
|
||||
flaggerClient: flaggerClient,
|
||||
meshClient: flaggerClient,
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
"github.com/fluxcd/flagger/pkg/metrics/observers"
|
||||
"github.com/fluxcd/flagger/pkg/metrics/providers"
|
||||
serving "knative.dev/serving/pkg/apis/serving/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -110,10 +111,20 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
}
|
||||
}
|
||||
// set the metrics provider to query Prometheus for the canary Kubernetes service if the canary target is Service
|
||||
if canary.Spec.TargetRef.Kind == "Service" {
|
||||
if canary.Spec.TargetRef.Kind == "Service" && !canary.Spec.TargetRef.IsKnativeService() {
|
||||
metricsProvider = metricsProvider + MetricsProviderServiceSuffix
|
||||
}
|
||||
|
||||
var knativeService *serving.Service
|
||||
if canary.Spec.Provider == flaggerv1.KnativeProvider || c.meshProvider == flaggerv1.KnativeProvider {
|
||||
var err error
|
||||
knativeService, err = c.knativeClient.ServingV1().Services(canary.Namespace).Get(context.TODO(), canary.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
c.recordEventErrorf(canary, "Error fetching Knative service %s/%s %v", canary.Namespace, canary.Spec.TargetRef.Name, err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// create observer based on the mesh provider
|
||||
observerFactory := c.observerFactory
|
||||
|
||||
@@ -135,7 +146,11 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
}
|
||||
|
||||
if metric.Name == "request-success-rate" {
|
||||
val, err := observer.GetRequestSuccessRate(toMetricModel(canary, metric.Interval, metric.TemplateVariables))
|
||||
model := toMetricModel(canary, metric.Interval, metric.TemplateVariables)
|
||||
if knativeService != nil {
|
||||
model.Route = knativeService.Status.LatestCreatedRevisionName
|
||||
}
|
||||
val, err := observer.GetRequestSuccessRate(model)
|
||||
if err != nil {
|
||||
if errors.Is(err, providers.ErrNoValuesFound) {
|
||||
c.recordEventWarningf(canary,
|
||||
@@ -167,7 +182,11 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
}
|
||||
|
||||
if metric.Name == "request-duration" {
|
||||
val, err := observer.GetRequestDuration(toMetricModel(canary, metric.Interval, metric.TemplateVariables))
|
||||
model := toMetricModel(canary, metric.Interval, metric.TemplateVariables)
|
||||
if knativeService != nil {
|
||||
model.Route = knativeService.Status.LatestCreatedRevisionName
|
||||
}
|
||||
val, err := observer.GetRequestDuration(model)
|
||||
if err != nil {
|
||||
if errors.Is(err, providers.ErrNoValuesFound) {
|
||||
c.recordEventWarningf(canary, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic",
|
||||
@@ -199,7 +218,11 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
|
||||
// in-line PromQL
|
||||
if metric.Query != "" {
|
||||
query, err := observers.RenderQuery(metric.Query, toMetricModel(canary, metric.Interval, metric.TemplateVariables))
|
||||
model := toMetricModel(canary, metric.Interval, metric.TemplateVariables)
|
||||
if knativeService != nil {
|
||||
model.Route = knativeService.Status.LatestCreatedRevisionName
|
||||
}
|
||||
query, err := observers.RenderQuery(metric.Query, model)
|
||||
val, err := observerFactory.Client.RunQuery(query)
|
||||
if err != nil {
|
||||
if errors.Is(err, providers.ErrNoValuesFound) {
|
||||
@@ -235,6 +258,16 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
}
|
||||
|
||||
func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
var knativeService *serving.Service
|
||||
if canary.Spec.Provider == flaggerv1.KnativeProvider || c.meshProvider == flaggerv1.KnativeProvider {
|
||||
var err error
|
||||
knativeService, err = c.knativeClient.ServingV1().Services(canary.Namespace).Get(context.TODO(), canary.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
c.recordEventErrorf(canary, "Error fetching Knative service %s/%s %v", canary.Namespace, canary.Spec.TargetRef.Name, err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
for _, metric := range canary.GetAnalysis().Metrics {
|
||||
if metric.TemplateRef != nil {
|
||||
namespace := canary.Namespace
|
||||
@@ -267,7 +300,11 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
query, err := observers.RenderQuery(template.Spec.Query, toMetricModel(canary, metric.Interval, metric.TemplateVariables))
|
||||
model := toMetricModel(canary, metric.Interval, metric.TemplateVariables)
|
||||
if knativeService != nil {
|
||||
model.Route = knativeService.Status.LatestCreatedRevisionName
|
||||
}
|
||||
query, err := observers.RenderQuery(template.Spec.Query, model)
|
||||
c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, namespace)).
|
||||
Debugf("Metric template %s.%s query: %s", metric.TemplateRef.Name, namespace, query)
|
||||
if err != nil {
|
||||
|
||||
@@ -92,6 +92,10 @@ func (factory Factory) Observer(provider string) Interface {
|
||||
return &ApisixObserver{
|
||||
client: factory.Client,
|
||||
}
|
||||
case provider == flaggerv1.KnativeProvider:
|
||||
return &KnativeObserver{
|
||||
client: factory.Client,
|
||||
}
|
||||
default:
|
||||
return &IstioObserver{
|
||||
client: factory.Client,
|
||||
|
||||
92
pkg/metrics/observers/knative.go
Normal file
92
pkg/metrics/observers/knative.go
Normal file
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
Copyright 2024 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package observers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
"github.com/fluxcd/flagger/pkg/metrics/providers"
|
||||
)
|
||||
|
||||
//envoy_cluster_name="default/hello-00001"
|
||||
|
||||
var knativeQueries = map[string]string{
|
||||
"request-success-rate": `
|
||||
sum(
|
||||
rate(
|
||||
envoy_cluster_upstream_rq{
|
||||
envoy_cluster_name=~"{{ namespace }}/{{ route }}",
|
||||
envoy_response_code!~"5.*"
|
||||
}[{{ interval }}]
|
||||
)
|
||||
)
|
||||
/
|
||||
sum(
|
||||
rate(
|
||||
envoy_cluster_upstream_rq{
|
||||
envoy_cluster_name=~"{{ namespace }}/{{ route }}",
|
||||
}[{{ interval }}]
|
||||
)
|
||||
)
|
||||
* 100`,
|
||||
"request-duration": `
|
||||
histogram_quantile(
|
||||
0.99,
|
||||
sum(
|
||||
rate(
|
||||
envoy_cluster_upstream_rq_time_bucket{
|
||||
envoy_cluster_name=~"{{ namespace }}/{{ route }}",
|
||||
}[{{ interval }}]
|
||||
)
|
||||
) by (le)
|
||||
)`,
|
||||
}
|
||||
|
||||
type KnativeObserver struct {
|
||||
client providers.Interface
|
||||
}
|
||||
|
||||
func (ob *KnativeObserver) GetRequestSuccessRate(model flaggerv1.MetricTemplateModel) (float64, error) {
|
||||
query, err := RenderQuery(knativeQueries["request-success-rate"], model)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("rendering query failed: %w", err)
|
||||
}
|
||||
|
||||
value, err := ob.client.RunQuery(query)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("running query failed: %w", err)
|
||||
}
|
||||
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (ob *KnativeObserver) GetRequestDuration(model flaggerv1.MetricTemplateModel) (time.Duration, error) {
|
||||
query, err := RenderQuery(knativeQueries["request-duration"], model)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("rendering query failed: %w", err)
|
||||
}
|
||||
|
||||
value, err := ob.client.RunQuery(query)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("running query failed: %w", err)
|
||||
}
|
||||
|
||||
ms := time.Duration(int64(value)) * time.Millisecond
|
||||
return ms, nil
|
||||
}
|
||||
102
pkg/metrics/observers/knative_test.go
Normal file
102
pkg/metrics/observers/knative_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
/*
|
||||
Copyright 2024 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package observers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
"github.com/fluxcd/flagger/pkg/metrics/providers"
|
||||
)
|
||||
|
||||
func TestKnativeObserver_GetRequestSuccessRate(t *testing.T) {
|
||||
expected := ` sum( rate( envoy_cluster_upstream_rq{ envoy_cluster_name=~"default/podinfo-00001", envoy_response_code!~"5.*" }[1m] ) ) / sum( rate( envoy_cluster_upstream_rq{ envoy_cluster_name=~"default/podinfo-00001", }[1m] ) ) * 100`
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
promql := r.URL.Query()["query"][0]
|
||||
assert.Equal(t, expected, promql)
|
||||
|
||||
json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}`
|
||||
w.Write([]byte(json))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{
|
||||
Type: "prometheus",
|
||||
Address: ts.URL,
|
||||
SecretRef: nil,
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
observer := &KnativeObserver{
|
||||
client: client,
|
||||
}
|
||||
|
||||
val, err := observer.GetRequestSuccessRate(flaggerv1.MetricTemplateModel{
|
||||
Name: "podinfo",
|
||||
Namespace: "default",
|
||||
Target: "podinfo",
|
||||
Service: "podinfo",
|
||||
Route: "podinfo-00001",
|
||||
Interval: "1m",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, float64(100), val)
|
||||
}
|
||||
|
||||
func TestKnativeObserver_GetRequestDuration(t *testing.T) {
|
||||
expected := ` histogram_quantile( 0.99, sum( rate( envoy_cluster_upstream_rq_time_bucket{ envoy_cluster_name=~"default/podinfo-00001", }[1m] ) ) by (le) )`
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
promql := r.URL.Query()["query"][0]
|
||||
assert.Equal(t, expected, promql)
|
||||
|
||||
json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}`
|
||||
w.Write([]byte(json))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
client, err := providers.NewPrometheusProvider(flaggerv1.MetricTemplateProvider{
|
||||
Type: "prometheus",
|
||||
Address: ts.URL,
|
||||
SecretRef: nil,
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
observer := &KnativeObserver{
|
||||
client: client,
|
||||
}
|
||||
|
||||
val, err := observer.GetRequestDuration(flaggerv1.MetricTemplateModel{
|
||||
Name: "podinfo",
|
||||
Namespace: "default",
|
||||
Target: "podinfo",
|
||||
Service: "podinfo",
|
||||
Route: "podinfo-00001",
|
||||
Interval: "1m",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 100*time.Millisecond, val)
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
clientset "github.com/fluxcd/flagger/pkg/client/clientset/versioned"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
)
|
||||
|
||||
type Factory struct {
|
||||
@@ -32,6 +33,7 @@ type Factory struct {
|
||||
kubeClient kubernetes.Interface
|
||||
meshClient clientset.Interface
|
||||
flaggerClient clientset.Interface
|
||||
knativeClient knative.Interface
|
||||
ingressAnnotationsPrefix string
|
||||
ingressClass string
|
||||
logger *zap.SugaredLogger
|
||||
@@ -40,6 +42,7 @@ type Factory struct {
|
||||
|
||||
func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface,
|
||||
flaggerClient clientset.Interface,
|
||||
knativeClient knative.Interface,
|
||||
ingressAnnotationsPrefix string,
|
||||
ingressClass string,
|
||||
logger *zap.SugaredLogger,
|
||||
@@ -50,6 +53,7 @@ func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface,
|
||||
meshClient: meshClient,
|
||||
kubeClient: kubeClient,
|
||||
flaggerClient: flaggerClient,
|
||||
knativeClient: knativeClient,
|
||||
ingressAnnotationsPrefix: ingressAnnotationsPrefix,
|
||||
ingressClass: ingressClass,
|
||||
logger: logger,
|
||||
@@ -150,6 +154,10 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf
|
||||
ingressClass: factory.ingressClass,
|
||||
setOwnerRefs: factory.setOwnerRefs,
|
||||
}
|
||||
case provider == flaggerv1.KnativeProvider:
|
||||
return &KnativeRouter{
|
||||
knativeClient: factory.knativeClient,
|
||||
}
|
||||
case strings.HasPrefix(provider, flaggerv1.GlooProvider):
|
||||
return &GlooRouter{
|
||||
logger: factory.logger,
|
||||
|
||||
127
pkg/router/knative.go
Normal file
127
pkg/router/knative.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
"go.uber.org/zap"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
serving "knative.dev/serving/pkg/apis/serving/v1"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
)
|
||||
|
||||
type KnativeRouter struct {
|
||||
knativeClient knative.Interface
|
||||
logger *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func (kr *KnativeRouter) Reconcile(canary *flaggerv1.Canary) error {
|
||||
service, err := kr.knativeClient.ServingV1().Services(canary.Namespace).Get(context.TODO(), canary.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s get query error: %w", canary.Spec.TargetRef.Name, canary.Namespace, err)
|
||||
}
|
||||
|
||||
if _, ok := service.Annotations["flagger.app/primary-revision"]; !ok {
|
||||
if service.Annotations == nil {
|
||||
service.Annotations = make(map[string]string)
|
||||
}
|
||||
service.Annotations["flagger.app/primary-revision"] = service.Status.LatestCreatedRevisionName
|
||||
_, err = kr.knativeClient.ServingV1().Services(canary.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s update query error: %w", canary.Spec.TargetRef.Name, canary.Namespace, err)
|
||||
}
|
||||
kr.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
|
||||
Infof("Knative Service %s.%s updated", service.Name, service.Namespace)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kr *KnativeRouter) SetRoutes(cd *flaggerv1.Canary, primaryWeight int, canaryWeight int, mirrored bool) error {
|
||||
service, err := kr.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
|
||||
primaryName, exists := service.Annotations["flagger.app/primary-revision"]
|
||||
if !exists {
|
||||
return fmt.Errorf("Knative Service %s.%s annotation not found", cd.Spec.TargetRef.Name, cd.Namespace)
|
||||
}
|
||||
|
||||
canaryPercent := int64(canaryWeight)
|
||||
primaryPercent := int64(primaryWeight)
|
||||
latestRevision := true
|
||||
traffic := []serving.TrafficTarget{
|
||||
{
|
||||
LatestRevision: &latestRevision,
|
||||
Percent: &canaryPercent,
|
||||
},
|
||||
{
|
||||
RevisionName: primaryName,
|
||||
Percent: &primaryPercent,
|
||||
},
|
||||
}
|
||||
service.Spec.Traffic = traffic
|
||||
|
||||
service, err = kr.knativeClient.ServingV1().Services(cd.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s update query error %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kr *KnativeRouter) GetRoutes(cd *flaggerv1.Canary) (primaryWeight int, canaryWeight int, mirrored bool, error error) {
|
||||
service, err := kr.knativeClient.ServingV1().Services(cd.Namespace).Get(context.TODO(), cd.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
error = fmt.Errorf("service %s.%s get query error: %w", cd.Spec.TargetRef.Name, cd.Namespace, err)
|
||||
return
|
||||
}
|
||||
primaryName, exists := service.Annotations["flagger.app/primary-revision"]
|
||||
if !exists {
|
||||
error = fmt.Errorf("service %s.%s annotation not found", cd.Spec.TargetRef.Name, cd.Namespace)
|
||||
return
|
||||
}
|
||||
|
||||
canaryRevisionIdx := slices.IndexFunc(service.Status.Traffic, func(target serving.TrafficTarget) bool {
|
||||
return *target.LatestRevision
|
||||
})
|
||||
primaryRevisionIdx := slices.IndexFunc(service.Status.Traffic, func(target serving.TrafficTarget) bool {
|
||||
return target.RevisionName == primaryName
|
||||
})
|
||||
|
||||
if canaryRevisionIdx == -1 || primaryRevisionIdx == -1 {
|
||||
error = fmt.Errorf("Knative Service %s.%s traffic spec invalid", cd.Spec.TargetRef.Name, cd.Namespace)
|
||||
return
|
||||
}
|
||||
if service.Status.Traffic[primaryRevisionIdx].Percent == nil {
|
||||
error = fmt.Errorf("Knative Service %s.%s primary revision traffic percent does not exist", cd.Spec.TargetRef.Name, cd.Namespace)
|
||||
return
|
||||
}
|
||||
if service.Status.Traffic[canaryRevisionIdx].Percent == nil {
|
||||
error = fmt.Errorf("Knative Service %s.%s canary revision traffic percent does not exist", cd.Spec.TargetRef.Name, cd.Namespace)
|
||||
return
|
||||
}
|
||||
|
||||
return int(*service.Status.Traffic[primaryRevisionIdx].Percent), int(*service.Status.Traffic[canaryRevisionIdx].Percent), false, nil
|
||||
}
|
||||
|
||||
func (kr *KnativeRouter) Finalize(canary *flaggerv1.Canary) error {
|
||||
service, err := kr.knativeClient.ServingV1().Services(canary.Namespace).Get(context.TODO(), canary.Spec.TargetRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s get query error: %w", canary.Spec.TargetRef.Name, canary.Namespace, err)
|
||||
}
|
||||
|
||||
if _, ok := service.Annotations["flagger.app/primary-revision"]; ok {
|
||||
delete(service.Annotations, "flagger.app/primary-revision")
|
||||
_, err = kr.knativeClient.ServingV1().Services(canary.Namespace).Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Knative Service %s.%s update query error: %w", canary.Spec.TargetRef.Name, canary.Namespace, err)
|
||||
}
|
||||
kr.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).
|
||||
Infof("Knative Service %s.%s updated", service.Name, service.Namespace)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
96
pkg/router/knative_test.go
Normal file
96
pkg/router/knative_test.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
serving "knative.dev/serving/pkg/apis/serving/v1"
|
||||
)
|
||||
|
||||
func TestKnativeRouter_Reconcile(t *testing.T) {
|
||||
canary := newTestKnativeCanary()
|
||||
mocks := newFixture(canary)
|
||||
|
||||
router := &KnativeRouter{
|
||||
knativeClient: mocks.knativeClient,
|
||||
logger: mocks.logger,
|
||||
}
|
||||
|
||||
err := router.Reconcile(canary)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "podinfo-00001", service.Annotations["flagger.app/primary-revision"])
|
||||
}
|
||||
|
||||
func TestKnativeRouter_SetRoutes(t *testing.T) {
|
||||
canary := newTestKnativeCanary()
|
||||
mocks := newFixture(canary)
|
||||
|
||||
router := &KnativeRouter{
|
||||
knativeClient: mocks.knativeClient,
|
||||
logger: mocks.logger,
|
||||
}
|
||||
|
||||
// error when annotation is not set
|
||||
err := router.SetRoutes(canary, 10, 90, false)
|
||||
require.Error(t, err)
|
||||
|
||||
err = router.Reconcile(canary)
|
||||
require.NoError(t, err)
|
||||
err = router.SetRoutes(canary, 10, 90, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, service.Spec.Traffic, 2)
|
||||
assert.Equal(t, *service.Spec.Traffic[0].LatestRevision, true)
|
||||
assert.Equal(t, *service.Spec.Traffic[0].Percent, int64(90))
|
||||
assert.Equal(t, service.Spec.Traffic[1].RevisionName, "podinfo-00001")
|
||||
assert.Equal(t, *service.Spec.Traffic[1].Percent, int64(10))
|
||||
}
|
||||
|
||||
func TestKnativeRouter_GetRoutes(t *testing.T) {
|
||||
canary := newTestKnativeCanary()
|
||||
mocks := newFixture(canary)
|
||||
|
||||
router := &KnativeRouter{
|
||||
knativeClient: mocks.knativeClient,
|
||||
logger: mocks.logger,
|
||||
}
|
||||
|
||||
// error when annotation is not set
|
||||
_, _, _, err := router.GetRoutes(canary)
|
||||
require.Error(t, err)
|
||||
|
||||
err = router.Reconcile(canary)
|
||||
require.NoError(t, err)
|
||||
|
||||
service, err := mocks.knativeClient.ServingV1().Services("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
canaryPercent := int64(90)
|
||||
primaryPercent := int64(10)
|
||||
latestRevision := true
|
||||
service.Status.Traffic = []serving.TrafficTarget{
|
||||
{
|
||||
LatestRevision: &latestRevision,
|
||||
Percent: &canaryPercent,
|
||||
},
|
||||
{
|
||||
RevisionName: "podinfo-00001",
|
||||
Percent: &primaryPercent,
|
||||
},
|
||||
}
|
||||
_, err = mocks.knativeClient.ServingV1().Services("default").Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
require.NoError(t, err)
|
||||
|
||||
pWeight, cWeight, _, err := router.GetRoutes(canary)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, pWeight, 10)
|
||||
assert.Equal(t, cWeight, 90)
|
||||
}
|
||||
@@ -21,11 +21,15 @@ import (
|
||||
"go.uber.org/zap"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
netv1 "k8s.io/api/networking/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
serving "knative.dev/serving/pkg/apis/serving/v1"
|
||||
knative "knative.dev/serving/pkg/client/clientset/versioned"
|
||||
fakeKnative "knative.dev/serving/pkg/client/clientset/versioned/fake"
|
||||
|
||||
appmesh "github.com/fluxcd/flagger/pkg/apis/appmesh"
|
||||
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
|
||||
@@ -43,6 +47,7 @@ type fixture struct {
|
||||
appmeshCanary *flaggerv1.Canary
|
||||
ingressCanary *flaggerv1.Canary
|
||||
kubeClient kubernetes.Interface
|
||||
knativeClient knative.Interface
|
||||
meshClient clientset.Interface
|
||||
flaggerClient clientset.Interface
|
||||
logger *zap.SugaredLogger
|
||||
@@ -83,6 +88,7 @@ func newFixture(c *flaggerv1.Canary) fixture {
|
||||
kubeClient: kubeClient,
|
||||
meshClient: meshClient,
|
||||
flaggerClient: flaggerClient,
|
||||
knativeClient: fakeKnative.NewSimpleClientset(newTestKnativeService()),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
@@ -126,6 +132,39 @@ func newTestApisixRoute() *a6v2.ApisixRoute {
|
||||
return ar
|
||||
}
|
||||
|
||||
func newTestKnativeService() *serving.Service {
|
||||
s := &serving.Service{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: serving.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "podinfo",
|
||||
Namespace: "default",
|
||||
},
|
||||
Spec: serving.ServiceSpec{
|
||||
ConfigurationSpec: serving.ConfigurationSpec{
|
||||
Template: serving.RevisionTemplateSpec{
|
||||
Spec: serving.RevisionSpec{
|
||||
PodSpec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "podinfo",
|
||||
Image: "quay.io/stefanprodan/podinfo:1.2.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: serving.ServiceStatus{
|
||||
ConfigurationStatusFields: serving.ConfigurationStatusFields{
|
||||
LatestCreatedRevisionName: "podinfo-00001",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func newTestCanary() *flaggerv1.Canary {
|
||||
cd := &flaggerv1.Canary{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: flaggerv1.SchemeGroupVersion.String()},
|
||||
@@ -577,3 +616,39 @@ func newTestGatewayAPICanary() *flaggerv1.Canary {
|
||||
}
|
||||
return cd
|
||||
}
|
||||
|
||||
func newTestKnativeCanary() *flaggerv1.Canary {
|
||||
cd := &flaggerv1.Canary{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: flaggerv1.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "podinfo",
|
||||
},
|
||||
Spec: flaggerv1.CanarySpec{
|
||||
Provider: "knative",
|
||||
TargetRef: flaggerv1.LocalObjectReference{
|
||||
Name: "podinfo",
|
||||
APIVersion: "serving.knative.dev/v1",
|
||||
Kind: "Service",
|
||||
},
|
||||
Analysis: &flaggerv1.CanaryAnalysis{
|
||||
Threshold: 10,
|
||||
StepWeight: 10,
|
||||
MaxWeight: 50,
|
||||
Metrics: []flaggerv1.CanaryMetric{
|
||||
{
|
||||
Name: "request-success-rate",
|
||||
Threshold: 99,
|
||||
Interval: "1m",
|
||||
},
|
||||
{
|
||||
Name: "request-duration",
|
||||
Threshold: 500,
|
||||
Interval: "1m",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return cd
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user