diff --git a/app/api_topology.go b/app/api_topology.go index 9a48ada0f..09adfc18e 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -2,11 +2,15 @@ package app import ( "net/http" + "net/url" "time" "context" "github.com/gorilla/mux" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/weaveworks/scope/common/xfer" @@ -119,54 +123,24 @@ func handleWebsocket( } }(conn) - var ( - previousTopo detailed.NodeSummaries - tick = time.Tick(loop) - wait = make(chan struct{}, 1) - topologyID = mux.Vars(r)["topology"] - startReportingAt = deserializeTimestamp(r.Form.Get("timestamp")) - censorCfg = report.GetCensorConfigFromRequest(r) - channelOpenedAt = time.Now() - ) + wc := websocketState{ + rep: rep, + values: r.Form, + conn: conn, + topologyID: mux.Vars(r)["topology"], + startReportingAt: deserializeTimestamp(r.Form.Get("timestamp")), + censorCfg: report.GetCensorConfigFromRequest(r), + channelOpenedAt: time.Now(), + } + wait := make(chan struct{}, 1) rep.WaitOn(ctx, wait) defer rep.UnWait(ctx, wait) + tick := time.Tick(loop) for { - // We measure how much time has passed since the channel was opened - // and add it to the initial report timestamp to get the timestamp - // of the snapshot we want to report right now. - // NOTE: Multiplying `timestampDelta` by a constant factor here - // would have an effect of fast-forward, which is something we - // might be interested in implementing in the future. - timestampDelta := time.Since(channelOpenedAt) - reportTimestamp := startReportingAt.Add(timestampDelta) - re, err := rep.Report(ctx, reportTimestamp) - if err != nil { - log.Errorf("Error generating report: %v", err) - return - } - renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re) - if err != nil { - log.Errorf("Error generating report: %v", err) - return - } - - newTopo := detailed.CensorNodeSummaries( - detailed.Summaries( - ctx, - RenderContextForReporter(rep, re), - render.Render(ctx, re, renderer, filter).Nodes, - ), - censorCfg, - ) - diff := detailed.TopoDiff(previousTopo, newTopo) - previousTopo = newTopo - - if err := conn.WriteJSON(diff); err != nil { - if !xfer.IsExpectedWSCloseError(err) { - log.Errorf("cannot serialize topology diff: %s", err) - } + if err := wc.update(ctx); err != nil { + log.Errorf("%v", err) return } @@ -178,3 +152,57 @@ func handleWebsocket( } } } + +type websocketState struct { + rep Reporter + values url.Values + conn xfer.Websocket + previousTopo detailed.NodeSummaries + topologyID string + startReportingAt time.Time + reportTimestamp time.Time + censorCfg report.CensorConfig + channelOpenedAt time.Time +} + +func (wc *websocketState) update(ctx context.Context) error { + span := ot.StartSpan("websocket.Render", ot.Tag{"topology", wc.topologyID}) + defer span.Finish() + ctx = ot.ContextWithSpan(ctx, span) + // We measure how much time has passed since the channel was opened + // and add it to the initial report timestamp to get the timestamp + // of the snapshot we want to report right now. + // NOTE: Multiplying `timestampDelta` by a constant factor here + // would have an effect of fast-forward, which is something we + // might be interested in implementing in the future. + timestampDelta := time.Since(wc.channelOpenedAt) + reportTimestamp := wc.startReportingAt.Add(timestampDelta) + span.LogFields(otlog.String("opened-at", wc.channelOpenedAt.String()), + otlog.String("timestamp", reportTimestamp.String())) + re, err := wc.rep.Report(ctx, reportTimestamp) + if err != nil { + return errors.Wrapf(err, "Error generating report: %v") + } + renderer, filter, err := topologyRegistry.RendererForTopology(wc.topologyID, wc.values, re) + if err != nil { + return errors.Wrapf(err, "Error generating report: %v") + } + + newTopo := detailed.CensorNodeSummaries( + detailed.Summaries( + ctx, + RenderContextForReporter(wc.rep, re), + render.Render(ctx, re, renderer, filter).Nodes, + ), + wc.censorCfg, + ) + diff := detailed.TopoDiff(wc.previousTopo, newTopo) + wc.previousTopo = newTopo + + if err := wc.conn.WriteJSON(diff); err != nil { + if !xfer.IsExpectedWSCloseError(err) { + return errors.Wrapf(err, "cannot serialize topology diff: %s") + } + } + return nil +} diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index bdc0f0e6d..88907d9fb 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -370,6 +370,7 @@ func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report. if err != nil { return report.MakeReport(), err } + span.SetTag("userid", userid) end := timestamp start := end.Add(-c.cfg.Window) reportKeys, err := c.getReportKeys(ctx, userid, start, end)