From b0915519dfc1af5be22607107e0e5468d932bdaa Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 16 Sep 2019 11:03:02 +0000 Subject: [PATCH] refactor: move websocket state out to a struct to neaten up the send loop --- app/api_topology.go | 116 +++++++++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 49 deletions(-) diff --git a/app/api_topology.go b/app/api_topology.go index 67ecf89ce..090b5e2af 100644 --- a/app/api_topology.go +++ b/app/api_topology.go @@ -2,12 +2,14 @@ package app import ( "net/http" + "net/url" "time" "context" "github.com/gorilla/mux" ot "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/weaveworks/scope/common/xfer" @@ -120,62 +122,26 @@ func handleWebsocket( } }(conn) - var ( - previousTopo detailed.NodeSummaries - tick = time.Tick(loop) - wait = make(chan struct{}, 1) - topologyID = mux.Vars(r)["topology"] - startReportingAt = deserializeTimestamp(r.Form.Get("timestamp")) - censorCfg = report.GetCensorConfigFromRequest(r) - channelOpenedAt = time.Now() - ) + wc := websocketState{ + rep: rep, + values: r.Form, + conn: conn, + topologyID: mux.Vars(r)["topology"], + startReportingAt: deserializeTimestamp(r.Form.Get("timestamp")), + censorCfg: report.GetCensorConfigFromRequest(r), + channelOpenedAt: time.Now(), + } + wait := make(chan struct{}, 1) rep.WaitOn(ctx, wait) defer rep.UnWait(ctx, wait) + tick := time.Tick(loop) for { - span := ot.StartSpan("ws.Render", ot.Tag{"topology", topologyID}) - ctx := ot.ContextWithSpan(ctx, span) - // We measure how much time has passed since the channel was opened - // and add it to the initial report timestamp to get the timestamp - // of the snapshot we want to report right now. - // NOTE: Multiplying `timestampDelta` by a constant factor here - // would have an effect of fast-forward, which is something we - // might be interested in implementing in the future. - timestampDelta := time.Since(channelOpenedAt) - reportTimestamp := startReportingAt.Add(timestampDelta) - re, err := rep.Report(ctx, reportTimestamp) - if err != nil { - log.Errorf("Error generating report: %v", err) - span.Finish() + if err := wc.update(ctx); err != nil { + log.Errorf("%v", err) return } - renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re) - if err != nil { - log.Errorf("Error generating report: %v", err) - span.Finish() - return - } - - newTopo := detailed.CensorNodeSummaries( - detailed.Summaries( - ctx, - RenderContextForReporter(rep, re), - render.Render(ctx, re, renderer, filter).Nodes, - ), - censorCfg, - ) - diff := detailed.TopoDiff(previousTopo, newTopo) - previousTopo = newTopo - - if err := conn.WriteJSON(diff); err != nil { - if !xfer.IsExpectedWSCloseError(err) { - log.Errorf("cannot serialize topology diff: %s", err) - } - span.Finish() - return - } - span.Finish() select { case <-wait: @@ -185,3 +151,55 @@ func handleWebsocket( } } } + +type websocketState struct { + rep Reporter + values url.Values + conn xfer.Websocket + previousTopo detailed.NodeSummaries + topologyID string + startReportingAt time.Time + reportTimestamp time.Time + censorCfg report.CensorConfig + channelOpenedAt time.Time +} + +func (wc *websocketState) update(ctx context.Context) error { + span := ot.StartSpan("ws.Render", ot.Tag{"topology", wc.topologyID}) + defer span.Finish() + ctx = ot.ContextWithSpan(ctx, span) + // We measure how much time has passed since the channel was opened + // and add it to the initial report timestamp to get the timestamp + // of the snapshot we want to report right now. + // NOTE: Multiplying `timestampDelta` by a constant factor here + // would have an effect of fast-forward, which is something we + // might be interested in implementing in the future. + timestampDelta := time.Since(wc.channelOpenedAt) + reportTimestamp := wc.startReportingAt.Add(timestampDelta) + re, err := wc.rep.Report(ctx, reportTimestamp) + if err != nil { + return errors.Wrapf(err, "Error generating report: %v") + } + renderer, filter, err := topologyRegistry.RendererForTopology(wc.topologyID, wc.values, re) + if err != nil { + return errors.Wrapf(err, "Error generating report: %v") + } + + newTopo := detailed.CensorNodeSummaries( + detailed.Summaries( + ctx, + RenderContextForReporter(wc.rep, re), + render.Render(ctx, re, renderer, filter).Nodes, + ), + wc.censorCfg, + ) + diff := detailed.TopoDiff(wc.previousTopo, newTopo) + wc.previousTopo = newTopo + + if err := wc.conn.WriteJSON(diff); err != nil { + if !xfer.IsExpectedWSCloseError(err) { + return errors.Wrapf(err, "cannot serialize topology diff: %s") + } + } + return nil +}