diff --git a/Makefile b/Makefile
index eb131de45..e259baa69 100644
--- a/Makefile
+++ b/Makefile
@@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*
 
 $(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go
 
-$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go probe/process/*.go report/*.go xfer/*.go
+$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go probe/endpoint/*.go probe/process/*.go report/*.go xfer/*.go
 
 $(APP_EXE) $(PROBE_EXE):
 	go get -tags netgo ./$(@D)
diff --git a/docker/Dockerfile b/docker/Dockerfile
index c2ff5b4bc..079ead9f0 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -2,7 +2,7 @@ FROM gliderlabs/alpine
 MAINTAINER Weaveworks Inc
 WORKDIR /home/weave
 RUN echo "http://dl-4.alpinelinux.org/alpine/edge/testing" >>/etc/apk/repositories && \
-	apk add --update runit && \
+	apk add --update runit conntrack-tools && \
 	rm -rf /var/cache/apk/*
 COPY ./app ./probe ./entrypoint.sh /home/weave/
 COPY ./run-app /etc/service/app/run
diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go
new file mode 100644
index 000000000..995c492f3
--- /dev/null
+++ b/probe/endpoint/nat.go
@@ -0,0 +1,202 @@
+package endpoint
+
+import (
+	"bufio"
+	"encoding/xml"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+
+	"github.com/weaveworks/scope/report"
+)
+
+const (
+	modules         = "/proc/modules"
+	conntrackModule = "nf_conntrack"
+)
+
+// These structs mirror the XML document printed by `conntrack -o xml`:
+// conntrack > flow > meta > layer3/layer4.
+type layer3 struct {
+	XMLName xml.Name `xml:"layer3"`
+	SrcIP   string   `xml:"src"`
+	DstIP   string   `xml:"dst"`
+}
+
+type layer4 struct {
+	XMLName xml.Name `xml:"layer4"`
+	SrcPort int      `xml:"sport"`
+	DstPort int      `xml:"dport"`
+	Proto   string   `xml:"protoname,attr"`
+}
+
+type meta struct {
+	XMLName   xml.Name `xml:"meta"`
+	Direction string   `xml:"direction,attr"`
+	Layer3    layer3   `xml:"layer3"`
+	Layer4    layer4   `xml:"layer4"`
+}
+
+type flow struct {
+	XMLName xml.Name `xml:"flow"`
+	Metas   []meta   `xml:"meta"`
+}
+
+type conntrack struct {
+	XMLName xml.Name `xml:"conntrack"`
+	Flows   []flow   `xml:"flow"`
+}
+
+// endpointMapping is our abstraction of an endpoint that has been rewritten
+// by NAT. The original address is the private one that has been rewritten.
+type endpointMapping struct {
+	originalIP   string
+	originalPort int
+
+	rewrittenIP   string
+	rewrittenPort int
+}
+
+// natTable returns a list of endpoints that have been remapped by NAT. It
+// runs `conntrack -L --any-nat -o xml` and parses what the tool prints.
+func natTable() ([]endpointMapping, error) {
+	cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml")
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+	if err := cmd.Start(); err != nil {
+		return nil, err
+	}
+	defer func() {
+		// Wait must not run until every read from the pipe has finished. If
+		// decoding stopped early, drain the remainder so conntrack cannot
+		// block on a full pipe and leave Wait hanging. The copy error is
+		// ignored on purpose; Wait reports any real failure.
+		io.Copy(ioutil.Discard, stdout)
+		if err := cmd.Wait(); err != nil {
+			log.Printf("conntrack error: %v", err)
+		}
+	}()
+	return parseConntrack(stdout)
+}
+
+// parseConntrack decodes one conntrack XML document from r and returns the
+// NAT mapping of every usable TCP flow in it. Empty input is not an error
+// and yields an empty list.
+func parseConntrack(r io.Reader) ([]endpointMapping, error) {
+	var table conntrack
+	if err := xml.NewDecoder(r).Decode(&table); err != nil {
+		if err == io.EOF {
+			return []endpointMapping{}, nil
+		}
+		return nil, err
+	}
+
+	output := []endpointMapping{}
+	for _, f := range table.Flows {
+		if mapping, ok := mappingFor(f); ok {
+			output = append(output, mapping)
+		}
+	}
+	return output, nil
+}
+
+// mappingFor extracts the NAT mapping described by a single flow. It reports
+// false for flows that are not TCP, or that lack either the 'original' or
+// the 'reply' tuple (such a flow would otherwise produce a mapping with an
+// empty address and port 0).
+func mappingFor(f flow) (endpointMapping, bool) {
+	// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
+	// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
+	// This code finds those metas, which are identified by a Direction
+	// attribute.
+	var original, reply meta
+	var haveOriginal, haveReply bool
+	for _, m := range f.Metas {
+		switch m.Direction {
+		case "original":
+			original, haveOriginal = m, true
+		case "reply":
+			reply, haveReply = m, true
+		}
+	}
+	if !haveOriginal || !haveReply || original.Layer4.Proto != "tcp" {
+		return endpointMapping{}, false
+	}
+
+	// The source address survived untouched, so it is the destination that
+	// was rewritten: the reply's source is the real endpoint.
+	if original.Layer3.SrcIP == reply.Layer3.DstIP {
+		return endpointMapping{
+			originalIP:    reply.Layer3.SrcIP,
+			originalPort:  reply.Layer4.SrcPort,
+			rewrittenIP:   original.Layer3.DstIP,
+			rewrittenPort: original.Layer4.DstPort,
+		}, true
+	}
+
+	// Otherwise the source was rewritten: replies are addressed to the
+	// rewritten form of the original source.
+	return endpointMapping{
+		originalIP:    original.Layer3.SrcIP,
+		originalPort:  original.Layer4.SrcPort,
+		rewrittenIP:   reply.Layer3.DstIP,
+		rewrittenPort: reply.Layer4.DstPort,
+	}, true
+}
+
+// applyNAT duplicates NodeMetadatas in the endpoint topology of a
+// report, based on the NAT table as returned by natTable. rpt is passed by
+// value, but NodeMetadatas is a map, so the copies added here are visible
+// to the caller.
+func applyNAT(rpt report.Report, scope string) error {
+	mappings, err := natTable()
+	if err != nil {
+		return err
+	}
+
+	for _, mapping := range mappings {
+		realEndpointID := report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort))
+		copyEndpointID := report.MakeEndpointNodeID(scope, mapping.rewrittenIP, strconv.Itoa(mapping.rewrittenPort))
+		nmd, ok := rpt.Endpoint.NodeMetadatas[realEndpointID]
+		if !ok {
+			continue
+		}
+
+		rpt.Endpoint.NodeMetadatas[copyEndpointID] = nmd.Copy()
+	}
+
+	return nil
+}
+
+// conntrackModulePresent reports whether the modules file lists a line
+// starting with the conntrack module name. It is a prefix match, so any
+// module whose name merely begins with that string also counts. Failures to
+// open or scan the file are logged and treated as "not present".
+func conntrackModulePresent() bool {
+	f, err := os.Open(modules)
+	if err != nil {
+		log.Printf("conntrack error: %v", err)
+		return false
+	}
+	defer f.Close()
+
+	scanner := bufio.NewScanner(f)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if strings.HasPrefix(line, conntrackModule) {
+			return true
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		log.Printf("conntrack error: %v", err)
+	}
+
+	log.Printf("conntrack: failed to find module %s", conntrackModule)
+	return false
+}
diff --git a/probe/endpoint/reporter.go
b/probe/endpoint/reporter.go
new file mode 100644
index 000000000..040e00c4b
--- /dev/null
+++ b/probe/endpoint/reporter.go
@@ -0,0 +1,140 @@
+package endpoint
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/weaveworks/procspy"
+	"github.com/weaveworks/scope/probe/tag"
+	"github.com/weaveworks/scope/report"
+)
+
+// reporter produces connection reports for one host.
+type reporter struct {
+	hostID           string
+	hostName         string
+	includeProcesses bool // handed to procspy.Connections
+	includeNAT       bool // when set, Report also runs applyNAT
+}
+
+// SpyDuration is an exported prometheus metric. Report observes in it how
+// long each run took, as a nanosecond count.
+var SpyDuration = prometheus.NewSummaryVec(
+	prometheus.SummaryOpts{
+		Namespace: "scope",
+		Subsystem: "probe",
+		Name:      "spy_time_nanoseconds",
+		Help:      "Total time spent spying on active connections.",
+		MaxAge:    10 * time.Second, // like statsd
+	},
+	[]string{},
+)
+
+// NewReporter creates a new Reporter that invokes procspy.Connections to
+// generate a report.Report that contains every discovered (spied) connection
+// on the host machine, at the granularity of host and port. It optionally
+// enriches that topology with process (PID) information. NAT handling is
+// switched on when conntrackModulePresent reports true at construction time.
+func NewReporter(hostID, hostName string, includeProcesses bool) tag.Reporter {
+	return &reporter{
+		hostID:           hostID,
+		hostName:         hostName,
+		includeProcesses: includeProcesses,
+		includeNAT:       conntrackModulePresent(),
+	}
+}
+
+// Report spies on the current connections and returns them as a report. With
+// NAT handling on, the endpoint topology is then extended by applyNAT; an
+// error from that step is returned together with the populated report.
+func (rep *reporter) Report() (report.Report, error) {
+	defer func(begin time.Time) {
+		SpyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
+	}(time.Now())
+
+	r := report.MakeReport()
+	conns, err := procspy.Connections(rep.includeProcesses)
+	if err != nil {
+		return r, err
+	}
+
+	for conn := conns.Next(); conn != nil; conn = conns.Next() {
+		rep.addConnection(&r, conn)
+	}
+
+	if rep.includeNAT {
+		err = applyNAT(r, rep.hostID)
+	}
+
+	return r, err
+}
+
+// addConnection records c in the address topology of r and, when the
+// connection carries a process (PID > 0), in the endpoint topology as well.
+func (rep *reporter) addConnection(r *report.Report, c *procspy.Connection) {
+	rep.addAddressEdge(r, c)
+	if c.Proc.PID > 0 {
+		rep.addEndpointEdge(r, c)
+	}
+}
+
+// addAddressEdge adds the address-only view of c to r.Address.
+func (rep *reporter) addAddressEdge(r *report.Report, c *procspy.Connection) {
+	var (
+		localAddr    = c.LocalAddress.String()
+		scopedLocal  = report.MakeAddressNodeID(rep.hostID, localAddr)
+		scopedRemote = report.MakeAddressNodeID(rep.hostID, c.RemoteAddress.String())
+		key          = report.MakeAdjacencyID(scopedLocal)
+		edgeKey      = report.MakeEdgeID(scopedLocal, scopedRemote)
+	)
+
+	r.Address.Adjacency[key] = r.Address.Adjacency[key].Add(scopedRemote)
+
+	if _, ok := r.Address.NodeMetadatas[scopedLocal]; !ok {
+		// First hit establishes NodeMetadata for scoped local address
+		r.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
+			"name": rep.hostName,
+			"addr": localAddr,
+		}
+	}
+
+	// Count the TCP connection.
+	edgeMeta := r.Address.EdgeMetadatas[edgeKey]
+	edgeMeta.WithConnCountTCP = true
+	edgeMeta.MaxConnCountTCP++
+	r.Address.EdgeMetadatas[edgeKey] = edgeMeta
+}
+
+// addEndpointEdge adds the address-and-port view of c to r.Endpoint,
+// recording the PID of the process that owns the connection.
+func (rep *reporter) addEndpointEdge(r *report.Report, c *procspy.Connection) {
+	var (
+		localAddr    = c.LocalAddress.String()
+		localPort    = strconv.Itoa(int(c.LocalPort))
+		remotePort   = strconv.Itoa(int(c.RemotePort))
+		scopedLocal  = report.MakeEndpointNodeID(rep.hostID, localAddr, localPort)
+		scopedRemote = report.MakeEndpointNodeID(rep.hostID, c.RemoteAddress.String(), remotePort)
+		key          = report.MakeAdjacencyID(scopedLocal)
+		edgeKey      = report.MakeEdgeID(scopedLocal, scopedRemote)
+	)
+
+	r.Endpoint.Adjacency[key] = r.Endpoint.Adjacency[key].Add(scopedRemote)
+
+	if _, ok := r.Endpoint.NodeMetadatas[scopedLocal]; !ok {
+		// First hit establishes NodeMetadata for scoped local address + port
+		r.Endpoint.NodeMetadatas[scopedLocal] = report.NodeMetadata{
+			"addr": localAddr,
+			"port": localPort,
+			"pid":  fmt.Sprintf("%d", c.Proc.PID),
+		}
+	}
+
+	// Count the TCP connection.
+	edgeMeta := r.Endpoint.EdgeMetadatas[edgeKey]
+	edgeMeta.WithConnCountTCP = true
+	edgeMeta.MaxConnCountTCP++
+	r.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
+}
diff --git a/probe/spy_test.go b/probe/endpoint/reporter_test.go
similarity index 93%
rename from probe/spy_test.go
rename to probe/endpoint/reporter_test.go
index 3ef351381..88fcb9f3b 100644
--- a/probe/spy_test.go
+++ b/probe/endpoint/reporter_test.go
@@ -1,4 +1,4 @@
-package main
+package endpoint_test
 
 import (
 	"net"
@@ -6,6 +6,7 @@ import (
 	"testing"
 
 	"github.com/weaveworks/procspy"
+	"github.com/weaveworks/scope/probe/endpoint"
 	"github.com/weaveworks/scope/report"
 )
 
@@ -70,7 +71,8 @@ func TestSpyNoProcesses(t *testing.T) {
 		nodeName = "frenchs-since-1904" // TODO rename to hostNmae
 	)
 
-	r := spy(nodeID, nodeName, false)
+	reporter := endpoint.NewReporter(nodeID, nodeName, false)
+	r, _ := reporter.Report()
 	//buf, _ := json.MarshalIndent(r, "", " ")
 	//t.Logf("\n%s\n", buf)
 
@@ -106,7 +108,8 @@ func TestSpyWithProcesses(t *testing.T) {
 		nodeName = "fishermans-friend" // TODO rename to hostNmae
 	)
 
-	r := spy(nodeID, nodeName, true)
+	reporter := endpoint.NewReporter(nodeID, nodeName, true)
+	r, _ := reporter.Report()
 	// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)
 
 	var (
diff --git a/probe/instrumentation.go b/probe/instrumentation.go
index 56e65545d..266f2a65f 100644
--- a/probe/instrumentation.go
+++ b/probe/instrumentation.go
@@ -2,9 +2,10 @@ package main
 
 import (
 	"net/http"
-	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/weaveworks/scope/probe/endpoint"
 )
 
 var (
@@ -17,20 +18,10 @@ var (
 		},
 		[]string{},
 	)
-	spyDuration = prometheus.NewSummaryVec(
-		prometheus.SummaryOpts{
-			Namespace: "scope",
-			Subsystem: "probe",
-			Name:      "spy_time_nanoseconds",
-			Help:      "Total time spent spying on active connections.",
-			MaxAge:    10 * time.Second, // like statsd
-		},
-		[]string{},
-	)
 )
 
 func makePrometheusHandler() http.Handler {
 	prometheus.MustRegister(publishTicks)
-	prometheus.MustRegister(spyDuration)
+	prometheus.MustRegister(endpoint.SpyDuration)
 	return prometheus.Handler()
 }
diff --git a/probe/main.go b/probe/main.go
index 8f1abef77..a73d503aa 100644
--- a/probe/main.go
+++ b/probe/main.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/weaveworks/procspy"
 	"github.com/weaveworks/scope/probe/docker"
+	"github.com/weaveworks/scope/probe/endpoint"
 	"github.com/weaveworks/scope/probe/process"
 	"github.com/weaveworks/scope/probe/tag"
 	"github.com/weaveworks/scope/report"
@@ -80,7 +81,7 @@ func main() {
 	)
 
 	taggers := []tag.Tagger{tag.NewTopologyTagger(), tag.NewOriginHostTagger(hostID)}
-	reporters := []tag.Reporter{}
+	reporters := []tag.Reporter{endpoint.NewReporter(hostID, hostName, *spyProcs)}
 
 	if *dockerEnabled && runtime.GOOS == linux {
 		if err = report.AddLocalBridge(*dockerBridge); err != nil {
@@ -130,8 +131,6 @@ func main() {
 			r = report.MakeReport()
 
 		case <-spyTick:
-			r.Merge(spy(hostID, hostName, *spyProcs))
-
 			// Do this every tick so it gets tagged by the OriginHostTagger
 			r.Host = 
hostTopology(hostID, hostName) diff --git a/probe/spy.go b/probe/spy.go deleted file mode 100644 index aa9e1025c..000000000 --- a/probe/spy.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "fmt" - "log" - "strconv" - "time" - - "github.com/weaveworks/procspy" - "github.com/weaveworks/scope/report" -) - -// spy invokes procspy.Connections to generate a report.Report that contains -// every discovered (spied) connection on the host machine, at the granularity -// of host and port. It optionally enriches that topology with process (PID) -// information. -func spy( - hostID, hostName string, - includeProcesses bool, -) report.Report { - defer func(begin time.Time) { - spyDuration.WithLabelValues().Observe(float64(time.Since(begin))) - }(time.Now()) - - r := report.MakeReport() - - conns, err := procspy.Connections(includeProcesses) - if err != nil { - log.Printf("spy connections: %v", err) - return r - } - - for conn := conns.Next(); conn != nil; conn = conns.Next() { - addConnection(&r, conn, hostID, hostName) - } - - return r -} - -func addConnection( - r *report.Report, - c *procspy.Connection, - hostID, hostName string, -) { - var ( - scopedLocal = report.MakeAddressNodeID(hostID, c.LocalAddress.String()) - scopedRemote = report.MakeAddressNodeID(hostID, c.RemoteAddress.String()) - key = report.MakeAdjacencyID(scopedLocal) - edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote) - ) - - r.Address.Adjacency[key] = r.Address.Adjacency[key].Add(scopedRemote) - - if _, ok := r.Address.NodeMetadatas[scopedLocal]; !ok { - r.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{ - "name": hostName, - "addr": c.LocalAddress.String(), - } - } - - // Count the TCP connection. 
- edgeMeta := r.Address.EdgeMetadatas[edgeKey] - edgeMeta.WithConnCountTCP = true - edgeMeta.MaxConnCountTCP++ - r.Address.EdgeMetadatas[edgeKey] = edgeMeta - - if c.Proc.PID > 0 { - var ( - scopedLocal = report.MakeEndpointNodeID(hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort))) - scopedRemote = report.MakeEndpointNodeID(hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort))) - key = report.MakeAdjacencyID(scopedLocal) - edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote) - ) - - r.Endpoint.Adjacency[key] = r.Endpoint.Adjacency[key].Add(scopedRemote) - - if _, ok := r.Endpoint.NodeMetadatas[scopedLocal]; !ok { - // First hit establishes NodeMetadata for scoped local address + port - md := report.NodeMetadata{ - "addr": c.LocalAddress.String(), - "port": strconv.Itoa(int(c.LocalPort)), - "pid": fmt.Sprintf("%d", c.Proc.PID), - } - - r.Endpoint.NodeMetadatas[scopedLocal] = md - } - // Count the TCP connection. - edgeMeta := r.Endpoint.EdgeMetadatas[edgeKey] - edgeMeta.WithConnCountTCP = true - edgeMeta.MaxConnCountTCP++ - r.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta - } -}