Add tracing for pipe operations

This commit is contained in:
Bryan Boreham
2020-01-06 17:46:08 +00:00
parent 35451b4826
commit 634e8f1158
4 changed files with 35 additions and 12 deletions

View File

@@ -2,11 +2,13 @@ package multitenant
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
consul "github.com/hashicorp/consul/api"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
)
@@ -17,8 +19,8 @@ const (
// ConsulClient is a high-level client for Consul, that exposes operations
// such as CAS and Watch which take callbacks. It also deals with serialisation.
type ConsulClient interface {
Get(key string, out interface{}) error
CAS(key string, out interface{}, f CASCallback) error
Get(ctx context.Context, key string, out interface{}) error
CAS(ctx context.Context, key string, out interface{}, f CASCallback) error
WatchPrefix(prefix string, out interface{}, done chan struct{}, f func(string, interface{}) bool)
}
@@ -58,7 +60,9 @@ type consulClient struct {
}
// Get and deserialise a JSON value from consul.
func (c *consulClient) Get(key string, out interface{}) error {
func (c *consulClient) Get(ctx context.Context, key string, out interface{}) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Consul Get", opentracing.Tag{"key", key})
defer span.Finish()
kvp, _, err := c.kv.Get(key, queryOptions)
if err != nil {
return err
@@ -71,7 +75,9 @@ func (c *consulClient) Get(key string, out interface{}) error {
// CAS atomically modify a value in a callback.
// If value doesn't exist you'll get nil as a argument to your callback.
func (c *consulClient) CAS(key string, out interface{}, f CASCallback) error {
func (c *consulClient) CAS(ctx context.Context, key string, out interface{}, f CASCallback) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Consul CAS", opentracing.Tag{"key", key})
defer span.Finish()
var (
index = uint64(0)
retries = 10

View File

@@ -11,8 +11,10 @@ import (
"context"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/common/xfer"
@@ -259,8 +261,9 @@ func (pr *consulPipeRouter) privateAPI() {
}
})
handler := middleware.Tracer{RouteMatcher: router}.Wrap(router)
log.Infof("Serving private API on endpoint %s.", pr.advertise)
log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, router))
log.Infof("Private API terminated: %v", http.ListenAndServe(pr.advertise, handler))
}
func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error) {
@@ -270,7 +273,7 @@ func (pr *consulPipeRouter) Exists(ctx context.Context, id string) (bool, error)
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
consulPipe := consulPipe{}
err = pr.client.Get(key, &consulPipe)
err = pr.client.Get(ctx, key, &consulPipe)
if err == ErrNotFound {
return false, nil
} else if err != nil {
@@ -286,10 +289,12 @@ func (pr *consulPipeRouter) Get(ctx context.Context, id string, e app.End) (xfer
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Get %s:%s", key, e)
span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Get", opentracing.Tag{"key", key})
defer span.Finish()
// Try to ensure the given end of the given pipe
// is 'owned' by this pipe service replica in consul.
err = pr.client.CAS(key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
err = pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
var pipe *consulPipe
if in == nil {
pipe = &consulPipe{
@@ -328,9 +333,11 @@ func (pr *consulPipeRouter) Release(ctx context.Context, id string, e app.End) e
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Release %s:%s", key, e)
span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Release", opentracing.Tag{"key", key})
defer span.Finish()
// atomically clear my end of the pipe in consul
return pr.client.CAS(key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
if in == nil {
return nil, false, fmt.Errorf("pipe %s not found", id)
}
@@ -353,8 +360,10 @@ func (pr *consulPipeRouter) Delete(ctx context.Context, id string) error {
}
key := fmt.Sprintf("%s%s-%s", pr.prefix, userID, id)
log.Infof("Delete %s", key)
span, ctx := opentracing.StartSpanFromContext(ctx, "PipeRouter Delete", opentracing.Tag{"key", key})
defer span.Finish()
return pr.client.CAS(key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
return pr.client.CAS(ctx, key, &consulPipe{}, func(in interface{}) (interface{}, bool, error) {
if in == nil {
return nil, false, fmt.Errorf("Pipe %s not found", id)
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"github.com/gorilla/mux"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
@@ -66,8 +67,13 @@ func handlePipeWs(pr PipeRouter, end End) CtxHandlerFunc {
}
defer conn.Close()
if err := pipe.CopyToWebsocket(endIO, conn); err != nil && !xfer.IsExpectedWSCloseError(err) {
log.Errorf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
if err := pipe.CopyToWebsocket(endIO, conn); err != nil {
if span := opentracing.SpanFromContext(ctx); span != nil {
span.LogKV("error", err.Error())
}
if !xfer.IsExpectedWSCloseError(err) {
log.Errorf("Error copying to pipe %s (%d) websocket: %v", id, end, err)
}
}
}
}

View File

@@ -80,7 +80,9 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter
RouteMatcher: router,
Duration: requestDuration,
},
middleware.Tracer{},
middleware.Tracer{
RouteMatcher: router,
},
)
return middlewares.Wrap(router)