Revert "Also use conntrack to populate the endpoint topology."

This reverts commit f89044a381.
This commit is contained in:
Tom Wilkie
2015-08-21 14:55:39 +00:00
parent 3672cb3c4f
commit 7ca9dd32e6
3 changed files with 48 additions and 96 deletions

View File

@@ -44,7 +44,7 @@ type flow struct {
Metas []meta `xml:"meta"`
}
type conntrackOutput struct {
type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []flow `xml:"flow"`
}
@@ -61,7 +61,7 @@ type endpointMapping struct {
// natTable returns a list of endpoints that have been remapped by NAT.
func natTable() ([]endpointMapping, error) {
var conntrack conntrackOutput
var conntrack conntrack
cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml")
stdout, err := cmd.StdoutPipe()
if err != nil {

View File

@@ -1,12 +1,11 @@
package endpoint
import (
"log"
"fmt"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/typetypetype/conntrack"
"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/process"
@@ -25,7 +24,6 @@ type Reporter struct {
hostName string
includeProcesses bool
includeNAT bool
conntracker *conntrack.ConnTrack
}
// SpyDuration is an exported prometheus metric
@@ -46,30 +44,11 @@ var SpyDuration = prometheus.NewSummaryVec(
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) *Reporter {
var (
conntrackModulePresent = conntrackModulePresent()
conntracker *conntrack.ConnTrack
err error
)
if conntrackModulePresent {
conntracker, err = conntrack.New()
if err != nil {
log.Printf("Failed to start conntracker: %v", err)
}
}
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent,
conntracker: conntracker,
}
}
// Stop stop stop
func (r *Reporter) Stop() {
if r.conntracker != nil {
r.conntracker.Close()
includeNAT: conntrackModulePresent(),
}
}
@@ -86,20 +65,7 @@ func (r *Reporter) Report() (report.Report, error) {
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
r.addConnection(&rpt, conn.LocalAddress.String(), conn.RemoteAddress.String(),
conn.LocalPort, conn.RemotePort, &conn.Proc)
}
for _, conn := range r.conntracker.Connections() {
localPort, err := strconv.ParseUint(conn.LocalPort, 10, 16)
if err != nil {
continue
}
remotePort, err := strconv.ParseUint(conn.RemotePort, 10, 16)
if err != nil {
continue
}
r.addConnection(&rpt, conn.Local, conn.Remote, uint16(localPort), uint16(remotePort), nil)
r.addConnection(&rpt, conn)
}
if r.includeNAT {
@@ -109,45 +75,40 @@ func (r *Reporter) Report() (report.Report, error) {
return rpt, err
}
func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr string, localPort, remotePort uint16, proc *procspy.Proc) {
localIsClient := int(localPort) > int(remotePort)
func (r *Reporter) addConnection(rpt *report.Report, c *procspy.Connection) {
var (
localIsClient = int(c.LocalPort) > int(c.RemotePort)
localAddressNodeID = report.MakeAddressNodeID(r.hostID, c.LocalAddress.String())
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, c.RemoteAddress.String())
adjacencyID = ""
edgeID = ""
)
// Update address topology
{
var (
localAddressNodeID = report.MakeAddressNodeID(r.hostID, localAddr)
remoteAddressNodeID = report.MakeAddressNodeID(r.hostID, remoteAddr)
adjacencyID = ""
edgeID = ""
)
if localIsClient {
adjacencyID = report.MakeAdjacencyID(localAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID)
if localIsClient {
adjacencyID = report.MakeAdjacencyID(localAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(remoteAddressNodeID)
edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID)
} else {
adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID)
edgeID = report.MakeEdgeID(localAddressNodeID, remoteAddressNodeID)
} else {
adjacencyID = report.MakeAdjacencyID(remoteAddressNodeID)
rpt.Address.Adjacency[adjacencyID] = rpt.Address.Adjacency[adjacencyID].Add(localAddressNodeID)
edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID)
}
countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)
if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: localAddr,
})
}
edgeID = report.MakeEdgeID(remoteAddressNodeID, localAddressNodeID)
}
// Update endpoint topology
{
if _, ok := rpt.Address.NodeMetadatas[localAddressNodeID]; !ok {
rpt.Address.NodeMetadatas[localAddressNodeID] = report.MakeNodeMetadataWith(map[string]string{
"name": r.hostName,
Addr: c.LocalAddress.String(),
})
}
countTCPConnection(rpt.Address.EdgeMetadatas, edgeID)
if c.Proc.PID > 0 {
var (
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, localAddr, strconv.Itoa(int(localPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, remoteAddr, strconv.Itoa(int(remotePort)))
localEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
remoteEndpointNodeID = report.MakeEndpointNodeID(r.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
adjacencyID = ""
edgeID = ""
)
@@ -164,24 +125,18 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
edgeID = report.MakeEdgeID(remoteEndpointNodeID, localEndpointNodeID)
}
countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID)
md, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]
updated := !ok
if !ok {
md = report.MakeNodeMetadataWith(map[string]string{
Addr: localAddr,
Port: strconv.Itoa(int(localPort)),
if _, ok := rpt.Endpoint.NodeMetadatas[localEndpointNodeID]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
md := report.MakeNodeMetadataWith(map[string]string{
Addr: c.LocalAddress.String(),
Port: strconv.Itoa(int(c.LocalPort)),
process.PID: fmt.Sprint(c.Proc.PID),
})
}
if proc != nil && proc.PID > 0 {
pid := strconv.FormatUint(uint64(proc.PID), 10)
updated = updated || md.Metadata[process.PID] != pid
md.Metadata[process.PID] = pid
}
if updated {
rpt.Endpoint.NodeMetadatas[localEndpointNodeID] = md
}
countTCPConnection(rpt.Endpoint.EdgeMetadatas, edgeID)
}
}

View File

@@ -103,16 +103,13 @@ func main() {
}
var (
endpointReporter = endpoint.NewReporter(hostID, hostName, *spyProcs)
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = []Reporter{
endpointReporter,
host.NewReporter(hostID, hostName, localNets),
process.NewReporter(processCache, hostID),
}
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
reporters = []Reporter{host.NewReporter(hostID, hostName, localNets), endpoint.NewReporter(hostID, hostName, *spyProcs)}
processCache *process.CachingWalker
)
defer endpointReporter.Stop()
processCache = process.NewCachingWalker(process.NewWalker(*procRoot))
reporters = append(reporters, process.NewReporter(processCache, hostID))
if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {