Merge pull request #3325 from weaveworks/trace-context-detect

Add middleware to detect jaeger trace context over HTTP
This commit is contained in:
Bryan Boreham
2018-08-28 14:32:31 +01:00
committed by GitHub
8 changed files with 1081 additions and 91 deletions

View File

@@ -70,11 +70,15 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter
uiHandler))
router.PathPrefix("/").Name("static").Handler(uiHandler)
instrument := middleware.Instrument{
RouteMatcher: router,
Duration: requestDuration,
}
return instrument.Wrap(router)
middlewares := middleware.Merge(
middleware.Instrument{
RouteMatcher: router,
Duration: requestDuration,
},
middleware.Tracer{},
)
return middlewares.Wrap(router)
}
func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname string,

File diff suppressed because it is too large Load Diff

View File

@@ -66,7 +66,7 @@ func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc.
if recorder.Code/100 == 5 {
return nil, httpgrpc.ErrorFromHTTPResponse(resp)
}
return resp, err
return resp, nil
}
// Client is a http.Handler that forwards the request over gRPC.
@@ -143,13 +143,40 @@ func NewClient(address string) (*Client, error) {
}, nil
}
// ServeHTTP implements http.Handler
func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// HTTPRequest wraps an ordinary HTTPRequest with a gRPC one
func HTTPRequest(r *http.Request) (*httpgrpc.HTTPRequest, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return nil, err
}
return &httpgrpc.HTTPRequest{
Method: r.Method,
Url: r.RequestURI,
Body: body,
Headers: fromHeader(r.Header),
}, nil
}
// WriteResponse converts an httpgrpc response to an HTTP one
func WriteResponse(w http.ResponseWriter, resp *httpgrpc.HTTPResponse) error {
toHeader(resp.Headers, w.Header())
w.WriteHeader(int(resp.Code))
_, err := w.Write(resp.Body)
return err
}
// WriteError converts an httpgrpc error to an HTTP one
func WriteError(w http.ResponseWriter, err error) {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if ok {
WriteResponse(w, resp)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// ServeHTTP implements http.Handler
func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if tracer := opentracing.GlobalTracer(); tracer != nil {
if span := opentracing.SpanFromContext(r.Context()); span != nil {
if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)); err != nil {
@@ -157,13 +184,12 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
}
req := &httpgrpc.HTTPRequest{
Method: r.Method,
Url: r.RequestURI,
Body: body,
Headers: fromHeader(r.Header),
}
req, err := HTTPRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := c.client.Handle(r.Context(), req)
if err != nil {
// Some errors will actually contain a valid resp, just need to unpack it
@@ -176,9 +202,7 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
toHeader(resp.Headers, w.Header())
w.WriteHeader(int(resp.Code))
if _, err := w.Write(resp.Body); err != nil {
if err := WriteResponse(w, resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

View File

@@ -0,0 +1,27 @@
package middleware
import (
"net/http"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
)
// Tracer is a middleware which traces incoming requests.
type Tracer struct{}
// Wrap implements Interface
func (t Tracer) Wrap(next http.Handler) http.Handler {
traceHandler := nethttp.Middleware(opentracing.GlobalTracer(), next)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var maybeTracer http.Handler
// Don't try and trace websocket requests because nethttp.Middleware
// doesn't support http.Hijack yet
if IsWSHandshakeRequest(r) {
maybeTracer = next
} else {
maybeTracer = traceHandler
}
maybeTracer.ServeHTTP(w, r)
})
}

View File

@@ -3,7 +3,6 @@ package middleware
import (
"bytes"
"net/http"
"net/http/httputil"
"time"
"github.com/weaveworks/common/logging"
@@ -27,7 +26,7 @@ func (l Log) Wrap(next http.Handler) http.Handler {
begin := time.Now()
uri := r.RequestURI // capture the URI before running next, as it may get rewritten
// Log headers before running 'next' in case other interceptors change the data.
headers, err := httputil.DumpRequest(r, false)
headers, err := dumpRequest(r)
if err != nil {
headers = nil
l.logWithRequest(r).Errorf("Could not dump request headers: %v", err)
@@ -39,14 +38,11 @@ func (l Log) Wrap(next http.Handler) http.Handler {
if 100 <= statusCode && statusCode < 500 || statusCode == http.StatusBadGateway || statusCode == http.StatusServiceUnavailable {
l.logWithRequest(r).Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
if l.LogRequestHeaders && headers != nil {
l.logWithRequest(r).Debugf("Is websocket request: %v\n%s", IsWSHandshakeRequest(r), string(headers))
l.logWithRequest(r).Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
}
} else {
l.logWithRequest(r).Warnf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
if headers != nil {
l.logWithRequest(r).Warnf("Is websocket request: %v\n%s", IsWSHandshakeRequest(r), string(headers))
}
l.logWithRequest(r).Warnf("Response: %s", buf.Bytes())
l.logWithRequest(r).Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)
}
})
}
@@ -56,3 +52,19 @@ func (l Log) Wrap(next http.Handler) http.Handler {
var Logging = Log{
Log: logging.Global(),
}
func dumpRequest(req *http.Request) ([]byte, error) {
var b bytes.Buffer
// Exclude some headers for security, or just that we don't need them when debugging
err := req.Header.WriteSubset(&b, map[string]bool{
"Cookie": true,
"X-Csrf-Token": true,
})
if err != nil {
return nil, err
}
ret := bytes.Replace(b.Bytes(), []byte("\r\n"), []byte("; "), -1)
return ret, nil
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
@@ -66,13 +65,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Server struct {
cfg Config
handler *signals.Handler
httpListener net.Listener
grpcListener net.Listener
httpServer *http.Server
httpListener net.Listener
HTTP *mux.Router
GRPC *grpc.Server
Log logging.Interface
HTTP *mux.Router
HTTPServer *http.Server
GRPC *grpc.Server
Log logging.Interface
}
// New makes a new Server
@@ -147,9 +146,7 @@ func New(cfg Config) (*Server, error) {
Duration: requestDuration,
RouteMatcher: router,
},
middleware.Func(func(handler http.Handler) http.Handler {
return nethttp.Middleware(opentracing.GlobalTracer(), handler)
}),
middleware.Tracer{},
}
httpMiddleware = append(httpMiddleware, cfg.HTTPMiddleware...)
httpServer := &http.Server{
@@ -163,12 +160,12 @@ func New(cfg Config) (*Server, error) {
cfg: cfg,
httpListener: httpListener,
grpcListener: grpcListener,
httpServer: httpServer,
handler: signals.NewHandler(log),
HTTP: router,
GRPC: grpcServer,
Log: log,
HTTP: router,
HTTPServer: httpServer,
GRPC: grpcServer,
Log: log,
}, nil
}
@@ -180,7 +177,7 @@ func RegisterInstrumentation(router *mux.Router) {
// Run the server; blocks until SIGTERM or an error is received.
func (s *Server) Run() error {
errChan := make(chan error)
errChan := make(chan error, 1)
// Wait for a signal
go func() {
@@ -192,7 +189,7 @@ func (s *Server) Run() error {
}()
go func() {
err := s.httpServer.Serve(s.httpListener)
err := s.HTTPServer.Serve(s.httpListener)
if err == http.ErrServerClosed {
err = nil
}
@@ -232,6 +229,6 @@ func (s *Server) Shutdown() {
ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ServerGracefulShutdownTimeout)
defer cancel() // releases resources if httpServer.Shutdown completes before timeout elapses
s.httpServer.Shutdown(ctx)
s.HTTPServer.Shutdown(ctx)
s.GRPC.GracefulStop()
}

View File

@@ -8,7 +8,9 @@ import (
// Diff diffs two arbitrary data structures, giving human-readable output.
func Diff(want, have interface{}) string {
config := spew.NewDefaultConfig()
config.ContinueOnMethod = true
// Set ContinueOnMethod to true if you cannot see a difference and
// want to look beyond the String() method
config.ContinueOnMethod = false
config.SortKeys = true
config.SpewKeys = true
text, _ := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{

2
vendor/manifest vendored
View File

@@ -1957,7 +1957,7 @@
"importpath": "github.com/weaveworks/common",
"repository": "https://github.com/weaveworks/common",
"vcs": "git",
"revision": "54b7e30527f846e1515fb5a85d0ff5674f05a267",
"revision": "d442d08d89b51712ca61de3f7c14e2e218a739d7",
"branch": "HEAD",
"notests": true
},