diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 67dc58d9e..2639c88fb 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -44,7 +44,7 @@ type flow struct { Metas []meta `xml:"meta"` } -type conntrackOutput struct { +type conntrack struct { XMLName xml.Name `xml:"conntrack"` Flows []flow `xml:"flow"` } @@ -61,7 +61,7 @@ type endpointMapping struct { // natTable returns a list of endpoints that have been remapped by NAT. func natTable() ([]endpointMapping, error) { - var conntrack conntrackOutput + var conntrack conntrack cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml") stdout, err := cmd.StdoutPipe() if err != nil { diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 32acbddd6..0c7c5bfad 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,12 +1,11 @@ package endpoint import ( - "log" + "fmt" "strconv" "time" "github.com/prometheus/client_golang/prometheus" - "github.com/typetypetype/conntrack" "github.com/weaveworks/procspy" "github.com/weaveworks/scope/probe/process" @@ -25,7 +24,6 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool - conntracker *conntrack.ConnTrack } // SpyDuration is an exported prometheus metric @@ -46,30 +44,11 @@ var SpyDuration = prometheus.NewSummaryVec( // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter { - var ( - conntrackModulePresent = conntrackModulePresent() - conntracker *conntrack.ConnTrack - err error - ) - if conntrackModulePresent { - conntracker, err = conntrack.New() - if err != nil { - log.Printf("Failed to start conntracker: %v", err) - } - } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - includeNAT: conntrackModulePresent, - conntracker: conntracker, - } -} - -// Stop stop stop -func (r *Reporter) Stop() { - if r.conntracker != nil { - r.conntracker.Close() + includeNAT: conntrackModulePresent(), } } @@ -86,20 +65,7 @@ func (r *Reporter) Report() (report.Report, error) { } for conn := conns.Next(); conn != nil; conn = conns.Next() { - r.addConnection(&rpt, conn.LocalAddress.String(), conn.RemoteAddress.String(), - conn.LocalPort, conn.RemotePort, &conn.Proc) - } - - for _, conn := range r.conntracker.Connections() { - localPort, err := strconv.ParseUint(conn.LocalPort, 10, 16) - if err != nil { - continue - } - remotePort, err := strconv.ParseUint(conn.RemotePort, 10, 16) - if err != nil { - continue - } - r.addConnection(&rpt, conn.Local, conn.Remote, uint16(localPort), uint16(remotePort), nil) + r.addConnection(&rpt, conn) } if r.includeNAT { @@ -109,45 +75,40 @@ func (r *Reporter) Report() (report.Report, error) { return rpt, err } -func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, proc *procspy.Proc) { - localIsClient := int(localPort) > int(remotePort) +func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { + var ( + localIsClient = int(c.LocalPort) > int(c.RemotePort) + localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String()) + remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String()) + adjacencyID = "" + edgeID = "" + ) - // Update address topology - { - var ( - localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr) - remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr) - adjacencyID = "" - edgeID = "" - ) + if localIsClient { + adjacencyID = report.MakeAdjacencyID(localAddressNodeID) + rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) - if localIsClient { - adjacencyID = report.MakeAdjacencyID(localAddressNodeID) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) + edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) + } else { + adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID) + rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID) - edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) - } else { - adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID) - - edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID) - } - - countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) - - if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { - rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, - Addr: localAddr, - }) - } + edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID) } - // Update endpoint topology - { + if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { + rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ + "name": r.hostName, + Addr: c.LocalAddress.String(), + }) + } + + countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) + + if c.Proc.PID > 0 { var ( - localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort))) - remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort))) + localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort))) + remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort))) adjacencyID = "" edgeID = "" ) @@ -164,24 +125,18 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin edgeID = report.MakeEdgeID(remoteEndpointNodeID, localEndpointNodeID) } - countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID) - - md, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID] - updated := !ok - if !ok { - md = report.MakeNodeMetadataWith(map[string]string{ - Addr: localAddr, - Port: strconv.Itoa(int(localPort)), + if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok { + // First hit establishes NodeMetadata for scoped local address + port + md := report.MakeNodeMetadataWith(map[string]string{ + Addr: c.LocalAddress.String(), + Port: strconv.Itoa(int(c.LocalPort)), + process.PID: fmt.Sprint(c.Proc.PID), }) - } - if proc != nil && proc.PID > 0 { - pid := strconv.FormatUint(uint64(proc.PID), 10) - updated = updated || md.Metadata[process.PID] != pid - md.Metadata[process.PID] = pid - } - if updated { + rpt.Endpoint.NodeMetadatas[localEndpointNodeID] = md } + + countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID) } } diff --git a/probe/main.go b/probe/main.go index 026f70682..d636b386a 100644 --- a/probe/main.go +++ b/probe/main.go @@ -103,16 +103,13 @@ func main() { } var ( - endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs) - processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) - reporters = []Reporter{ - endpointReporter, - host.NewReporter(hostID, hostName, localNets), - process.NewReporter(processCache, hostID), - } - taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} + taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} + reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} + processCache *process.CachingWalker ) - defer endpointReporter.Stop() + + processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters = append(reporters, process.NewReporter(processCache, hostID)) if *dockerEnabled { if err := report.AddLocalBridge(*dockerBridge); err != nil {