Merge pull request #3849 from weaveworks/cancel-join-result

Exit joinResults.result() if context cancelled
This commit is contained in:
Bryan Boreham
2021-04-22 12:22:07 +01:00
committed by GitHub
7 changed files with 24 additions and 13 deletions

View File

@@ -275,7 +275,7 @@ func (m containerImageRenderer) Render(ctx context.Context, rpt report.Report) N
id := report.MakeContainerImageNodeID(imageID)
ret.addChildAndChildren(n, id, report.ContainerImage)
}
return ret.result(containers)
return ret.result(ctx, containers)
}
func containerImageNodeID(n report.Node) string {
@@ -328,5 +328,5 @@ func (m containerHostnameRenderer) Render(ctx context.Context, rpt report.Report
}
ret.addChildAndChildren(n, id, containerHostnameTopology)
}
return ret.result(containers)
return ret.result(ctx, containers)
}

View File

@@ -3,6 +3,8 @@ package render
import (
"context"
opentracing "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/weaveworks/scope/report"
)
@@ -30,6 +32,8 @@ func MapEndpoints(f endpointMapFunc, topology string) Renderer {
}
func (e mapEndpoints) Render(ctx context.Context, rpt report.Report) Nodes {
span, ctx := opentracing.StartSpanFromContext(ctx, "mapEndpoints.Render")
defer span.Finish()
local := LocalNetworks(rpt)
endpoints := SelectEndpoint.Render(ctx, rpt)
ret := newJoinResults(TopologySelector(e.topology).Render(ctx, rpt).Nodes)
@@ -47,5 +51,7 @@ func (e mapEndpoints) Render(ctx context.Context, rpt report.Report) Nodes {
ret.addChild(n, id, e.topology)
}
}
return ret.result(endpoints)
span.LogFields(otlog.Int("input.nodes", len(endpoints.Nodes)),
otlog.Int("ouput.nodes", len(ret.nodes)))
return ret.result(ctx, endpoints)
}

View File

@@ -17,7 +17,7 @@ const (
// in one call - useful for functions that need to consider the entire graph.
// We should minimise the use of this renderer type, as it is very inflexible.
type CustomRenderer struct {
RenderFunc func(Nodes) Nodes
RenderFunc func(context.Context, Nodes) Nodes
Renderer
}
@@ -26,7 +26,7 @@ func (c CustomRenderer) Render(ctx context.Context, rpt report.Report) Nodes {
if ctx.Err() != nil {
return Nodes{}
}
return c.RenderFunc(c.Renderer.Render(ctx, rpt))
return c.RenderFunc(ctx, c.Renderer.Render(ctx, rpt))
}
// FilterFunc is the function type used by Filters
@@ -189,7 +189,7 @@ func filterInternetAdjacencies(nodes report.Nodes) {
func ColorConnected(r Renderer) Renderer {
return CustomRenderer{
Renderer: r,
RenderFunc: func(input Nodes) Nodes {
RenderFunc: func(ctx context.Context, input Nodes) Nodes {
output := input.Copy()
for id := range connected(input.Nodes) {
output[id] = output[id].WithLatest(IsConnectedMark, mtime.Now(), "true")

View File

@@ -1,6 +1,8 @@
package render
import (
"context"
"github.com/weaveworks/scope/report"
)
@@ -25,7 +27,7 @@ var HostRenderer = MakeReduce(
// format for a host, but without any Major or Minor labels. It does
// not have enough info to do that, and the resulting graph must be
// merged with a host graph to get that info.
func nodes2Hosts(nodes Nodes) Nodes {
func nodes2Hosts(ctx context.Context, nodes Nodes) Nodes {
ret := newJoinResults(nil)
for _, n := range nodes.Nodes {
@@ -47,7 +49,7 @@ func nodes2Hosts(nodes Nodes) Nodes {
}
}
}
return ret.result(nodes)
return ret.result(ctx, nodes)
}
func endpoint2Host(n report.Node) string {

View File

@@ -197,5 +197,5 @@ func (m Map2Parent) Render(ctx context.Context, rpt report.Report) Nodes {
ret.addChildAndChildren(n, id, Pseudo)
}
}
return ret.result(input)
return ret.result(ctx, input)
}

View File

@@ -95,7 +95,7 @@ func hasMoreThanOneConnection(n report.Node, endpoints report.Nodes) bool {
var processNameTopology = MakeGroupNodeTopology(report.Process, report.Name)
// processes2Names maps process Nodes to Nodes for each process name.
func processes2Names(processes Nodes) Nodes {
func processes2Names(ctx context.Context, processes Nodes) Nodes {
ret := newJoinResults(nil)
for _, n := range processes.Nodes {
@@ -105,5 +105,5 @@ func processes2Names(processes Nodes) Nodes {
ret.addChildAndChildren(n, name, processNameTopology)
}
}
return ret.result(processes)
return ret.result(ctx, processes)
}

View File

@@ -133,7 +133,7 @@ func (m Map) Render(ctx context.Context, rpt report.Report) Nodes {
span.LogFields(otlog.Int("input.nodes", len(input.Nodes)),
otlog.Int("ouput.nodes", len(output.nodes)))
return output.result(input)
return output.result(ctx, input)
}
// Condition is a predecate over the entire report that can evaluate to true or false.
@@ -233,8 +233,11 @@ func (ret *joinResults) passThrough(n report.Node) {
// Rewrite Adjacency of nodes in ret mapped from original nodes in
// input, and return the result.
func (ret *joinResults) result(input Nodes) Nodes {
func (ret *joinResults) result(ctx context.Context, input Nodes) Nodes {
for _, n := range input.Nodes {
if ctx.Err() != nil { // check if cancelled
return Nodes{}
}
outID, ok := ret.mapped[n.ID]
if !ok {
continue