mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
refactor: move websocket state out to a struct to neaten up the send loop
This commit is contained in:
@@ -2,12 +2,14 @@ package app
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"context"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
ot "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
@@ -120,62 +122,26 @@ func handleWebsocket(
|
||||
}
|
||||
}(conn)
|
||||
|
||||
var (
|
||||
previousTopo detailed.NodeSummaries
|
||||
tick = time.Tick(loop)
|
||||
wait = make(chan struct{}, 1)
|
||||
topologyID = mux.Vars(r)["topology"]
|
||||
startReportingAt = deserializeTimestamp(r.Form.Get("timestamp"))
|
||||
censorCfg = report.GetCensorConfigFromRequest(r)
|
||||
channelOpenedAt = time.Now()
|
||||
)
|
||||
wc := websocketState{
|
||||
rep: rep,
|
||||
values: r.Form,
|
||||
conn: conn,
|
||||
topologyID: mux.Vars(r)["topology"],
|
||||
startReportingAt: deserializeTimestamp(r.Form.Get("timestamp")),
|
||||
censorCfg: report.GetCensorConfigFromRequest(r),
|
||||
channelOpenedAt: time.Now(),
|
||||
}
|
||||
|
||||
wait := make(chan struct{}, 1)
|
||||
rep.WaitOn(ctx, wait)
|
||||
defer rep.UnWait(ctx, wait)
|
||||
|
||||
tick := time.Tick(loop)
|
||||
for {
|
||||
span := ot.StartSpan("ws.Render", ot.Tag{"topology", topologyID})
|
||||
ctx := ot.ContextWithSpan(ctx, span)
|
||||
// We measure how much time has passed since the channel was opened
|
||||
// and add it to the initial report timestamp to get the timestamp
|
||||
// of the snapshot we want to report right now.
|
||||
// NOTE: Multiplying `timestampDelta` by a constant factor here
|
||||
// would have an effect of fast-forward, which is something we
|
||||
// might be interested in implementing in the future.
|
||||
timestampDelta := time.Since(channelOpenedAt)
|
||||
reportTimestamp := startReportingAt.Add(timestampDelta)
|
||||
re, err := rep.Report(ctx, reportTimestamp)
|
||||
if err != nil {
|
||||
log.Errorf("Error generating report: %v", err)
|
||||
span.Finish()
|
||||
if err := wc.update(ctx); err != nil {
|
||||
log.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
renderer, filter, err := topologyRegistry.RendererForTopology(topologyID, r.Form, re)
|
||||
if err != nil {
|
||||
log.Errorf("Error generating report: %v", err)
|
||||
span.Finish()
|
||||
return
|
||||
}
|
||||
|
||||
newTopo := detailed.CensorNodeSummaries(
|
||||
detailed.Summaries(
|
||||
ctx,
|
||||
RenderContextForReporter(rep, re),
|
||||
render.Render(ctx, re, renderer, filter).Nodes,
|
||||
),
|
||||
censorCfg,
|
||||
)
|
||||
diff := detailed.TopoDiff(previousTopo, newTopo)
|
||||
previousTopo = newTopo
|
||||
|
||||
if err := conn.WriteJSON(diff); err != nil {
|
||||
if !xfer.IsExpectedWSCloseError(err) {
|
||||
log.Errorf("cannot serialize topology diff: %s", err)
|
||||
}
|
||||
span.Finish()
|
||||
return
|
||||
}
|
||||
span.Finish()
|
||||
|
||||
select {
|
||||
case <-wait:
|
||||
@@ -185,3 +151,55 @@ func handleWebsocket(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type websocketState struct {
|
||||
rep Reporter
|
||||
values url.Values
|
||||
conn xfer.Websocket
|
||||
previousTopo detailed.NodeSummaries
|
||||
topologyID string
|
||||
startReportingAt time.Time
|
||||
reportTimestamp time.Time
|
||||
censorCfg report.CensorConfig
|
||||
channelOpenedAt time.Time
|
||||
}
|
||||
|
||||
func (wc *websocketState) update(ctx context.Context) error {
|
||||
span := ot.StartSpan("ws.Render", ot.Tag{"topology", wc.topologyID})
|
||||
defer span.Finish()
|
||||
ctx = ot.ContextWithSpan(ctx, span)
|
||||
// We measure how much time has passed since the channel was opened
|
||||
// and add it to the initial report timestamp to get the timestamp
|
||||
// of the snapshot we want to report right now.
|
||||
// NOTE: Multiplying `timestampDelta` by a constant factor here
|
||||
// would have an effect of fast-forward, which is something we
|
||||
// might be interested in implementing in the future.
|
||||
timestampDelta := time.Since(wc.channelOpenedAt)
|
||||
reportTimestamp := wc.startReportingAt.Add(timestampDelta)
|
||||
re, err := wc.rep.Report(ctx, reportTimestamp)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Error generating report: %v")
|
||||
}
|
||||
renderer, filter, err := topologyRegistry.RendererForTopology(wc.topologyID, wc.values, re)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Error generating report: %v")
|
||||
}
|
||||
|
||||
newTopo := detailed.CensorNodeSummaries(
|
||||
detailed.Summaries(
|
||||
ctx,
|
||||
RenderContextForReporter(wc.rep, re),
|
||||
render.Render(ctx, re, renderer, filter).Nodes,
|
||||
),
|
||||
wc.censorCfg,
|
||||
)
|
||||
diff := detailed.TopoDiff(wc.previousTopo, newTopo)
|
||||
wc.previousTopo = newTopo
|
||||
|
||||
if err := wc.conn.WriteJSON(diff); err != nil {
|
||||
if !xfer.IsExpectedWSCloseError(err) {
|
||||
return errors.Wrapf(err, "cannot serialize topology diff: %s")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user