feat: implement a Keptn metrics provider

Add a Keptn metrics provider for two resources:
* KeptnMetric: Verify the value of a single metric.
* Analysis (via AnalysisDefinition): Run a Keptn analysis over an
  interval validating SLOs.

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
This commit is contained in:
Florian Bacher
2024-04-08 07:13:02 +02:00
committed by Sanskar Jaiswal
parent adc60596f5
commit 29c8bf5a22
12 changed files with 741 additions and 9 deletions

View File

@@ -1298,6 +1298,7 @@ spec:
- newrelic
- graphite
- dynatrace
- keptn
address:
description: API address of this provider
type: string

View File

@@ -259,6 +259,19 @@ rules:
- update
- patch
- delete
- apiGroups:
- metrics.keptn.sh
resources:
- keptnmetrics
- analyses
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- nonResourceURLs:
- /version
verbs:

View File

@@ -253,6 +253,7 @@ func main() {
fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
clusterName,
noCrossNamespaceRefs,
cfg,
)
// leader election context

View File

@@ -668,3 +668,65 @@ Reference the template in the canary analysis:
max: 1000
interval: 1m
```
## Keptn
You can create custom metric checks using the Keptn provider.
This Provider allows to verify either the value of a single [KeptnMetric](https://keptn.sh/stable/docs/reference/crd-reference/metric/),
representing the value of a single metric,
or of a [Keptn Analysis](https://keptn.sh/stable/docs/reference/crd-reference/analysis/),
which provides a flexible grading logic for analysing and prioritising a number of different
metric values coming from different data sources.
This provider requires [Keptn](https://keptn.sh/stable/docs/installation/) to be installed in the cluster.
Example for a Keptn metric template:
```yaml
apiVersion: flagger.app/v1beta1
kind: MetricTemplate
metadata:
name: response-time
namespace: istio-system
spec:
provider:
type: keptn
query: keptnmetric/my-namespace/response-time/2m/reporter=destination
```
This will reference the `KeptnMetric` with the name `response-time` in
the namespace `my-namespace`, which could look like the following:
```yaml
apiVersion: metrics.keptn.sh/v1beta1
kind: KeptnMetric
metadata:
name: response-time
namespace: my-namespace
spec:
fetchIntervalSeconds: 10
provider:
name: my-prometheus-keptn-provider
query: histogram_quantile(0.8, sum by(le) (rate(http_server_request_latency_seconds_bucket{status_code='200',
job='simple-go-backend'}[5m[])))
```
The `query` contains the following components, which are divided by `/` characters:
```
<type>/<namespace>/<resource-name>/<timeframe>/<arguments>
```
* **type (required)**: Must be either `keptnmetric` or `analysis`.
* **namespace (required)**: The namespace of the referenced `KeptnMetric`/`AnalysisDefinition`.
* **resource-name (required):** The name of the referenced `KeptnMetric`/`AnalysisDefinition`.
* **timeframe (optional)**: The timeframe used for the Analysis.
This will usually be set to the same value as the analysis interval of a `Canary`.
Only relevant if the `type` is set to `analysis`.
* **arguments (optional)**: Arguments to be passed to an `Analysis`.
Arguments are passed as a list of key value pairs, separated by `;` characters,
e.g. `foo=bar;bar=foo`.
Only relevant if the `type` is set to `analysis`.
For the type `analysis`, the value returned by the provider is either `0`
(if the analysis failed), or `1` (analysis passed).

4
go.mod
View File

@@ -9,12 +9,14 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/go-logr/zapr v1.3.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.4
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/prometheus/client_golang v1.19.1
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
google.golang.org/api v0.182.0
google.golang.org/genproto v0.0.0-20240528184218-531527333157
google.golang.org/grpc v1.64.0
@@ -47,7 +49,6 @@ require (
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
@@ -77,7 +78,6 @@ require (
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect

View File

@@ -1298,6 +1298,7 @@ spec:
- newrelic
- graphite
- dynatrace
- keptn
address:
description: API address of this provider
type: string

View File

@@ -241,6 +241,19 @@ rules:
- update
- patch
- delete
- apiGroups:
- metrics.keptn.sh
resources:
- keptnmetrics
- analyses
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- nonResourceURLs:
- /version
verbs:

View File

@@ -21,6 +21,8 @@ import (
"sync"
"time"
"k8s.io/client-go/rest"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
@@ -49,6 +51,7 @@ const controllerAgentName = "flagger"
// Controller is managing the canary objects and schedules canary deployments
type Controller struct {
kubeConfig *rest.Config
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
@@ -91,6 +94,7 @@ func NewController(
eventWebhook string,
clusterName string,
noCrossNamespaceRefs bool,
kubeConfig *rest.Config,
) *Controller {
logger.Debug("Creating event broadcaster")
flaggerscheme.AddToScheme(scheme.Scheme)
@@ -105,6 +109,7 @@ func NewController(
recorder.SetInfo(version, meshProvider)
ctrl := &Controller{
kubeConfig: kubeConfig,
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,

View File

@@ -74,7 +74,7 @@ func (c *Controller) checkMetricProviderAvailability(canary *flaggerv1.Canary) e
}
factory := providers.Factory{}
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials)
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials, c.kubeConfig)
if err != nil {
return fmt.Errorf("metric template %s.%s provider %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err)
@@ -260,7 +260,7 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {
}
factory := providers.Factory{}
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials)
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials, c.kubeConfig)
if err != nil {
c.recordEventErrorf(canary, "Metric template %s.%s provider %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err)

View File

@@ -18,15 +18,12 @@ package providers
import (
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
rest "k8s.io/client-go/rest"
)
type Factory struct{}
func (factory Factory) Provider(
metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte,
) (Interface, error) {
func (factory Factory) Provider(metricInterval string, provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte, config *rest.Config) (Interface, error) {
switch provider.Type {
case "prometheus":
return NewPrometheusProvider(provider, credentials)
@@ -44,6 +41,8 @@ func (factory Factory) Provider(
return NewInfluxdbProvider(provider, credentials)
case "dynatrace":
return NewDynatraceProvider(metricInterval, provider, credentials)
case "keptn":
return NewKeptnProvider(config)
default:
return NewPrometheusProvider(provider, credentials)
}

View File

@@ -0,0 +1,256 @@
package providers
import (
"context"
"errors"
"fmt"
"k8s.io/klog/v2"
"strconv"
"strings"
"time"
"github.com/google/uuid"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
// api version for the Keptn Metric CRDs
const (
apiVersion = "v1beta1"
groupName = "metrics.keptn.sh"
keptnMetricsResourceName = "keptnmetrics"
analysisResourceName = "analyses"
)
var keptnMetricsResource = schema.GroupVersionResource{
Group: groupName,
Version: apiVersion,
Resource: keptnMetricsResourceName,
}
var analysisResource = schema.GroupVersionResource{
Group: groupName,
Version: apiVersion,
Resource: analysisResourceName,
}
type queryObject struct {
GroupVersionResource schema.GroupVersionResource
ResourceName string
DurationString string
Namespace string
Arguments map[string]interface{}
}
type KeptnProvider struct {
client dynamic.Interface
analysisTimeout time.Duration
}
func NewKeptnProvider(cfg *rest.Config) (*KeptnProvider, error) {
if cfg == nil {
return nil, errors.New("could not initialize KeptnProvider: no KubeConfig provided")
}
client, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("could not initialize KeptnProvider: %w", err)
}
return &KeptnProvider{
client: client,
analysisTimeout: 10 * time.Second,
}, nil
}
// RunQuery fetches the value of a KeptnMetric or Analysis,
// based on the selector provided in the query.
// The format of the selector is the following:
// <keptnmetric|analysis>/<namespace>/<resourceName>/<duration>/<arguments>
func (k *KeptnProvider) RunQuery(query string) (float64, error) {
queryObj, err := parseQuery(query)
if err != nil {
return 0, err
}
switch queryObj.GroupVersionResource.Resource {
case keptnMetricsResourceName:
return k.queryKeptnMetric(queryObj)
case analysisResourceName:
return k.queryKeptnAnalysis(queryObj)
default:
return 0, errors.New("unsupported query")
}
}
func (k *KeptnProvider) IsOnline() (bool, error) {
// TODO should we check for the keptn deployment to be up and running in the cluster?
return true, nil
}
func (k *KeptnProvider) queryKeptnMetric(queryObj *queryObject) (float64, error) {
get, err := k.client.Resource(queryObj.GroupVersionResource).
Namespace(queryObj.Namespace).
Get(
context.Background(),
queryObj.ResourceName,
v1.GetOptions{},
)
if err != nil {
return 0, fmt.Errorf("could not retrieve KeptnMetric %s/%s: %w", queryObj.Namespace, queryObj.ResourceName, err)
}
if status, ok := get.Object["status"]; ok {
if statusObj, ok := status.(map[string]interface{}); ok {
if value, ok := statusObj["value"].(string); ok {
floatValue, err := strconv.ParseFloat(value, 64)
if err != nil {
return 0, fmt.Errorf("could not parse value of KeptnMetric %s/%s to float: %w", queryObj.Namespace, queryObj.ResourceName, err)
}
return floatValue, nil
}
}
}
return 0, fmt.Errorf("could not retrieve KeptnMetric - no value found in resource %s/%s", queryObj.Namespace, queryObj.ResourceName)
}
func (k *KeptnProvider) queryKeptnAnalysis(obj *queryObject) (float64, error) {
analysis := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": fmt.Sprintf("metrics.keptn.sh/%s", apiVersion),
"kind": "Analysis",
"metadata": map[string]interface{}{
"name": fmt.Sprintf("%s-%s", obj.ResourceName, uuid.New().String()[:6]),
"namespace": obj.Namespace,
},
"spec": map[string]interface{}{
"analysisDefinition": map[string]interface{}{
"name": obj.ResourceName,
},
"timeframe": map[string]interface{}{
"recent": obj.DurationString,
},
"args": obj.Arguments,
},
},
}
// set the timeout to 10s - this will give Keptn enough time to reconcile the Analysis
// and store the result in the status of the resource created here.
ctx, cancel := context.WithTimeout(context.Background(), k.analysisTimeout)
defer cancel()
createdAnalysis, err := k.client.
Resource(obj.GroupVersionResource).
Namespace(obj.Namespace).
Create(ctx, analysis, v1.CreateOptions{})
if err != nil {
return 0, fmt.Errorf("could not create Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err)
}
// delete the created analysis at the end of the function
defer func() {
err := k.client.
Resource(obj.GroupVersionResource).
Namespace(obj.Namespace).
Delete(
context.TODO(),
createdAnalysis.GetName(),
v1.DeleteOptions{},
)
if err != nil {
klog.Errorf("Could not delete Keptn Analysis '%s': %v", createdAnalysis.GetName(), err)
}
}()
for {
// retrieve the current state of the created Analysis resource every 1s, until
// it has been completed, and the evaluation result is available.
// We do this until the timeout of the context expires. If no result is available
// by then, we return an error.
select {
case <-ctx.Done():
return 0, fmt.Errorf("encountered timeout while waiting for Keptn Analysis %s/%s to be finished", obj.Namespace, obj.ResourceName)
case <-time.After(time.Second):
get, err := k.client.Resource(obj.GroupVersionResource).Namespace(obj.Namespace).Get(ctx, createdAnalysis.GetName(), v1.GetOptions{})
if err != nil {
return 0, fmt.Errorf("could not check status of created Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err)
}
statusStr, ok, err := unstructured.NestedString(get.Object, "status", "state")
if err != nil {
return 0, fmt.Errorf("could not check status of created Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err)
}
if ok && statusStr == "Completed" {
passed, ok, err := unstructured.NestedBool(get.Object, "status", "pass")
if err != nil {
return 0, fmt.Errorf("could not check status of created Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err)
}
if ok {
if passed {
return 1, nil
}
return 0, nil
}
}
}
}
}
func parseQuery(query string) (*queryObject, error) {
result := &queryObject{}
// sanitize the query by converting to lower case, trimming spaces and line break characters
split := strings.Split(
strings.TrimSpace(
strings.TrimSuffix(
strings.ToLower(query),
"\n",
),
),
"/",
)
if len(split) < 3 {
return nil, errors.New("unexpected query format. query must be in the format <keptnmetric|analysis>/<namespace>/<resourceName>/<duration>/<arguments>")
}
switch split[0] {
// take into account both singular and plural naming of resource names, to reduce probability of errors
case "keptnmetric", keptnMetricsResourceName:
result.GroupVersionResource = keptnMetricsResource
case "analysis", analysisResourceName:
result.GroupVersionResource = analysisResource
// add the duration for the Analysis, if available
if len(split) >= 4 {
result.DurationString = split[3]
} else {
//set to '1m' by default
result.DurationString = "1m"
}
// add arguments - these are provided as a comma separated list of key/value pairs
result.Arguments = map[string]interface{}{}
if len(split) >= 5 {
args := strings.Split(split[4], ";")
for i := 0; i < len(args); i++ {
keyValue := strings.Split(args[i], "=")
if len(keyValue) == 2 {
result.Arguments[keyValue[0]] = keyValue[1]
}
}
}
default:
return nil, errors.New("unexpected resource kind provided in the query. must be one of: ['keptnmetric', 'analysis']")
}
result.Namespace = split[1]
result.ResourceName = split[2]
return result, nil
}

View File

@@ -0,0 +1,381 @@
package providers
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/rest"
)
func TestNewKeptnProvider(t *testing.T) {
provider, err := NewKeptnProvider(&rest.Config{})
require.Nil(t, err)
require.NotNil(t, provider)
isOnline, err := provider.IsOnline()
require.NoError(t, err)
require.True(t, isOnline)
}
func TestNewKeptnProvider_NoKubeConfig(t *testing.T) {
provider, err := NewKeptnProvider(nil)
require.Error(t, err)
require.Nil(t, provider)
}
func TestKeptnProvider_RunQuery_KeptnMetric(t *testing.T) {
tests := []struct {
name string
setupClient func() *fake.FakeDynamicClient
query string
want float64
wantErr bool
}{
{
name: "wrong query format",
setupClient: func() *fake.FakeDynamicClient {
fakeClient := fake.NewSimpleDynamicClient(
runtime.NewScheme(),
getSampleKeptnMetric("my-metric", "3.0"),
)
return fakeClient
},
query: "invalid/default",
want: 0,
wantErr: true,
},
{
name: "unsupported resource type",
setupClient: func() *fake.FakeDynamicClient {
fakeClient := fake.NewSimpleDynamicClient(
runtime.NewScheme(),
getSampleKeptnMetric("my-metric", "3.0"),
)
return fakeClient
},
query: "invalid/default/my-metric",
want: 0,
wantErr: true,
},
{
name: "get KeptnMetric value",
setupClient: func() *fake.FakeDynamicClient {
fakeClient := fake.NewSimpleDynamicClient(
runtime.NewScheme(),
getSampleKeptnMetric("my-metric", "3.0"),
)
return fakeClient
},
query: "keptnmetric/default/my-metric",
want: 3.0,
wantErr: false,
},
{
name: "KeptnMetric not found",
setupClient: func() *fake.FakeDynamicClient {
fakeClient := fake.NewSimpleDynamicClient(
runtime.NewScheme(),
)
return fakeClient
},
query: "keptnmetric/default/my-metric",
want: 0,
wantErr: true,
},
{
name: "KeptnMetric with invalid value",
setupClient: func() *fake.FakeDynamicClient {
fakeClient := fake.NewSimpleDynamicClient(
runtime.NewScheme(),
getSampleKeptnMetric("my-metric", "invalid"),
)
return fakeClient
},
query: "keptnmetric/default/my-metric",
want: 0,
wantErr: true,
},
{
name: "KeptnMetric with no value",
setupClient: func() *fake.FakeDynamicClient {
keptnMetric := getSampleKeptnMetric("my-metric", "")
data := keptnMetric.Object
delete(data, "status")
keptnMetric.SetUnstructuredContent(data)
fakeClient := fake.NewSimpleDynamicClient(
runtime.NewScheme(),
keptnMetric,
)
return fakeClient
},
query: "keptnmetric/default/my-metric",
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
k := &KeptnProvider{
client: tt.setupClient(),
}
got, err := k.RunQuery(tt.query)
if tt.wantErr {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
require.Equalf(t, tt.want, got, "RunQuery(%v)", tt.query)
})
}
}
func TestKeptnProvider_RunQueryAnalysis(t *testing.T) {
tests := []struct {
name string
setupClient func() *fake.FakeDynamicClient
// verificationFunc() will run in a separate go routine
// and check if the expected resources are created
verificationFunc func(fakeClient *fake.FakeDynamicClient) error
query string
want float64
wantErr bool
}{
{
name: "get passed Analysis",
setupClient: func() *fake.FakeDynamicClient {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(analysisResource.GroupVersion())
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: analysisResource.Group, Version: analysisResource.Version, Kind: "AnalysisList"}, &unstructured.UnstructuredList{})
fakeClient := fake.NewSimpleDynamicClientWithCustomListKinds(
scheme,
map[schema.GroupVersionResource]string{
analysisResource: "AnalysisList",
},
)
return fakeClient
},
verificationFunc: func(fakeClient *fake.FakeDynamicClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return errors.New("timed out waiting for the condition")
case <-time.After(100 * time.Millisecond):
// verify the creation of the expected resource
list, err := fakeClient.Resource(analysisResource).
Namespace("default").
List(ctx, v1.ListOptions{
Limit: 1,
})
if err != nil || len(list.Items) == 0 {
continue
}
createdAnalysis := list.Items[0]
require.Equal(t, map[string]interface{}{
"analysisDefinition": map[string]interface{}{
"name": "my-analysis",
},
"args": map[string]interface{}{
"foo": "bar",
"bar": "foo",
},
"timeframe": map[string]interface{}{
"recent": "5m",
},
}, createdAnalysis.Object["spec"])
err = unstructured.SetNestedMap(
createdAnalysis.Object,
map[string]interface{}{
"state": "Completed",
"pass": true,
},
"status",
)
require.Nil(t, err)
_, err = fakeClient.Resource(analysisResource).Namespace("default").Update(ctx, &createdAnalysis, v1.UpdateOptions{})
require.Nil(t, err)
return nil
}
}
},
query: "analysis/default/my-analysis/5m/foo=bar;bar=foo",
want: 1,
wantErr: false,
},
{
name: "get failed Analysis",
setupClient: func() *fake.FakeDynamicClient {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(analysisResource.GroupVersion())
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: analysisResource.Group, Version: analysisResource.Version, Kind: "AnalysisList"}, &unstructured.UnstructuredList{})
fakeClient := fake.NewSimpleDynamicClientWithCustomListKinds(
scheme,
map[schema.GroupVersionResource]string{
analysisResource: "AnalysisList",
},
)
return fakeClient
},
verificationFunc: func(fakeClient *fake.FakeDynamicClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
return errors.New("timed out waiting for the condition")
case <-time.After(10 * time.Millisecond):
// verify the creation of the expected resource
list, err := fakeClient.Resource(analysisResource).
Namespace("default").
List(ctx, v1.ListOptions{
Limit: 1,
})
if err != nil || len(list.Items) == 0 {
continue
}
createdAnalysis := list.Items[0]
require.Equal(t, map[string]interface{}{
"analysisDefinition": map[string]interface{}{
"name": "my-analysis",
},
"args": map[string]interface{}{},
"timeframe": map[string]interface{}{
"recent": "1m",
},
}, createdAnalysis.Object["spec"])
err = unstructured.SetNestedMap(
createdAnalysis.Object,
map[string]interface{}{
"state": "Completed",
"pass": false,
},
"status",
)
require.Nil(t, err)
_, err = fakeClient.Resource(analysisResource).Namespace("default").Update(ctx, &createdAnalysis, v1.UpdateOptions{})
require.Nil(t, err)
return nil
}
}
},
query: "analysis/default/my-analysis",
want: 0,
wantErr: false,
},
{
name: "analysis does not finish",
setupClient: func() *fake.FakeDynamicClient {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(analysisResource.GroupVersion())
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: analysisResource.Group, Version: analysisResource.Version, Kind: "AnalysisList"}, &unstructured.UnstructuredList{})
fakeClient := fake.NewSimpleDynamicClientWithCustomListKinds(
scheme,
map[schema.GroupVersionResource]string{
analysisResource: "AnalysisList",
},
)
return fakeClient
},
verificationFunc: func(fakeClient *fake.FakeDynamicClient) error {
return nil
},
query: "analysis/default/my-analysis",
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeClient := tt.setupClient()
k := &KeptnProvider{
client: fakeClient,
analysisTimeout: 1 * time.Second,
}
ctx := context.Background()
grp, ctx := errgroup.WithContext(ctx)
grp.Go(func() error {
return tt.verificationFunc(fakeClient)
})
got, err := k.RunQuery(tt.query)
if tt.wantErr {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
err = grp.Wait()
require.Nil(t, err)
require.Equalf(t, tt.want, got, "RunQuery(%v)", tt.query)
// verify that all created Analysis resources have been cleaned up
list, err := fakeClient.Resource(analysisResource).
Namespace("default").
List(ctx, v1.ListOptions{
Limit: 1,
})
require.NoError(t, err)
require.Empty(t, list.Items)
})
}
}
func getSampleKeptnMetric(metricName, value string) *unstructured.Unstructured {
keptnMetric := &unstructured.Unstructured{}
keptnMetric.SetUnstructuredContent(map[string]interface{}{
"apiVersion": fmt.Sprintf("metrics.keptn.sh/%s", apiVersion),
"kind": "KeptnMetric",
"metadata": map[string]interface{}{
"name": metricName,
"namespace": "default",
},
"spec": map[string]interface{}{
"fetchIntervalSeconds": "2",
"provider": map[string]interface{}{
"name": "my-provider",
},
"query": "my-query",
},
"status": map[string]interface{}{
"value": value,
},
})
return keptnMetric
}