diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 2639c88fb..67dc58d9e 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -44,7 +44,7 @@ type flow struct { Metas []meta `xml:"meta"` } -type conntrack struct { +type conntrackOutput struct { XMLName xml.Name `xml:"conntrack"` Flows []flow `xml:"flow"` } @@ -61,7 +61,7 @@ type endpointMapping struct { // natTable returns a list of endpoints that have been remapped by NAT. func natTable() ([]endpointMapping, error) { - var conntrack conntrack + var conntrack conntrackOutput cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml") stdout, err := cmd.StdoutPipe() if err != nil { diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 0c7c5bfad..32acbddd6 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -1,11 +1,12 @@ package endpoint import ( - "fmt" + "log" "strconv" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/typetypetype/conntrack" "github.com/weaveworks/procspy" "github.com/weaveworks/scope/probe/process" @@ -24,6 +25,7 @@ type Reporter struct { hostName string includeProcesses bool includeNAT bool + conntracker *conntrack.ConnTrack } // SpyDuration is an exported prometheus metric @@ -44,11 +46,30 @@ var SpyDuration = prometheus.NewSummaryVec( // is stored in the Endpoint topology. It optionally enriches that topology // with process (PID) information. 
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter { + var ( + conntrackModulePresent = conntrackModulePresent() + conntracker *conntrack.ConnTrack + err error + ) + if conntrackModulePresent { + conntracker, err = conntrack.New() + if err != nil { + log.Printf("Failed to start conntracker: %v", err) + } + } return &Reporter{ hostID: hostID, hostName: hostName, includeProcesses: includeProcesses, - includeNAT: conntrackModulePresent(), + includeNAT: conntrackModulePresent, + conntracker: conntracker, + } +} + +// Stop closes the conntracker created by NewReporter; it is a no-op when none was created. +func (r *Reporter) Stop() { + if r.conntracker != nil { + r.conntracker.Close() } } @@ -65,7 +86,23 @@ func (r *Reporter) Report() (report.Report, error) { } for conn := conns.Next(); conn != nil; conn = conns.Next() { - r.addConnection(&rpt, conn) + r.addConnection(&rpt, conn.LocalAddress.String(), conn.RemoteAddress.String(), + conn.LocalPort, conn.RemotePort, &conn.Proc) + } + + // conntracker is nil when the conntrack module is not present, so guard the call as Stop does. + if r.conntracker != nil { + for _, conn := range r.conntracker.Connections() { + localPort, err := strconv.ParseUint(conn.LocalPort, 10, 16) + if err != nil { + continue + } + remotePort, err := strconv.ParseUint(conn.RemotePort, 10, 16) + if err != nil { + continue + } + r.addConnection(&rpt, conn.Local, conn.Remote, uint16(localPort), uint16(remotePort), nil) + } } if r.includeNAT { @@ -75,40 +112,45 @@ func (r *Reporter) Report() (report.Report, error) { return rpt, err } -func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { - var ( - localIsClient = int(c.LocalPort) > int(c.RemotePort) - localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String()) - remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String()) - adjacencyID = "" - edgeID = "" - ) +func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, proc *procspy.Proc) { + localIsClient := int(localPort) > int(remotePort) - if localIsClient { - adjacencyID = 
report.MakeAdjacencyID(localAddressNodeID) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) - - edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) - } else { - adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID) - rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID) - - edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID) - } - - if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { - rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ - "name": r.hostName, - Addr: c.LocalAddress.String(), - }) - } - - countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) - - if c.Proc.PID > 0 { + // Update address topology + { var ( - localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort))) - remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort))) + localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr) + remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr) + adjacencyID = "" + edgeID = "" + ) + + if localIsClient { + adjacencyID = report.MakeAdjacencyID(localAddressNodeID) + rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID) + + edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID) + } else { + adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID) + rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID) + + edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID) + } + + countTCPConnection(rpt.Address.EdgeMetadatas, edgeID) + + if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok { + rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{ + "name": r.hostName, + Addr: localAddr, + }) + } + } 
+ + // Update endpoint topology + { + var ( + localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort))) + remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort))) adjacencyID = "" edgeID = "" ) @@ -125,18 +167,24 @@ func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) { edgeID = report.MakeEdgeID(remoteEndpointNodeID, localEndpointNodeID) } - if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok { - // First hit establishes NodeMetadata for scoped local address + port - md := report.MakeNodeMetadataWith(map[string]string{ - Addr: c.LocalAddress.String(), - Port: strconv.Itoa(int(c.LocalPort)), - process.PID: fmt.Sprint(c.Proc.PID), - }) + countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID) + md, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID] + updated := !ok + if !ok { + md = report.MakeNodeMetadataWith(map[string]string{ + Addr: localAddr, + Port: strconv.Itoa(int(localPort)), + }) + } + if proc != nil && proc.PID > 0 { + pid := strconv.FormatUint(uint64(proc.PID), 10) + updated = updated || md.Metadata[process.PID] != pid + md.Metadata[process.PID] = pid + } + if updated { rpt.Endpoint.NodeMetadatas[localEndpointNodeID] = md } - - countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID) } } diff --git a/probe/main.go b/probe/main.go index d636b386a..026f70682 100644 --- a/probe/main.go +++ b/probe/main.go @@ -103,13 +103,16 @@ func main() { } var ( - taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} - reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)} - processCache *process.CachingWalker + endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs) + processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) + reporters = []Reporter{ + endpointReporter, + host.NewReporter(hostID, hostName, localNets), + 
process.NewReporter(processCache, hostID), + } + taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)} ) - - processCache = process.NewCachingWalker(process.NewWalker(*procRoot)) - reporters = append(reporters, process.NewReporter(processCache, hostID)) + defer endpointReporter.Stop() if *dockerEnabled { if err := report.AddLocalBridge(*dockerBridge); err != nil {