Merge nodeToIP, endpoints2Nodes and ipToNode into one Renderer

This is much more efficient as we skip creating then merging all intermediate Nodes
This commit is contained in:
Bryan Boreham
2017-10-19 17:27:17 +00:00
parent b684e3c6fc
commit e16aaf6c43
2 changed files with 75 additions and 64 deletions

View File

@@ -39,85 +39,82 @@ var ContainerRenderer = Memoise(MakeFilter(
),
))
var mapEndpoint2IP = Memoise(MakeMap(endpoint2IP, SelectEndpoint))
const originalNodeID = "original_node_id"
// ConnectionJoin joins the given renderer with connections from the
// endpoints topology, using the toIPs function to extract IPs from
// the nodes.
func ConnectionJoin(toIPs func(report.Node) []string, r Renderer) Renderer {
nodeToIP := func(n report.Node, _ report.Networks) report.Nodes {
result := report.Nodes{}
for _, ip := range toIPs(n) {
result[ip] = report.MakeNode(ip).
WithTopology(IP).
WithLatests(map[string]string{
originalNodeID: n.ID,
}).
WithCounters(map[string]int{IP: 1})
}
return result
}
return MakeReduce(
MakeMap(
ipToNode,
MakeReduce(
MakeMap(nodeToIP, r),
mapEndpoint2IP,
),
),
r,
)
return connectionJoin{toIPs: toIPs, r: r}
}
func ipToNode(n report.Node, _ report.Networks) report.Nodes {
// propagate non-IP nodes
if n.Topology != IP {
return report.Nodes{n.ID: n}
}
// If an IP is shared between multiple nodes, we can't reliably
// attribute an connection based on its IP
if count, _ := n.Counters.Lookup(IP); count > 1 {
return report.Nodes{}
}
// If this node is not of the original type, exclude it. This
// excludes all the nodes we've dragged in from endpoint that we
// failed to join to a node.
id, ok := n.Latest.Lookup(originalNodeID)
if !ok {
return report.Nodes{}
}
return report.Nodes{id: NewDerivedNode(id, n)}
type connectionJoin struct {
toIPs func(report.Node) []string
r Renderer
}
// endpoint2IP maps endpoint nodes to their IP address, for joining
// with container nodes.
func endpoint2IP(m report.Node, local report.Networks) report.Nodes {
scope, addr, port, ok := report.ParseEndpointNodeID(m.ID)
if !ok {
return report.Nodes{}
}
func (c connectionJoin) Render(rpt report.Report, dct Decorator) report.Nodes {
local := LocalNetworks(rpt)
inputNodes := c.r.Render(rpt, dct)
endpoints := SelectEndpoint.Render(rpt, dct)
// Nodes without a hostid may be pseudo nodes
if _, ok := m.Latest.Lookup(report.HostNodeID); !ok {
if externalNode, ok := NewDerivedExternalNode(m, addr, local); ok {
return report.Nodes{externalNode.ID: externalNode}
// Collect all the IPs we are trying to map to, and which ID they map from
var ipNodes = map[string]string{}
for _, n := range inputNodes {
for _, ip := range c.toIPs(n) {
if _, exists := ipNodes[ip]; exists {
// If an IP is shared between multiple nodes, we can't reliably
// attribute an connection based on its IP
ipNodes[ip] = "" // blank out the mapping so we don't use it
} else {
ipNodes[ip] = n.ID
}
}
}
var ret = make(report.Nodes)
var mapped = map[string]string{} // input node ID -> output node ID
// We also allow for joining on ip:port pairs. This is useful for
// connections to the host IPs which have been port mapped to a
// container can only be unambiguously identified with the port.
// So we need to emit two nodes, for two different cases.
id := report.MakeScopedEndpointNodeID(scope, addr, "")
idWithPort := report.MakeScopedEndpointNodeID(scope, addr, port)
return report.Nodes{
id: NewDerivedNode(id, m).WithTopology(IP),
idWithPort: NewDerivedNode(idWithPort, m).WithTopology(IP),
// Now look at all the endpoints and see which map to IP nodes
for _, m := range endpoints {
scope, addr, port, ok := report.ParseEndpointNodeID(m.ID)
if !ok {
continue
}
// Nodes without a hostid may be pseudo nodes - if so, pass through to result
if _, ok := m.Latest.Lookup(report.HostNodeID); !ok {
if id, ok := externalNodeID(m, addr, local); ok {
addToResults(m, id, ret, mapped, func() report.Node {
return report.MakeNode(id).WithTopology(Pseudo)
})
continue
}
}
id, found := ipNodes[report.MakeScopedEndpointNodeID(scope, addr, "")]
// We also allow for joining on ip:port pairs. This is useful for
// connections to the host IPs which have been port mapped to a
// container can only be unambiguously identified with the port.
if !found {
id, found = ipNodes[report.MakeScopedEndpointNodeID(scope, addr, port)]
}
if found && id != "" { // not one we blanked out earlier
addToResults(m, id, ret, mapped, func() report.Node {
return inputNodes[id]
})
}
}
// Copy through any unmatched input nodes
for _, n := range inputNodes {
if _, found := ret[n.ID]; !found {
ret[n.ID] = n
}
}
fixupAdjancencies(inputNodes, ret, mapped)
fixupAdjancencies(endpoints, ret, mapped)
return ret
}
func (c connectionJoin) Stats(rpt report.Report, _ Decorator) Stats {
return Stats{} // nothing to report
}
// FilterEmpty is a Renderer which filters out nodes which have no children

View File

@@ -104,6 +104,20 @@ func (e endpoints2Hosts) Render(rpt report.Report, dct Decorator) report.Nodes {
return ret
}
// Add Node M to the result set ret under id, creating a new result
// node if not already there, and updating the old-id to new-id mapping
// Note we do not update any counters for child topologies here
func addToResults(m report.Node, id string, ret report.Nodes, mapped map[string]string, create func() report.Node) {
result, exists := ret[id]
if !exists {
result = create()
}
result.Children = result.Children.Add(m)
result.Children = result.Children.Merge(m.Children)
ret[result.ID] = result
mapped[m.ID] = result.ID
}
// Rewrite Adjacency for new nodes in ret, original nodes in input, and mapping old->new IDs in mapped
func fixupAdjancencies(input, ret report.Nodes, mapped map[string]string) {
for _, n := range input {