From 852b7cd4c096c13d42c90f6b17601ad36eb2522a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 15 Sep 2019 18:53:56 +0000 Subject: [PATCH 1/4] tracing(app): spans for report rendering via websocket --- app/api_topology.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/app/api_topology.go b/app/api_topology.go index 9a48ada0f..67ecf89ce 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -7,6 +7,7 @@ import ( "context" "github.com/gorilla/mux" + ot "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" "github.com/weaveworks/scope/common/xfer" @@ -133,6 +134,8 @@ func handleWebsocket( defer rep.UnWait(ctx, wait) for { + span := ot.StartSpan("ws.Render", ot.Tag{"topology", topologyID}) + ctx := ot.ContextWithSpan(ctx, span) // We measure how much time has passed since the channel was opened // and add it to the initial report timestamp to get the timestamp // of the snapshot we want to report right now. @@ -144,11 +147,13 @@ func handleWebsocket( re, err := rep.Report(ctx, reportTimestamp) if err != nil { log.Errorf("Error generating report: %v", err) + span.Finish() return } renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re) if err != nil { log.Errorf("Error generating report: %v", err) + span.Finish() return } @@ -167,8 +172,10 @@ func handleWebsocket( if !xfer.IsExpectedWSCloseError(err) { log.Errorf("cannot serialize topology diff: %s", err) } + span.Finish() return } + span.Finish() select { case <-wait: From 04af634065e8438b6f14e4ed2abf0643e050d652 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 15 Sep 2019 19:22:08 +0000 Subject: [PATCH 2/4] tracing(app): set a tag for userid on awsCollector.Report --- app/multitenant/aws_collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index bdc0f0e6d..88907d9fb 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -370,6 +370,7 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report. if err != nil { return report.MakeReport(), err } + span.SetTag("userid", userid) end := timestamp start := end.Add(-c.cfg.Window) reportKeys, err := c.getReportKeys(ctx, userid, start, end) From b0915519dfc1af5be22607107e0e5468d932bdaa Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 16 Sep 2019 11:03:02 +0000 Subject: [PATCH 3/4] refactor: move websocket state out to a struct to neaten up the send loop --- app/api_topology.go | 116 +++++++++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 49 deletions(-) diff --git a/app/api_topology.go b/app/api_topology.go index 67ecf89ce..090b5e2af 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -2,12 +2,14 @@ package app import ( "net/http" + "net/url" "time" "context" "github.com/gorilla/mux" ot "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/weaveworks/scope/common/xfer" @@ -120,62 +122,26 @@ func handleWebsocket( } }(conn) - var ( - previousTopo detailed.NodeSummaries - tick = time.Tick(loop) - wait = make(chan struct{}, 1) - topologyID = mux.Vars(r)["topology"] - startReportingAt = deserializeTimestamp(r.Form.Get("timestamp")) - censorCfg = report.GetCensorConfigFromRequest(r) - channelOpenedAt = time.Now() - ) + wc := websocketState{ + rep: rep, + values: r.Form, + conn: conn, + topologyID: mux.Vars(r)["topology"], + startReportingAt: deserializeTimestamp(r.Form.Get("timestamp")), + censorCfg: report.GetCensorConfigFromRequest(r), + channelOpenedAt: time.Now(), + } + wait := make(chan struct{}, 1) rep.WaitOn(ctx, wait) defer rep.UnWait(ctx, wait) + tick := time.Tick(loop) for { - span := ot.StartSpan("ws.Render", ot.Tag{"topology", topologyID}) - ctx := ot.ContextWithSpan(ctx, span) - // We measure how much time has passed since the channel was opened - // and add it to the initial report timestamp to get the timestamp - // of the snapshot we want to report right now. - // NOTE: Multiplying `timestampDelta` by a constant factor here - // would have an effect of fast-forward, which is something we - // might be interested in implementing in the future. - timestampDelta := time.Since(channelOpenedAt) - reportTimestamp := startReportingAt.Add(timestampDelta) - re, err := rep.Report(ctx, reportTimestamp) - if err != nil { - log.Errorf("Error generating report: %v", err) - span.Finish() + if err := wc.update(ctx); err != nil { + log.Errorf("%v", err) return } - renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re) - if err != nil { - log.Errorf("Error generating report: %v", err) - span.Finish() - return - } - - newTopo := detailed.CensorNodeSummaries( - detailed.Summaries( - ctx, - RenderContextForReporter(rep, re), - render.Render(ctx, re, renderer, filter).Nodes, - ), - censorCfg, - ) - diff := detailed.TopoDiff(previousTopo, newTopo) - previousTopo = newTopo - - if err := conn.WriteJSON(diff); err != nil { - if !xfer.IsExpectedWSCloseError(err) { - log.Errorf("cannot serialize topology diff: %s", err) - } - span.Finish() - return - } - span.Finish() select { case <-wait: @@ -185,3 +151,55 @@ func handleWebsocket( } } } + +type websocketState struct { + rep Reporter + values url.Values + conn xfer.Websocket + previousTopo detailed.NodeSummaries + topologyID string + startReportingAt time.Time + reportTimestamp time.Time + censorCfg report.CensorConfig + channelOpenedAt time.Time +} + +func (wc *websocketState) update(ctx context.Context) error { + span := ot.StartSpan("ws.Render", ot.Tag{"topology", wc.topologyID}) + defer span.Finish() + ctx = ot.ContextWithSpan(ctx, span) + // We measure how much time has passed since the channel was opened + // and add it to the initial report timestamp to get the timestamp + // of the snapshot we want to report right now. + // NOTE: Multiplying `timestampDelta` by a constant factor here + // would have an effect of fast-forward, which is something we + // might be interested in implementing in the future. + timestampDelta := time.Since(wc.channelOpenedAt) + reportTimestamp := wc.startReportingAt.Add(timestampDelta) + re, err := wc.rep.Report(ctx, reportTimestamp) + if err != nil { + return errors.Wrapf(err, "Error generating report: %v") + } + renderer, filter, err := topologyRegistry.RendererForTopology(wc.topologyID, wc.values, re) + if err != nil { + return errors.Wrapf(err, "Error generating report: %v") + } + + newTopo := detailed.CensorNodeSummaries( + detailed.Summaries( + ctx, + RenderContextForReporter(wc.rep, re), + render.Render(ctx, re, renderer, filter).Nodes, + ), + wc.censorCfg, + ) + diff := detailed.TopoDiff(wc.previousTopo, newTopo) + wc.previousTopo = newTopo + + if err := wc.conn.WriteJSON(diff); err != nil { + if !xfer.IsExpectedWSCloseError(err) { + return errors.Wrapf(err, "cannot serialize topology diff: %s") + } + } + return nil +} From 4e8000cbba09718fd6aab6c90048b30b9dfb2ea7 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 16 Sep 2019 11:08:42 +0000 Subject: [PATCH 4/4] review feedback: better tracing info --- app/api_topology.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/api_topology.go b/app/api_topology.go index 090b5e2af..09adfc18e 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -9,6 +9,7 @@ import ( "github.com/gorilla/mux" ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -165,7 +166,7 @@ type websocketState struct { } func (wc *websocketState) update(ctx context.Context) error { - span := ot.StartSpan("ws.Render", ot.Tag{"topology", wc.topologyID}) + span := ot.StartSpan("websocket.Render", ot.Tag{"topology", wc.topologyID}) defer span.Finish() ctx = ot.ContextWithSpan(ctx, span) // We measure how much time has passed since the channel was opened @@ -176,6 +177,8 @@ func (wc *websocketState) update(ctx context.Context) error { // might be interested in implementing in the future. timestampDelta := time.Since(wc.channelOpenedAt) reportTimestamp := wc.startReportingAt.Add(timestampDelta) + span.LogFields(otlog.String("opened-at", wc.channelOpenedAt.String()), + otlog.String("timestamp", reportTimestamp.String())) re, err := wc.rep.Report(ctx, reportTimestamp) if err != nil { return errors.Wrapf(err, "Error generating report: %v")