From 0032c14a78a24b0ee6d0aadb1b494e2d1ace62ea Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Mon, 13 May 2019 17:34:08 +0300 Subject: [PATCH] Refactor metrics - add observer interface with builtin metrics functions - add metrics observer factory - add prometheus client - implement the observer interface for istio, envoy and nginx - remove deprecated istio and app mesh metric aliases (istio_requests_total, istio_request_duration_seconds_bucket, envoy_cluster_upstream_rq, envoy_cluster_upstream_rq_time_bucket) --- cmd/flagger/main.go | 22 ++-- pkg/controller/controller.go | 69 +++++------ pkg/controller/controller_test.go | 34 +++--- pkg/controller/scheduler.go | 148 +++++++----------------- pkg/metrics/client.go | 184 ++++++++++++++++++++++++++++++ pkg/metrics/client_test.go | 85 ++++++++++++++ pkg/metrics/envoy.go | 150 +++++++++--------------- pkg/metrics/envoy_test.go | 79 ++++++++----- pkg/metrics/factory.go | 39 +++++++ pkg/metrics/istio.go | 157 +++++++++---------------- pkg/metrics/istio_test.go | 79 ++++++++----- pkg/metrics/nginx.go | 160 ++++++++++---------------- pkg/metrics/nginx_test.go | 79 ++++++++----- pkg/metrics/observer.go | 182 +---------------------------- pkg/metrics/observer_test.go | 119 ------------------- 15 files changed, 735 insertions(+), 851 deletions(-) create mode 100644 pkg/metrics/client.go create mode 100644 pkg/metrics/client_test.go create mode 100644 pkg/metrics/factory.go delete mode 100644 pkg/metrics/observer_test.go diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index 0c09b7dc..da6a61d6 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -45,8 +45,8 @@ var ( func init() { flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&metricsServer, "metrics-server", "http://prometheus:9090", "Prometheus URL") - flag.DurationVar(&controlLoopInterval, "control-loop-interval", 10*time.Second, "Kubernetes API sync interval") + flag.StringVar(&metricsServer, "metrics-server", "http://prometheus:9090", "Prometheus URL.") + flag.DurationVar(&controlLoopInterval, "control-loop-interval", 10*time.Second, "Kubernetes API sync interval.") flag.StringVar(&logLevel, "log-level", "debug", "Log level can be: debug, info, warning, error.") flag.StringVar(&port, "port", "8080", "Port to listen on.") flag.StringVar(&slackURL, "slack-url", "", "Slack hook URL.") @@ -55,9 +55,9 @@ func init() { flag.IntVar(&threadiness, "threadiness", 2, "Worker concurrency.") flag.BoolVar(&zapReplaceGlobals, "zap-replace-globals", false, "Whether to change the logging level of the global zap logger.") flag.StringVar(&zapEncoding, "zap-encoding", "json", "Zap logger encoding.") - flag.StringVar(&namespace, "namespace", "", "Namespace that flagger would watch canary object") - flag.StringVar(&meshProvider, "mesh-provider", "istio", "Service mesh provider, can be istio or appmesh") - flag.StringVar(&selectorLabels, "selector-labels", "app,name,app.kubernetes.io/name", "List of pod labels that Flagger uses to create pod selectors") + flag.StringVar(&namespace, "namespace", "", "Namespace that flagger would watch canary object.") + flag.StringVar(&meshProvider, "mesh-provider", "istio", "Service mesh provider, can be istio, appmesh, supergloo, nginx or smi.") + flag.StringVar(&selectorLabels, "selector-labels", "app,name,app.kubernetes.io/name", "List of pod labels that Flagger uses to create pod selectors.") } func main() { @@ -87,12 +87,12 @@ func main() { meshClient, err := clientset.NewForConfig(cfg) if err != nil { - logger.Fatalf("Error building istio clientset: %v", err) + logger.Fatalf("Error building mesh clientset: %v", err) } flaggerClient, err := clientset.NewForConfig(cfg) if err != nil { - logger.Fatalf("Error building example clientset: %s", err.Error()) + logger.Fatalf("Error building flagger clientset: %s", err.Error()) } flaggerInformerFactory := informers.NewSharedInformerFactoryWithOptions(flaggerClient, time.Second*30, informers.WithNamespace(namespace)) @@ -116,7 +116,12 @@ func main() { logger.Infof("Watching namespace %s", namespace) } - ok, err := metrics.CheckMetricsServer(metricsServer) + observerFactory, err := metrics.NewFactory(metricsServer, meshProvider, 5*time.Second) + if err != nil { + logger.Fatalf("Error building prometheus client: %s", err.Error()) + } + + ok, err := observerFactory.Client.IsOnline() if ok { logger.Infof("Connected to metrics server %s", metricsServer) } else { @@ -148,6 +153,7 @@ func main() { logger, slack, routerFactory, + observerFactory, meshProvider, version.VERSION, labels, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 662f4f01..4f44adee 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -33,23 +33,23 @@ const controllerAgentName = "flagger" // Controller is managing the canary objects and schedules canary deployments type Controller struct { - kubeClient kubernetes.Interface - istioClient clientset.Interface - flaggerClient clientset.Interface - flaggerLister flaggerlisters.CanaryLister - flaggerSynced cache.InformerSynced - flaggerWindow time.Duration - workqueue workqueue.RateLimitingInterface - eventRecorder record.EventRecorder - logger *zap.SugaredLogger - canaries *sync.Map - jobs map[string]CanaryJob - deployer canary.Deployer - observer metrics.Observer - recorder metrics.Recorder - notifier *notifier.Slack - routerFactory *router.Factory - meshProvider string + kubeClient kubernetes.Interface + istioClient clientset.Interface + flaggerClient clientset.Interface + flaggerLister flaggerlisters.CanaryLister + flaggerSynced cache.InformerSynced + flaggerWindow time.Duration + workqueue workqueue.RateLimitingInterface + eventRecorder record.EventRecorder + logger *zap.SugaredLogger + canaries *sync.Map + jobs map[string]CanaryJob + deployer canary.Deployer + recorder metrics.Recorder + notifier *notifier.Slack + routerFactory *router.Factory + observerFactory *metrics.Factory + meshProvider string } func NewController( @@ -62,6 +62,7 @@ func NewController( logger *zap.SugaredLogger, notifier *notifier.Slack, routerFactory *router.Factory, + observerFactory *metrics.Factory, meshProvider string, version string, labels []string, @@ -92,23 +93,23 @@ func NewController( recorder.SetInfo(version, meshProvider) ctrl := &Controller{ - kubeClient: kubeClient, - istioClient: istioClient, - flaggerClient: flaggerClient, - flaggerLister: flaggerInformer.Lister(), - flaggerSynced: flaggerInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), - eventRecorder: eventRecorder, - logger: logger, - canaries: new(sync.Map), - jobs: map[string]CanaryJob{}, - flaggerWindow: flaggerWindow, - deployer: deployer, - observer: metrics.NewObserver(metricServer), - recorder: recorder, - notifier: notifier, - routerFactory: routerFactory, - meshProvider: meshProvider, + kubeClient: kubeClient, + istioClient: istioClient, + flaggerClient: flaggerClient, + flaggerLister: flaggerInformer.Lister(), + flaggerSynced: flaggerInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), + eventRecorder: eventRecorder, + logger: logger, + canaries: new(sync.Map), + jobs: map[string]CanaryJob{}, + flaggerWindow: flaggerWindow, + deployer: deployer, + observerFactory: observerFactory, + recorder: recorder, + notifier: notifier, + routerFactory: routerFactory, + meshProvider: meshProvider, } flaggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 28ac9fcd..423fb27f 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -37,7 +37,6 @@ type Mocks struct { meshClient clientset.Interface flaggerClient clientset.Interface deployer canary.Deployer - observer metrics.Observer ctrl *Controller logger *zap.SugaredLogger router router.Interface @@ -77,7 +76,6 @@ func SetupMocks(abtest bool) Mocks { FlaggerClient: flaggerClient, }, } - observer := metrics.NewObserver("fake") // init controller flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, noResyncPeriodFunc()) @@ -86,21 +84,24 @@ func SetupMocks(abtest bool) Mocks { // init router rf := router.NewFactory(nil, kubeClient, flaggerClient, logger, flaggerClient) + // init observer + observerFactory, _ := metrics.NewFactory("fake", "istio", 5*time.Second) + ctrl := &Controller{ - kubeClient: kubeClient, - istioClient: flaggerClient, - flaggerClient: flaggerClient, - flaggerLister: flaggerInformer.Lister(), - flaggerSynced: flaggerInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), - eventRecorder: &record.FakeRecorder{}, - logger: logger, - canaries: new(sync.Map), - flaggerWindow: time.Second, - deployer: deployer, - observer: observer, - recorder: metrics.NewRecorder(controllerAgentName, false), - routerFactory: rf, + kubeClient: kubeClient, + istioClient: flaggerClient, + flaggerClient: flaggerClient, + flaggerLister: flaggerInformer.Lister(), + flaggerSynced: flaggerInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), + eventRecorder: &record.FakeRecorder{}, + logger: logger, + canaries: new(sync.Map), + flaggerWindow: time.Second, + deployer: deployer, + observerFactory: observerFactory, + recorder: metrics.NewRecorder(controllerAgentName, false), + routerFactory: rf, } ctrl.flaggerSynced = alwaysReady @@ -108,7 +109,6 @@ func SetupMocks(abtest bool) Mocks { return Mocks{ canary: c, - observer: observer, deployer: deployer, logger: logger, flaggerClient: flaggerClient, diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 0fc06978..e15b4642 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -556,126 +556,56 @@ func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool { } } + // create observer based on the mesh provider + observer := c.observerFactory.Observer() + // run metrics checks for _, metric := range r.Spec.CanaryAnalysis.Metrics { if metric.Interval == "" { metric.Interval = r.GetMetricInterval() } - // App Mesh checks - if c.meshProvider == "appmesh" { - if metric.Name == "request-success-rate" || metric.Name == "envoy_cluster_upstream_rq" { - val, err := c.observer.GetEnvoySuccessRate(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) - if err != nil { - if strings.Contains(err.Error(), "no values found") { - c.recordEventWarningf(r, "Halt advancement no values found for metric %s probably %s.%s is not receiving traffic", - metric.Name, r.Spec.TargetRef.Name, r.Namespace) - } else { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) - } - return false - } - if float64(metric.Threshold) > val { - c.recordEventWarningf(r, "Halt %s.%s advancement success rate %.2f%% < %v%%", - r.Name, r.Namespace, val, metric.Threshold) - return false - } - } - - if metric.Name == "request-duration" || metric.Name == "envoy_cluster_upstream_rq_time_bucket" { - val, err := c.observer.GetEnvoyRequestDuration(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) - if err != nil { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) - return false - } - t := time.Duration(metric.Threshold) * time.Millisecond - if val > t { - c.recordEventWarningf(r, "Halt %s.%s advancement request duration %v > %v", - r.Name, r.Namespace, val, t) - return false - } - } - } - - // Istio checks - if c.meshProvider == "istio" { - if metric.Name == "request-success-rate" || metric.Name == "istio_requests_total" { - val, err := c.observer.GetIstioSuccessRate(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) - if err != nil { - if strings.Contains(err.Error(), "no values found") { - c.recordEventWarningf(r, "Halt advancement no values found for metric %s probably %s.%s is not receiving traffic", - metric.Name, r.Spec.TargetRef.Name, r.Namespace) - } else { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) - } - return false - } - if float64(metric.Threshold) > val { - c.recordEventWarningf(r, "Halt %s.%s advancement success rate %.2f%% < %v%%", - r.Name, r.Namespace, val, metric.Threshold) - return false - } - } - - if metric.Name == "request-duration" || metric.Name == "istio_request_duration_seconds_bucket" { - val, err := c.observer.GetIstioRequestDuration(r.Spec.TargetRef.Name, r.Namespace, metric.Name, metric.Interval) - if err != nil { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) - return false - } - t := time.Duration(metric.Threshold) * time.Millisecond - if val > t { - c.recordEventWarningf(r, "Halt %s.%s advancement request duration %v > %v", - r.Name, r.Namespace, val, t) - return false - } - } - } - - // NGINX checks - if c.meshProvider == "nginx" { - if metric.Name == "request-success-rate" { - val, err := c.observer.GetNginxSuccessRate(r.Spec.IngressRef.Name, r.Namespace, metric.Name, metric.Interval) - if err != nil { - if strings.Contains(err.Error(), "no values found") { - c.recordEventWarningf(r, "Halt advancement no values found for metric %s probably %s.%s is not receiving traffic", - metric.Name, r.Spec.TargetRef.Name, r.Namespace) - } else { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) - } - return false - } - if float64(metric.Threshold) > val { - c.recordEventWarningf(r, "Halt %s.%s advancement success rate %.2f%% < %v%%", - r.Name, r.Namespace, val, metric.Threshold) - return false - } - } - - if metric.Name == "request-duration" { - val, err := c.observer.GetNginxRequestDuration(r.Spec.IngressRef.Name, r.Namespace, metric.Name, metric.Interval) - if err != nil { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) - return false - } - t := time.Duration(metric.Threshold) * time.Millisecond - if val > t { - c.recordEventWarningf(r, "Halt %s.%s advancement request duration %v > %v", - r.Name, r.Namespace, val, t) - return false - } - } - } - - // custom checks - if metric.Query != "" { - val, err := c.observer.GetScalar(metric.Query) + if metric.Name == "request-success-rate" { + val, err := observer.GetRequestSuccessRate(r.Spec.TargetRef.Name, r.Namespace, metric.Interval) if err != nil { if strings.Contains(err.Error(), "no values found") { c.recordEventWarningf(r, "Halt advancement no values found for metric %s probably %s.%s is not receiving traffic", metric.Name, r.Spec.TargetRef.Name, r.Namespace) } else { - c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observer.GetMetricsServer(), err) + c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observerFactory.Client.GetMetricsServer(), err) + } + return false + } + if float64(metric.Threshold) > val { + c.recordEventWarningf(r, "Halt %s.%s advancement success rate %.2f%% < %v%%", + r.Name, r.Namespace, val, metric.Threshold) + return false + } + } + + if metric.Name == "request-duration" { + val, err := observer.GetRequestDuration(r.Spec.TargetRef.Name, r.Namespace, metric.Interval) + if err != nil { + c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observerFactory.Client.GetMetricsServer(), err) + return false + } + t := time.Duration(metric.Threshold) * time.Millisecond + if val > t { + c.recordEventWarningf(r, "Halt %s.%s advancement request duration %v > %v", + r.Name, r.Namespace, val, t) + return false + } + } + + // custom checks + if metric.Query != "" { + val, err := c.observerFactory.Client.RunQuery(metric.Query) + if err != nil { + if strings.Contains(err.Error(), "no values found") { + c.recordEventWarningf(r, "Halt advancement no values found for metric %s probably %s.%s is not receiving traffic", + metric.Name, r.Spec.TargetRef.Name, r.Namespace) + } else { + c.recordEventErrorf(r, "Metrics server %s query failed: %v", c.observerFactory.Client.GetMetricsServer(), err) } return false } diff --git a/pkg/metrics/client.go b/pkg/metrics/client.go new file mode 100644 index 00000000..60541aff --- /dev/null +++ b/pkg/metrics/client.go @@ -0,0 +1,184 @@ +package metrics + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "text/template" + "time" +) + +// PrometheusClient is executing promql queries +type PrometheusClient struct { + timeout time.Duration + url url.URL +} + +type prometheusResponse struct { + Data struct { + Result []struct { + Metric struct { + Name string `json:"name"` + } + Value []interface{} `json:"value"` + } + } +} + +// NewPrometheusClient creates a Prometheus client for the provided URL address +func NewPrometheusClient(address string, timeout time.Duration) (*PrometheusClient, error) { + promURL, err := url.Parse(address) + if err != nil { + return nil, err + } + + return &PrometheusClient{timeout: timeout, url: *promURL}, nil +} + +// RenderQuery renders the promql query using the provided text template +func (p *PrometheusClient) RenderQuery(name string, namespace string, interval string, tmpl string) (string, error) { + meta := struct { + Name string + Namespace string + Interval string + }{ + name, + namespace, + interval, + } + + t, err := template.New("tmpl").Parse(tmpl) + if err != nil { + return "", err + } + var data bytes.Buffer + b := bufio.NewWriter(&data) + + if err := t.Execute(b, meta); err != nil { + return "", err + } + + err = b.Flush() + if err != nil { + return "", err + } + + return data.String(), nil +} + +// RunQuery executes the promql and converts the result to float64 +func (p *PrometheusClient) RunQuery(query string) (float64, error) { + if p.url.Host == "fake" { + return 100, nil + } + + query = url.QueryEscape(p.TrimQuery(query)) + u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query)) + if err != nil { + return 0, err + } + + u = p.url.ResolveReference(u) + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return 0, err + } + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return 0, fmt.Errorf("error reading body: %s", err.Error()) + } + + if 400 <= r.StatusCode { + return 0, fmt.Errorf("error response: %s", string(b)) + } + + var result prometheusResponse + err = json.Unmarshal(b, &result) + if err != nil { + return 0, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b)) + } + + var value *float64 + for _, v := range result.Data.Result { + metricValue := v.Value[1] + switch metricValue.(type) { + case string: + f, err := strconv.ParseFloat(metricValue.(string), 64) + if err != nil { + return 0, err + } + value = &f + } + } + if value == nil { + return 0, fmt.Errorf("no values found") + } + + return *value, nil +} + +// TrimQuery takes a promql query and removes spaces, tabs and new lines +func (p *PrometheusClient) TrimQuery(query string) string { + query = strings.Replace(query, "\n", "", -1) + query = strings.Replace(query, "\t", "", -1) + query = strings.Replace(query, " ", "", -1) + + return query +} + +// IsOnline call Prometheus status endpoint and returns an error if the API is unreachable +func (p *PrometheusClient) IsOnline() (bool, error) { + u, err := url.Parse("./api/v1/status/flags") + if err != nil { + return false, err + } + + u = p.url.ResolveReference(u) + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return false, err + } + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return false, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return false, fmt.Errorf("error reading body: %s", err.Error()) + } + + if 400 <= r.StatusCode { + return false, fmt.Errorf("error response: %s", string(b)) + } + + return true, nil +} + +func (p *PrometheusClient) GetMetricsServer() string { + return p.url.RawQuery +} diff --git a/pkg/metrics/client_test.go b/pkg/metrics/client_test.go new file mode 100644 index 00000000..9fdbe85a --- /dev/null +++ b/pkg/metrics/client_test.go @@ -0,0 +1,85 @@ +package metrics + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestPrometheusClient_RunQuery(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) + if err != nil { + t.Fatal(err) + } + + query := ` + histogram_quantile(0.99, + sum( + rate( + http_request_duration_seconds_bucket{ + kubernetes_namespace="test", + kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[1m] + ) + ) by (le) + )` + + val, err := client.RunQuery(query) + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100 { + t.Errorf("Got %v wanted %v", val, 100) + } +} + +func TestPrometheusClient_IsOnline(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json := `{"status":"success","data":{"config.file":"/etc/prometheus/prometheus.yml"}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) + if err != nil { + t.Fatal(err) + } + + ok, err := client.IsOnline() + if err != nil { + t.Fatal(err.Error()) + } + + if !ok { + t.Errorf("Got %v wanted %v", ok, true) + } +} + +func TestPrometheusClient_IsOffline(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) + if err != nil { + t.Fatal(err) + } + + ok, err := client.IsOnline() + if err == nil { + t.Errorf("Got no error wanted %v", http.StatusBadGateway) + } + + if ok { + t.Errorf("Got %v wanted %v", ok, false) + } +} diff --git a/pkg/metrics/envoy.go b/pkg/metrics/envoy.go index 5373f532..ce88b540 100644 --- a/pkg/metrics/envoy.go +++ b/pkg/metrics/envoy.go @@ -1,119 +1,73 @@ package metrics import ( - "fmt" - "net/url" - "strconv" "time" ) -const envoySuccessRateQuery = ` -sum(rate( -envoy_cluster_upstream_rq{kubernetes_namespace="{{ .Namespace }}", -kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)", -envoy_response_code!~"5.*"} -[{{ .Interval }}])) -/ -sum(rate( -envoy_cluster_upstream_rq{kubernetes_namespace="{{ .Namespace }}", -kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)"} -[{{ .Interval }}])) -* 100 -` - -func (c *Observer) GetEnvoySuccessRate(name string, namespace string, metric string, interval string) (float64, error) { - if c.metricsServer == "fake" { - return 100, nil - } - - meta := struct { - Name string - Namespace string - Interval string - }{ - name, - namespace, - interval, - } - - query, err := render(meta, envoySuccessRateQuery) - if err != nil { - return 0, err - } - - var rate *float64 - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) - if err != nil { - return 0, err - } - - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - rate = &f - } - } - if rate == nil { - return 0, fmt.Errorf("no values found for metric %s", metric) - } - return *rate, nil +var envoyQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ .Namespace }}", + kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)", + envoy_response_code!~"5.*" + }[{{ .Interval }}] + ) + ) + / + sum( + rate( + envoy_cluster_upstream_rq{ + kubernetes_namespace="{{ .Namespace }}", + kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[{{ .Interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + envoy_cluster_upstream_rq_time_bucket{ + kubernetes_namespace="{{ .Namespace }}", + kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)" + }[{{ .Interval }}] + ) + ) by (le) + )`, } -const envoyRequestDurationQuery = ` -histogram_quantile(0.99, sum(rate( -envoy_cluster_upstream_rq_time_bucket{kubernetes_namespace="{{ .Namespace }}", -kubernetes_pod_name=~"{{ .Name }}-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)"} -[{{ .Interval }}])) by (le)) -` +type EnvoyObserver struct { + client *PrometheusClient +} -// GetEnvoyRequestDuration returns the 99P requests delay using envoy_cluster_upstream_rq_time_bucket metrics -func (c *Observer) GetEnvoyRequestDuration(name string, namespace string, metric string, interval string) (time.Duration, error) { - if c.metricsServer == "fake" { - return 1, nil - } - - meta := struct { - Name string - Namespace string - Interval string - }{ - name, - namespace, - interval, - } - - query, err := render(meta, envoyRequestDurationQuery) +func (ob *EnvoyObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, envoyQueries["request-success-rate"]) if err != nil { return 0, err } - var rate *float64 - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) + value, err := ob.client.RunQuery(query) if err != nil { return 0, err } - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - rate = &f - } + return value, nil +} + +func (ob *EnvoyObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, envoyQueries["request-duration"]) + if err != nil { + return 0, err } - if rate == nil { - return 0, fmt.Errorf("no values found for metric %s", metric) + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err } - ms := time.Duration(int64(*rate)) * time.Millisecond + + ms := time.Duration(int64(value)) * time.Millisecond return ms, nil } diff --git a/pkg/metrics/envoy_test.go b/pkg/metrics/envoy_test.go index 0ae9a96c..85f27905 100644 --- a/pkg/metrics/envoy_test.go +++ b/pkg/metrics/envoy_test.go @@ -1,51 +1,74 @@ package metrics import ( + "net/http" + "net/http/httptest" "testing" + "time" ) -func Test_EnvoySuccessRateQueryRender(t *testing.T) { - meta := struct { - Name string - Namespace string - Interval string - }{ - "podinfo", - "default", - "1m", - } +func TestEnvoyObserver_GetRequestSuccessRate(t *testing.T) { + expected := `sum(rate(envoy_cluster_upstream_rq{kubernetes_namespace="default",kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)",envoy_response_code!~"5.*"}[1m]))/sum(rate(envoy_cluster_upstream_rq{kubernetes_namespace="default",kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)"}[1m]))*100` - query, err := render(meta, envoySuccessRateQuery) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) if err != nil { t.Fatal(err) } - expected := `sum(rate(envoy_cluster_upstream_rq{kubernetes_namespace="default",kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)",envoy_response_code!~"5.*"}[1m])) / sum(rate(envoy_cluster_upstream_rq{kubernetes_namespace="default",kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)"}[1m])) * 100` + observer := &EnvoyObserver{ + client: client, + } - if query != expected { - t.Errorf("\nGot %s \nWanted %s", query, expected) + val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100 { + t.Errorf("Got %v wanted %v", val, 100) } } -func Test_EnvoyRequestDurationQueryRender(t *testing.T) { - meta := struct { - Name string - Namespace string - Interval string - }{ - "podinfo", - "default", - "1m", - } +func TestEnvoyObserver_GetRequestDuration(t *testing.T) { + expected := `histogram_quantile(0.99,sum(rate(envoy_cluster_upstream_rq_time_bucket{kubernetes_namespace="default",kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)"}[1m]))by(le))` - query, err := render(meta, envoyRequestDurationQuery) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) if err != nil { t.Fatal(err) } - expected := `histogram_quantile(0.99, sum(rate(envoy_cluster_upstream_rq_time_bucket{kubernetes_namespace="default",kubernetes_pod_name=~"podinfo-[0-9a-zA-Z]+(-[0-9a-zA-Z]+)"}[1m])) by (le))` + observer := &EnvoyObserver{ + client: client, + } - if query != expected { - t.Errorf("\nGot %s \nWanted %s", query, expected) + val, err := observer.GetRequestDuration("podinfo", "default", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100*time.Millisecond { + t.Errorf("Got %v wanted %v", val, 100*time.Millisecond) } } diff --git a/pkg/metrics/factory.go b/pkg/metrics/factory.go new file mode 100644 index 00000000..c717b3a6 --- /dev/null +++ b/pkg/metrics/factory.go @@ -0,0 +1,39 @@ +package metrics + +import ( + "time" +) + +type Factory struct { + MeshProvider string + Client *PrometheusClient +} + +func NewFactory(metricsServer string, meshProvider string, timeout time.Duration) (*Factory, error) { + client, err := NewPrometheusClient(metricsServer, timeout) + if err != nil { + return nil, err + } + + return &Factory{ + MeshProvider: meshProvider, + Client: client, + }, nil +} + +func (factory Factory) Observer() Interface { + switch { + case factory.MeshProvider == "appmesh": + return &EnvoyObserver{ + client: factory.Client, + } + case factory.MeshProvider == "nginx": + return &NginxObserver{ + client: factory.Client, + } + default: + return &IstioObserver{ + client: factory.Client, + } + } +} diff --git a/pkg/metrics/istio.go b/pkg/metrics/istio.go index 5c8e9855..f6fc9c9a 100644 --- a/pkg/metrics/istio.go +++ b/pkg/metrics/istio.go @@ -1,123 +1,76 @@ package metrics import ( - "fmt" - "net/url" - "strconv" "time" ) -const istioSuccessRateQuery = ` -sum(rate( -istio_requests_total{reporter="destination", -destination_workload_namespace="{{ .Namespace }}", -destination_workload=~"{{ .Name }}", -response_code!~"5.*"} -[{{ .Interval }}])) -/ -sum(rate( -istio_requests_total{reporter="destination", -destination_workload_namespace="{{ .Namespace }}", -destination_workload=~"{{ .Name }}"} -[{{ .Interval }}])) -* 100 -` - -// GetIstioSuccessRate returns the requests success rate (non 5xx) using istio_requests_total metric -func (c *Observer) GetIstioSuccessRate(name string, namespace string, metric string, interval string) (float64, error) { - if c.metricsServer == "fake" { - return 100, nil - } - - meta := struct { - Name string - Namespace string - Interval string - }{ - name, - namespace, - interval, - } - - query, err := render(meta, istioSuccessRateQuery) - if err != nil { - return 0, err - } - - var rate *float64 - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) - if err != nil { - return 0, err - } - - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - rate = &f - } - } - if rate == nil { - return 0, fmt.Errorf("no values found for metric %s", metric) - } - return *rate, nil +var istioQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + istio_requests_total{ + reporter="destination", + destination_workload_namespace="{{ .Namespace }}", + destination_workload=~"{{ .Name }}", + response_code!~"5.*" + }[{{ .Interval }}] + ) + ) + / + sum( + rate( + istio_requests_total{ + reporter="destination", + destination_workload_namespace="{{ .Namespace }}", + destination_workload=~"{{ .Name }}" + }[{{ .Interval }}] + ) + ) + * 100`, + "request-duration": ` + histogram_quantile( + 0.99, + sum( + rate( + istio_request_duration_seconds_bucket{ + reporter="destination", + destination_workload_namespace="{{ .Namespace }}", + destination_workload=~"{{ .Name }}" + }[{{ .Interval }}] + ) + ) by (le) + )`, } -const istioRequestDurationQuery = ` -histogram_quantile(0.99, sum(rate( -istio_request_duration_seconds_bucket{reporter="destination", -destination_workload_namespace="{{ .Namespace }}", -destination_workload=~"{{ .Name }}"} -[{{ .Interval }}])) by (le)) -` +type IstioObserver struct { + client *PrometheusClient +} -// GetIstioRequestDuration returns the 99P requests delay using istio_request_duration_seconds_bucket metrics -func (c *Observer) GetIstioRequestDuration(name string, namespace string, metric string, interval string) (time.Duration, error) { - if c.metricsServer == "fake" { - return 1, nil - } - - meta := struct { - Name string - Namespace string - Interval string - }{ - name, - namespace, - interval, - } - - query, err := render(meta, istioRequestDurationQuery) +func (ob *IstioObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, istioQueries["request-success-rate"]) if err != nil { return 0, err } - var rate *float64 - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) + value, err := ob.client.RunQuery(query) if err != nil { return 0, err } - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - rate = &f - } + return value, nil +} + +func (ob *IstioObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, istioQueries["request-duration"]) + if err != nil { + return 0, err } - if rate == nil { - return 0, fmt.Errorf("no values found for metric %s", metric) + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err } - ms := time.Duration(int64(*rate*1000)) * time.Millisecond + + ms := time.Duration(int64(value)) * time.Millisecond return ms, nil } diff --git a/pkg/metrics/istio_test.go b/pkg/metrics/istio_test.go index 28826a77..dfba3c05 100644 --- a/pkg/metrics/istio_test.go +++ b/pkg/metrics/istio_test.go @@ -1,51 +1,74 @@ package metrics import ( + "net/http" + "net/http/httptest" "testing" + "time" ) -func Test_IstioSuccessRateQueryRender(t *testing.T) { - meta := struct { - Name string - Namespace string - Interval string - }{ - "podinfo", - "default", - "1m", - } +func TestIstioObserver_GetRequestSuccessRate(t *testing.T) { + expected := `sum(rate(istio_requests_total{reporter="destination",destination_workload_namespace="default",destination_workload=~"podinfo",response_code!~"5.*"}[1m]))/sum(rate(istio_requests_total{reporter="destination",destination_workload_namespace="default",destination_workload=~"podinfo"}[1m]))*100` - query, err := render(meta, istioSuccessRateQuery) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) if err != nil { t.Fatal(err) } - expected := `sum(rate(istio_requests_total{reporter="destination",destination_workload_namespace="default",destination_workload=~"podinfo",response_code!~"5.*"}[1m])) / sum(rate(istio_requests_total{reporter="destination",destination_workload_namespace="default",destination_workload=~"podinfo"}[1m])) * 100` + observer := &IstioObserver{ + client: client, + } - if query != expected { - t.Errorf("\nGot %s \nWanted %s", query, expected) + val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100 { + t.Errorf("Got %v wanted %v", val, 100) } } -func Test_IstioRequestDurationQueryRender(t *testing.T) { - meta := struct { - Name string - Namespace string - Interval string - }{ - "podinfo", - "default", - "1m", - } +func TestIstioObserver_GetRequestDuration(t *testing.T) { + expected := `histogram_quantile(0.99,sum(rate(istio_request_duration_seconds_bucket{reporter="destination",destination_workload_namespace="default",destination_workload=~"podinfo"}[1m]))by(le))` - query, err := render(meta, istioRequestDurationQuery) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) if err != nil { t.Fatal(err) } - expected := `histogram_quantile(0.99, sum(rate(istio_request_duration_seconds_bucket{reporter="destination",destination_workload_namespace="default",destination_workload=~"podinfo"}[1m])) by (le))` + observer := &IstioObserver{ + client: client, + } - if query != expected { - t.Errorf("\nGot %s \nWanted %s", query, expected) + val, err := observer.GetRequestDuration("podinfo", "default", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100*time.Millisecond { + t.Errorf("Got %v wanted %v", val, 100*time.Millisecond) } } diff --git a/pkg/metrics/nginx.go b/pkg/metrics/nginx.go index bdd1c8f3..593d2306 100644 --- a/pkg/metrics/nginx.go +++ b/pkg/metrics/nginx.go @@ -1,122 +1,80 @@ package metrics import ( - "fmt" - "net/url" - "strconv" "time" ) -const nginxSuccessRateQuery = ` -sum(rate( -nginx_ingress_controller_requests{namespace="{{ .Namespace }}", -ingress="{{ .Name }}", -status!~"5.*"} -[{{ .Interval }}])) -/ -sum(rate( -nginx_ingress_controller_requests{namespace="{{ .Namespace }}", -ingress="{{ .Name }}"} -[{{ .Interval }}])) -* 100 -` - -// GetNginxSuccessRate returns the requests success rate (non 5xx) using nginx_ingress_controller_requests metric -func (c *Observer) GetNginxSuccessRate(name string, namespace string, metric string, interval string) (float64, error) { - if c.metricsServer == "fake" { - return 100, nil - } - - meta := struct { - Name string - Namespace string - Interval string - }{ - name, - namespace, - interval, - } - - query, err := render(meta, nginxSuccessRateQuery) - if err != nil { - return 0, err - } - - var rate *float64 - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) - if err != nil { - return 0, err - } - - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - rate = &f - } - } - if rate == nil { - return 0, fmt.Errorf("no values found for metric %s", metric) - } - return *rate, nil +var nginxQueries = map[string]string{ + "request-success-rate": ` + sum( + rate( + nginx_ingress_controller_requests{ + namespace="{{ .Namespace }}", + ingress="{{ .Name }}", + status!~"5.*" + }[{{ .Interval }}] + ) + ) + / + sum( + rate( + nginx_ingress_controller_requests{ + namespace="{{ .Namespace }}", + ingress="{{ .Name }}" + }[{{ .Interval }}] + ) + ) + * 100`, + "request-duration": ` + sum( + rate( + nginx_ingress_controller_ingress_upstream_latency_seconds_sum{ + namespace="{{ .Namespace }}", + ingress="{{ .Name }}" + }[{{ .Interval }}] + ) + ) + / + sum( + rate( + nginx_ingress_controller_ingress_upstream_latency_seconds_count{ + namespace="{{ .Namespace }}", + ingress="{{ .Name }}" + }[{{ .Interval }}] + ) + ) + * 1000`, } -const nginxRequestDurationQuery = ` -sum(rate( -nginx_ingress_controller_ingress_upstream_latency_seconds_sum{namespace="{{ .Namespace }}", -ingress="{{ .Name }}"}[{{ .Interval }}])) -/ -sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_count{namespace="{{ .Namespace }}", -ingress="{{ .Name }}"}[{{ .Interval }}])) * 1000 -` +type NginxObserver struct { + client *PrometheusClient +} -// GetNginxRequestDuration returns the avg requests latency using nginx_ingress_controller_ingress_upstream_latency_seconds_sum metric -func (c *Observer) GetNginxRequestDuration(name string, namespace string, metric string, interval string) (time.Duration, error) { - if c.metricsServer == "fake" { - return 1, nil - } - - meta := struct { - Name string - Namespace string - Interval string - }{ - name, - namespace, - interval, - } - - query, err := render(meta, nginxRequestDurationQuery) +func (ob *NginxObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, nginxQueries["request-success-rate"]) if err != nil { return 0, err } - var rate *float64 - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) + value, err := ob.client.RunQuery(query) if err != nil { return 0, err } - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - rate = &f - } + return value, nil +} + +func (ob *NginxObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) { + query, err := ob.client.RenderQuery(name, namespace, interval, nginxQueries["request-duration"]) + if err != nil { + return 0, err } - if rate == nil { - return 0, fmt.Errorf("no values found for metric %s", metric) + + value, err := ob.client.RunQuery(query) + if err != nil { + return 0, err } - ms := time.Duration(int64(*rate)) * time.Millisecond + + ms := time.Duration(int64(value)) * time.Millisecond return ms, nil } diff --git a/pkg/metrics/nginx_test.go b/pkg/metrics/nginx_test.go index 00e9ef90..30b51e31 100644 --- a/pkg/metrics/nginx_test.go +++ b/pkg/metrics/nginx_test.go @@ -1,51 +1,74 @@ package metrics import ( + "net/http" + "net/http/httptest" "testing" + "time" ) -func Test_NginxSuccessRateQueryRender(t *testing.T) { - meta := struct { - Name string - Namespace string - Interval string - }{ - "podinfo", - "nginx", - "1m", - } +func TestNginxObserver_GetRequestSuccessRate(t *testing.T) { + expected := `sum(rate(nginx_ingress_controller_requests{namespace="nginx",ingress="podinfo",status!~"5.*"}[1m]))/sum(rate(nginx_ingress_controller_requests{namespace="nginx",ingress="podinfo"}[1m]))*100` - query, err := render(meta, nginxSuccessRateQuery) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) if err != nil { t.Fatal(err) } - expected := `sum(rate(nginx_ingress_controller_requests{namespace="nginx",ingress="podinfo",status!~"5.*"}[1m])) / sum(rate(nginx_ingress_controller_requests{namespace="nginx",ingress="podinfo"}[1m])) * 100` + observer := &NginxObserver{ + client: client, + } - if query != expected { - t.Errorf("\nGot %s \nWanted %s", query, expected) + val, err := observer.GetRequestSuccessRate("podinfo", "nginx", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100 { + t.Errorf("Got %v wanted %v", val, 100) } } -func Test_NginxRequestDurationQueryRender(t *testing.T) { - meta := struct { - Name string - Namespace string - Interval string - }{ - "podinfo", - "nginx", - "1m", - } +func TestNginxObserver_GetRequestDuration(t *testing.T) { + expected := `sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_sum{namespace="nginx",ingress="podinfo"}[1m]))/sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_count{namespace="nginx",ingress="podinfo"}[1m]))*1000` - query, err := render(meta, nginxRequestDurationQuery) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + promql := r.URL.Query()["query"][0] + if promql != expected { + t.Errorf("\nGot %s \nWanted %s", promql, expected) + } + + json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}` + w.Write([]byte(json)) + })) + defer ts.Close() + + client, err := NewPrometheusClient(ts.URL, time.Second) if err != nil { t.Fatal(err) } - expected := `sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_sum{namespace="nginx",ingress="podinfo"}[1m])) /sum(rate(nginx_ingress_controller_ingress_upstream_latency_seconds_count{namespace="nginx",ingress="podinfo"}[1m])) * 1000` + observer := &NginxObserver{ + client: client, + } - if query != expected { - t.Errorf("\nGot %s \nWanted %s", query, expected) + val, err := observer.GetRequestDuration("podinfo", "nginx", "1m") + if err != nil { + t.Fatal(err.Error()) + } + + if val != 100*time.Millisecond { + t.Errorf("Got %v wanted %v", val, 100*time.Millisecond) } } diff --git a/pkg/metrics/observer.go b/pkg/metrics/observer.go index ef0a5cfd..906285e7 100644 --- a/pkg/metrics/observer.go +++ b/pkg/metrics/observer.go @@ -1,186 +1,10 @@ package metrics import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "strconv" - "strings" - "text/template" "time" ) -// Observer is used to query Prometheus -type Observer struct { - metricsServer string -} - -type vectorQueryResponse struct { - Data struct { - Result []struct { - Metric struct { - Code string `json:"response_code"` - Name string `json:"destination_workload"` - } - Value []interface{} `json:"value"` - } - } -} - -// NewObserver creates a new observer -func NewObserver(metricsServer string) Observer { - return Observer{ - metricsServer: metricsServer, - } -} - -// GetMetricsServer returns the Prometheus URL -func (c *Observer) GetMetricsServer() string { - return c.metricsServer -} - -func (c *Observer) queryMetric(query string) (*vectorQueryResponse, error) { - promURL, err := url.Parse(c.metricsServer) - if err != nil { - return nil, err - } - - u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query)) - if err != nil { - return nil, err - } - - u = promURL.ResolveReference(u) - - req, err := http.NewRequest("GET", u.String(), nil) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second) - defer cancel() - - r, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return nil, err - } - defer r.Body.Close() - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return nil, fmt.Errorf("error reading body: %s", err.Error()) - } - - if 400 <= r.StatusCode { - return nil, fmt.Errorf("error response: %s", string(b)) - } - - var values vectorQueryResponse - err = json.Unmarshal(b, &values) - if err != nil { - return nil, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b)) - } - - return &values, nil -} - -// GetScalar runs the promql query and returns the first value found -func (c *Observer) GetScalar(query string) (float64, error) { - if c.metricsServer == "fake" { - return 100, nil - } - - query = strings.Replace(query, "\n", "", -1) - query = strings.Replace(query, " ", "", -1) - - var value *float64 - - querySt := url.QueryEscape(query) - result, err := c.queryMetric(querySt) - if err != nil { - return 0, err - } - - for _, v := range result.Data.Result { - metricValue := v.Value[1] - switch metricValue.(type) { - case string: - f, err := strconv.ParseFloat(metricValue.(string), 64) - if err != nil { - return 0, err - } - value = &f - } - } - if value == nil { - return 0, fmt.Errorf("no values found for query %s", query) - } - return *value, nil -} - -// CheckMetricsServer call Prometheus status endpoint and returns an error if -// the API is unreachable -func CheckMetricsServer(address string) (bool, error) { - promURL, err := url.Parse(address) - if err != nil { - return false, err - } - - u, err := url.Parse("./api/v1/status/flags") - if err != nil { - return false, err - } - - u = promURL.ResolveReference(u) - - req, err := http.NewRequest("GET", u.String(), nil) - if err != nil { - return false, err - } - - ctx, cancel := context.WithTimeout(req.Context(), 5*time.Second) - defer cancel() - - r, err := http.DefaultClient.Do(req.WithContext(ctx)) - if err != nil { - return false, err - } - defer r.Body.Close() - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return false, fmt.Errorf("error reading body: %s", err.Error()) - } - - if 400 <= r.StatusCode { - return false, fmt.Errorf("error response: %s", string(b)) - } - - return true, nil -} - -func render(meta interface{}, tmpl string) (string, error) { - t, err := template.New("tmpl").Parse(tmpl) - if err != nil { - return "", err - } - var data bytes.Buffer - b := bufio.NewWriter(&data) - - if err := t.Execute(b, meta); err != nil { - return "", err - } - err = b.Flush() - if err != nil { - return "", err - } - - res := strings.ReplaceAll(data.String(), "\n", "") - - return res, nil +type Interface interface { + GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) + GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) } diff --git a/pkg/metrics/observer_test.go b/pkg/metrics/observer_test.go deleted file mode 100644 index fcd12eb4..00000000 --- a/pkg/metrics/observer_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package metrics - -import ( - "net/http" - "net/http/httptest" - "testing" - "time" -) - -func TestCanaryObserver_GetEnvoySuccessRate(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - observer := NewObserver(ts.URL) - - val, err := observer.GetEnvoySuccessRate("podinfo", "default", "envoy_cluster_upstream_rq", "1m") - if err != nil { - t.Fatal(err.Error()) - } - - if val != 100 { - t.Errorf("Got %v wanted %v", val, 100) - } - -} - -func TestCanaryObserver_GetEnvoyRequestDuration(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.596,"200"]}]}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - observer := NewObserver(ts.URL) - - val, err := observer.GetEnvoyRequestDuration("podinfo", "default", "envoy_cluster_upstream_rq_time_bucket", "1m") - if err != nil { - t.Fatal(err.Error()) - } - - if val != 200*time.Millisecond { - t.Errorf("Got %v wanted %v", val, 200*time.Millisecond) - } -} - -func TestCanaryObserver_GetIstioSuccessRate(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - observer := NewObserver(ts.URL) - - val, err := observer.GetIstioSuccessRate("podinfo", "default", "istio_requests_total", "1m") - if err != nil { - t.Fatal(err.Error()) - } - - if val != 100 { - t.Errorf("Got %v wanted %v", val, 100) - } - -} - -func TestCanaryObserver_GetIstioRequestDuration(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.596,"0.2"]}]}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - observer := NewObserver(ts.URL) - - val, err := observer.GetIstioRequestDuration("podinfo", "default", "istio_request_duration_seconds_bucket", "1m") - if err != nil { - t.Fatal(err.Error()) - } - - if val != 200*time.Millisecond { - t.Errorf("Got %v wanted %v", val, 200*time.Millisecond) - } -} - -func TestCheckMetricsServer(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json := `{"status":"success","data":{"config.file":"/etc/prometheus/prometheus.yml"}}` - w.Write([]byte(json)) - })) - defer ts.Close() - - ok, err := CheckMetricsServer(ts.URL) - if err != nil { - t.Fatal(err.Error()) - } - - if !ok { - t.Errorf("Got %v wanted %v", ok, true) - } -} - -func TestCheckMetricsServer_Offline(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadGateway) - })) - defer ts.Close() - - ok, err := CheckMetricsServer(ts.URL) - if err == nil { - t.Errorf("Got no error wanted %v", http.StatusBadGateway) - } - - if ok { - t.Errorf("Got %v wanted %v", ok, false) - } -}