pkg/metrics/providers: wrap ErrNoValuesFound and modify controller accordingly

This commit is contained in:
mathetake
2020-03-08 00:17:52 +09:00
parent 7fb675e8aa
commit 2ec24bb17d
11 changed files with 175 additions and 106 deletions

View File

@@ -192,7 +192,7 @@ func (c *Controller) processNextWorkItem() bool {
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
return fmt.Errorf("error syncing '%s': %w", key, err)
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
@@ -230,7 +230,7 @@ func (c *Controller) syncHandler(key string) error {
_, err := c.flaggerClient.FlaggerV1beta1().Canaries(cd.Namespace).UpdateStatus(cdCopy)
if err != nil {
c.logger.Errorf("%s status condition update error: %v", key, err)
return fmt.Errorf("%s status condition update error: %v", key, err)
return fmt.Errorf("%s status condition update error: %w", key, err)
}
}
}

View File

@@ -31,14 +31,12 @@ func (c *Controller) recordEventWarningf(r *flaggerv1.Canary, template string, a
func (c *Controller) sendEventToWebhook(r *flaggerv1.Canary, eventType, template string, args []interface{}) {
webhookOverride := false
if len(r.GetAnalysis().Webhooks) > 0 {
for _, canaryWebhook := range r.GetAnalysis().Webhooks {
if canaryWebhook.Type == flaggerv1.EventHook {
webhookOverride = true
err := CallEventWebhook(r, canaryWebhook.URL, fmt.Sprintf(template, args...), eventType)
if err != nil {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)
}
for _, canaryWebhook := range r.GetAnalysis().Webhooks {
if canaryWebhook.Type == flaggerv1.EventHook {
webhookOverride = true
err := CallEventWebhook(r, canaryWebhook.URL, fmt.Sprintf(template, args...), eventType)
if err != nil {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)
}
}
}

View File

@@ -1,6 +1,7 @@
package controller
import (
"errors"
"fmt"
"strings"
"time"
@@ -333,7 +334,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh
// strategy: A/B testing
if len(cd.GetAnalysis().Match) > 0 && cd.GetAnalysis().Iterations > 0 {
c.runAB(cd, canaryController, meshRouter, provider)
c.runAB(cd, canaryController, meshRouter)
return
}
@@ -345,12 +346,13 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh
// strategy: Canary progressive traffic increase
if cd.GetAnalysis().StepWeight > 0 {
c.runCanary(cd, canaryController, meshRouter, provider, mirrored, canaryWeight, primaryWeight, maxWeight)
c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
}
}
func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) {
func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
meshRouter router.Interface, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) {
primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)
// increase traffic weight
@@ -420,7 +422,8 @@ func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary
}
}
func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string) {
func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller,
meshRouter router.Interface) {
primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)
// route traffic to canary and increment iterations
@@ -462,7 +465,8 @@ func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Con
}
}
func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool) {
func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller,
meshRouter router.Interface, provider string, mirrored bool) {
primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)
// increment iterations
@@ -820,9 +824,10 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
if metric.Name == "request-success-rate" {
val, err := observer.GetRequestSuccessRate(toMetricModel(canary, metric.Interval))
if err != nil {
if strings.Contains(err.Error(), "no values found") {
c.recordEventWarningf(canary, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic",
metricsProvider, metric.Name, canary.Spec.TargetRef.Name, canary.Namespace)
if errors.Is(err, observers.ErrNoValuesFound) {
c.recordEventWarningf(canary,
"Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic: %v",
metricsProvider, metric.Name, canary.Spec.TargetRef.Name, canary.Namespace, err)
} else {
c.recordEventErrorf(canary, "Prometheus query failed: %v", err)
}
@@ -851,7 +856,7 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
if metric.Name == "request-duration" {
val, err := observer.GetRequestDuration(toMetricModel(canary, metric.Interval))
if err != nil {
if strings.Contains(err.Error(), "no values found") {
if errors.Is(err, observers.ErrNoValuesFound) {
c.recordEventWarningf(canary, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic",
metricsProvider, metric.Name, canary.Spec.TargetRef.Name, canary.Namespace)
} else {
@@ -882,7 +887,7 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
if metric.Query != "" {
val, err := observerFactory.Client.RunQuery(metric.Query)
if err != nil {
if strings.Contains(err.Error(), "no values found") {
if errors.Is(err, observers.ErrNoValuesFound) {
c.recordEventWarningf(canary, "Halt advancement no values found for metric: %s",
metric.Name)
} else {
@@ -955,9 +960,9 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {
val, err := provider.RunQuery(query)
if err != nil {
if strings.Contains(err.Error(), "no values found") {
c.recordEventWarningf(canary, "Halt advancement no values found for custom metric: %s",
metric.Name)
if errors.Is(err, providers.ErrNoValuesFound) {
c.recordEventWarningf(canary, "Halt advancement no values found for custom metric: %s: %v",
metric.Name, err)
} else {
c.recordEventErrorf(canary, "Metric query failed for %s: %v", metric.Name, err)
}

View File

@@ -0,0 +1,7 @@
package observers
import "errors"
var (
ErrNoValuesFound = errors.New("no values found")
)

View File

@@ -77,12 +77,12 @@ func (p *CloudWatchProvider) RunQuery(query string) (float64, error) {
mr := res.MetricDataResults
if len(mr) < 1 {
return 0, fmt.Errorf("no values found in response: %s", res.String())
return 0, fmt.Errorf("invalid response: %s: %w", res.String(), ErrNoValuesFound)
}
vs := res.MetricDataResults[0].Values
vs := mr[0].Values
if len(vs) < 1 {
return 0, fmt.Errorf("no values found in response: %s", res.String())
return 0, fmt.Errorf("invalid reponse %s: %w", res.String(), ErrNoValuesFound)
}
return aws.Float64Value(vs[0]), nil

View File

@@ -1,8 +1,8 @@
package providers
import (
"errors"
"net/http"
"strings"
"testing"
"time"
@@ -146,13 +146,13 @@ func TestCloudWatchProvider_RunQuery(t *testing.T) {
_, err := p.RunQuery(query)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "no values"))
require.True(t, errors.Is(err, ErrNoValuesFound))
p = CloudWatchProvider{client: cloudWatchClientMock{
o: &cloudwatch.GetMetricDataOutput{}}}
_, err = p.RunQuery(query)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "no values"))
require.True(t, errors.Is(err, ErrNoValuesFound))
})
}

View File

@@ -76,7 +76,7 @@ func NewDatadogProvider(metricInterval string,
md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %s", err.Error())
return nil, fmt.Errorf("error parsing metric interval: %w", err)
}
dd.fromDelta = int64(datadogFromDeltaMultiplierOnMetricInterval * md.Seconds())
@@ -89,7 +89,7 @@ func (p *DatadogProvider) RunQuery(query string) (float64, error) {
req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil)
if err != nil {
return 0, fmt.Errorf("error http.NewRequest: %s", err.Error())
return 0, fmt.Errorf("error http.NewRequest: %w", err)
}
req.Header.Set(datadogAPIKeyHeaderKey, p.apiKey)
@@ -111,26 +111,30 @@ func (p *DatadogProvider) RunQuery(query string) (float64, error) {
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return 0, fmt.Errorf("error reading body: %s", err.Error())
return 0, fmt.Errorf("error reading body: %w", err)
}
if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error response: %s", string(b))
return 0, fmt.Errorf("error response: %s: %w", string(b), err)
}
var res datadogResponse
if err := json.Unmarshal(b, &res); err != nil {
return 0, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b))
return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b))
}
if len(res.Series) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}
s := res.Series[0]
vs := s.Pointlist[len(s.Pointlist)-1]
pl := res.Series[0].Pointlist
if len(pl) < 1 {
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}
vs := pl[len(pl)-1]
if len(vs) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}
return vs[1], nil
@@ -141,7 +145,7 @@ func (p *DatadogProvider) RunQuery(query string) (float64, error) {
func (p *DatadogProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.apiKeyValidationEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %s", err.Error())
return false, fmt.Errorf("error http.NewRequest: %w", err)
}
req.Header.Add(datadogAPIKeyHeaderKey, p.apiKey)
@@ -157,7 +161,7 @@ func (p *DatadogProvider) IsOnline() (bool, error) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %s", err.Error())
return false, fmt.Errorf("error reading body: %w", err)
}
if r.StatusCode != http.StatusOK {

View File

@@ -1,6 +1,7 @@
package providers
import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
@@ -36,45 +37,65 @@ func TestNewDatadogProvider(t *testing.T) {
}
func TestDatadogProvider_RunQuery(t *testing.T) {
eq := `avg:system.cpu.user\{*}by{host}`
appKey := "app-key"
apiKey := "api-key"
expected := 1.11111
t.Run("ok", func(t *testing.T) {
expected := 1.11111
eq := `avg:system.cpu.user{*}by{host}`
now := time.Now().Unix()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
assert.Equal(t, eq, aq)
assert.Equal(t, appKey, r.Header.Get(datadogApplicationKeyHeaderKey))
assert.Equal(t, apiKey, r.Header.Get(datadogAPIKeyHeaderKey))
now := time.Now().Unix()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
assert.Equal(t, eq, aq)
assert.Equal(t, appKey, r.Header.Get(datadogApplicationKeyHeaderKey))
assert.Equal(t, apiKey, r.Header.Get(datadogAPIKeyHeaderKey))
from, err := strconv.ParseInt(r.URL.Query().Get("from"), 10, 64)
if assert.NoError(t, err) {
assert.Less(t, from, now)
}
from, err := strconv.ParseInt(r.URL.Query().Get("from"), 10, 64)
if assert.NoError(t, err) {
assert.Less(t, from, now)
}
to, err := strconv.ParseInt(r.URL.Query().Get("to"), 10, 64)
if assert.NoError(t, err) {
assert.GreaterOrEqual(t, to, now)
}
to, err := strconv.ParseInt(r.URL.Query().Get("to"), 10, 64)
if assert.NoError(t, err) {
assert.GreaterOrEqual(t, to, now)
}
json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()
json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()
dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
require.NoError(t, err)
dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
require.NoError(t, err)
f, err := dp.RunQuery(eq)
require.NoError(t, err)
assert.Equal(t, expected, f)
})
f, err := dp.RunQuery(eq)
require.NoError(t, err)
assert.Equal(t, expected, f)
t.Run("no values", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := fmt.Sprintf(`{"series": [{"pointlist": []}]}`)
w.Write([]byte(json))
}))
defer ts.Close()
dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
require.NoError(t, err)
_, err = dp.RunQuery("")
require.True(t, errors.Is(err, ErrNoValuesFound))
})
}
func TestDatadogProvider_IsOnline(t *testing.T) {

View File

@@ -0,0 +1,7 @@
package providers
import "errors"
var (
ErrNoValuesFound = errors.New("no values found")
)

View File

@@ -74,7 +74,7 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
query = url.QueryEscape(p.trimQuery(query))
u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query))
if err != nil {
return 0, err
return 0, fmt.Errorf("url.Parase failed: %w", err)
}
u.Path = path.Join(p.url.Path, u.Path)
@@ -82,7 +82,7 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return 0, err
return 0, fmt.Errorf("http.NewRequest failed: %w", err)
}
if p.username != "" && p.password != "" {
@@ -94,13 +94,13 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return 0, err
return 0, fmt.Errorf("request failed: %w", err)
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return 0, fmt.Errorf("error reading body: %s", err.Error())
return 0, fmt.Errorf("error reading body: %w", err)
}
if 400 <= r.StatusCode {
@@ -110,7 +110,7 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
var result prometheusResponse
err = json.Unmarshal(b, &result)
if err != nil {
return 0, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b))
return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b))
}
var value *float64
@@ -126,7 +126,7 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
}
}
if value == nil {
return 0, fmt.Errorf("no values found")
return 0, fmt.Errorf("%w", ErrNoValuesFound)
}
return *value, nil
@@ -136,7 +136,7 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
func (p *PrometheusProvider) IsOnline() (bool, error) {
u, err := url.Parse("./api/v1/status/flags")
if err != nil {
return false, err
return false, fmt.Errorf("url.Parse failed: %w", err)
}
u.Path = path.Join(p.url.Path, u.Path)
@@ -144,7 +144,7 @@ func (p *PrometheusProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return false, err
return false, fmt.Errorf("http.NewRequest failed: %w", err)
}
if p.username != "" && p.password != "" {
@@ -156,13 +156,13 @@ func (p *PrometheusProvider) IsOnline() (bool, error) {
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return false, err
return false, fmt.Errorf("request failed: %w", err)
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %s", err.Error())
return false, fmt.Errorf("error reading body: %w", err)
}
if 400 <= r.StatusCode {

View File

@@ -1,6 +1,7 @@
package providers
import (
"errors"
"net/http"
"net/http/httptest"
"strings"
@@ -82,40 +83,66 @@ func TestNewPrometheusProvider(t *testing.T) {
}
func TestPrometheusProvider_RunQueryWithBasicAuth(t *testing.T) {
expected := `sum(envoy_cluster_upstream_rq)`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
promql := r.URL.Query()["query"][0]
assert.Equal(t, expected, promql)
t.Run("ok", func(t *testing.T) {
expected := `sum(envoy_cluster_upstream_rq)`
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
promql := r.URL.Query()["query"][0]
assert.Equal(t, expected, promql)
if assert.Contains(t, r.Header, "Authorization") {
if assert.Contains(t, r.Header, "Authorization") {
}
header, ok := r.Header["Authorization"]
if assert.True(t, ok, "Authorization header not found") {
assert.True(t, strings.Contains(header[0], "Basic"), "Basic authorization header not found")
}
}
header, ok := r.Header["Authorization"]
if assert.True(t, ok, "Authorization header not found") {
assert.True(t, strings.Contains(header[0], "Basic"), "Basic authorization header not found")
}
json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}`
w.Write([]byte(json))
}))
defer ts.Close()
json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}`
w.Write([]byte(json))
}))
defer ts.Close()
clients := prometheusFake()
clients := prometheusFake()
template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get("prometheus", metav1.GetOptions{})
require.NoError(t, err)
template.Spec.Provider.Address = ts.URL
template, err := clients.flaggerClient.FlaggerV1beta1().MetricTemplates("default").Get("prometheus", metav1.GetOptions{})
require.NoError(t, err)
template.Spec.Provider.Address = ts.URL
secret, err := clients.kubeClient.CoreV1().Secrets("default").Get("prometheus", metav1.GetOptions{})
require.NoError(t, err)
secret, err := clients.kubeClient.CoreV1().Secrets("default").Get("prometheus", metav1.GetOptions{})
require.NoError(t, err)
prom, err := NewPrometheusProvider(template.Spec.Provider, secret.Data)
require.NoError(t, err)
prom, err := NewPrometheusProvider(template.Spec.Provider, secret.Data)
require.NoError(t, err)
val, err := prom.RunQuery(template.Spec.Query)
require.NoError(t, err)
val, err := prom.RunQuery(template.Spec.Query)
require.NoError(t, err)
assert.Equal(t, float64(100), val)
assert.Equal(t, float64(100), val)
})
t.Run("no values", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := `{"status":"success","data":{"resultType":"vector","result":[]}}`
w.Write([]byte(json))
}))
defer ts.Close()
clients := prometheusFake()
template, err := clients.flaggerClient.FlaggerV1beta1().
MetricTemplates("default").Get("prometheus", metav1.GetOptions{})
require.NoError(t, err)
template.Spec.Provider.Address = ts.URL
secret, err := clients.kubeClient.CoreV1().Secrets("default").Get("prometheus", metav1.GetOptions{})
require.NoError(t, err)
prom, err := NewPrometheusProvider(template.Spec.Provider, secret.Data)
require.NoError(t, err)
_, err = prom.RunQuery(template.Spec.Query)
require.True(t, errors.Is(err, ErrNoValuesFound))
})
}
func TestPrometheusProvider_IsOnline(t *testing.T) {