Merge pull request #270 from tomwilkie/245-docker-port-traverse

Duplicate endpoints in the endpoint topology to account for NAT mapping.
This commit is contained in:
Tom Wilkie
2015-06-23 12:37:35 +02:00
8 changed files with 301 additions and 113 deletions

View File

@@ -23,7 +23,7 @@ $(SCOPE_EXPORT): $(APP_EXE) $(PROBE_EXE) docker/*
$(APP_EXE): app/*.go render/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go probe/process/*.go report/*.go xfer/*.go
$(PROBE_EXE): probe/*.go probe/tag/*.go probe/docker/*.go probe/endpoint/*.go probe/process/*.go report/*.go xfer/*.go
$(APP_EXE) $(PROBE_EXE):
go get -tags netgo ./$(@D)

View File

@@ -2,7 +2,7 @@ FROM gliderlabs/alpine
MAINTAINER Weaveworks Inc <help@weave.works>
WORKDIR /home/weave
RUN echo "http://dl-4.alpinelinux.org/alpine/edge/testing" >>/etc/apk/repositories && \
apk add --update runit && \
apk add --update runit conntrack-tools && \
rm -rf /var/cache/apk/*
COPY ./app ./probe ./entrypoint.sh /home/weave/
COPY ./run-app /etc/service/app/run

170
probe/endpoint/nat.go Normal file
View File

@@ -0,0 +1,170 @@
package endpoint
import (
"bufio"
"encoding/xml"
"io"
"log"
"os"
"os/exec"
"strconv"
"strings"
"github.com/weaveworks/scope/report"
)
const (
modules = "/proc/modules"
conntrackModule = "nf_conntrack"
)
// these structs are for the parsed conntrack output
type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
}
type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
}
type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
}
type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []meta `xml:"meta"`
}
type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []flow `xml:"flow"`
}
// This is our 'abstraction' of the endpoint that have been rewritten by NAT.
// Original is the private IP that has been rewritten.
type endpointMapping struct {
originalIP string
originalPort int
rewrittenIP string
rewrittenPort int
}
// natTable returns a list of endpoints that have been remapped by NAT.
func natTable() ([]endpointMapping, error) {
var conntrack conntrack
cmd := exec.Command("conntrack", "-L", "--any-nat", "-o", "xml")
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
if err := cmd.Start(); err != nil {
return nil, err
}
defer func() {
if err := cmd.Wait(); err != nil {
log.Printf("conntrack error: %v", err)
}
}()
if err := xml.NewDecoder(stdout).Decode(&conntrack); err != nil {
if err == io.EOF {
return []endpointMapping{}, nil
}
return nil, err
}
output := []endpointMapping{}
for _, flow := range conntrack.Flows {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
// attribute.
original, reply := meta{}, meta{}
for _, meta := range flow.Metas {
if meta.Direction == "original" {
original = meta
} else if meta.Direction == "reply" {
reply = meta
}
}
if original.Layer4.Proto != "tcp" {
continue
}
var conn endpointMapping
if original.Layer3.SrcIP == reply.Layer3.DstIP {
conn = endpointMapping{
originalIP: reply.Layer3.SrcIP,
originalPort: reply.Layer4.SrcPort,
rewrittenIP: original.Layer3.DstIP,
rewrittenPort: original.Layer4.DstPort,
}
} else {
conn = endpointMapping{
originalIP: original.Layer3.SrcIP,
originalPort: original.Layer4.SrcPort,
rewrittenIP: reply.Layer3.DstIP,
rewrittenPort: reply.Layer4.DstPort,
}
}
output = append(output, conn)
}
return output, nil
}
// applyNAT duplicates NodeMetadatas in the endpoint topology of a
// report, based on the NAT table as returns by natTable.
func applyNAT(rpt report.Report, scope string) error {
mappings, err := natTable()
if err != nil {
return err
}
for _, mapping := range mappings {
realEndpointID := report.MakeEndpointNodeID(scope, mapping.originalIP, strconv.Itoa(mapping.originalPort))
copyEndpointID := report.MakeEndpointNodeID(scope, mapping.rewrittenIP, strconv.Itoa(mapping.rewrittenPort))
nmd, ok := rpt.Endpoint.NodeMetadatas[realEndpointID]
if !ok {
continue
}
rpt.Endpoint.NodeMetadatas[copyEndpointID] = nmd.Copy()
}
return nil
}
func conntrackModulePresent() bool {
f, err := os.Open(modules)
if err != nil {
log.Printf("conntrack error: %v", err)
return false
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, conntrackModule) {
return true
}
}
if err := scanner.Err(); err != nil {
log.Printf("conntrack error: %v", err)
}
log.Printf("conntrack: failed to find module %s", conntrackModule)
return false
}

118
probe/endpoint/reporter.go Normal file
View File

@@ -0,0 +1,118 @@
package endpoint
import (
"fmt"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
)
type reporter struct {
hostID string
hostName string
includeProcesses bool
includeNAT bool
}
// SpyDuration is an exported prometheus metric
var SpyDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "scope",
Subsystem: "probe",
Name: "spy_time_nanoseconds",
Help: "Total time spent spying on active connections.",
MaxAge: 10 * time.Second, // like statsd
},
[]string{},
)
// NewReporter creates a new Reporter that invokes procspy.Connections to
// generate a report.Report that contains every discovered (spied) connection
// on the host machine, at the granularity of host and port. It optionally
// enriches that topology with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool) tag.Reporter {
return &reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
includeNAT: conntrackModulePresent(),
}
}
func (rep *reporter) Report() (report.Report, error) {
defer func(begin time.Time) {
SpyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
}(time.Now())
r := report.MakeReport()
conns, err := procspy.Connections(rep.includeProcesses)
if err != nil {
return r, err
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
rep.addConnection(&r, conn)
}
if rep.includeNAT {
err = applyNAT(r, rep.hostID)
}
return r, err
}
func (rep *reporter) addConnection(r *report.Report, c *procspy.Connection) {
var (
scopedLocal = report.MakeAddressNodeID(rep.hostID, c.LocalAddress.String())
scopedRemote = report.MakeAddressNodeID(rep.hostID, c.RemoteAddress.String())
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)
r.Address.Adjacency[key] = r.Address.Adjacency[key].Add(scopedRemote)
if _, ok := r.Address.NodeMetadatas[scopedLocal]; !ok {
r.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
"name": rep.hostName,
"addr": c.LocalAddress.String(),
}
}
// Count the TCP connection.
edgeMeta := r.Address.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Address.EdgeMetadatas[edgeKey] = edgeMeta
if c.Proc.PID > 0 {
var (
scopedLocal = report.MakeEndpointNodeID(rep.hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
scopedRemote = report.MakeEndpointNodeID(rep.hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)
r.Endpoint.Adjacency[key] = r.Endpoint.Adjacency[key].Add(scopedRemote)
if _, ok := r.Endpoint.NodeMetadatas[scopedLocal]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
md := report.NodeMetadata{
"addr": c.LocalAddress.String(),
"port": strconv.Itoa(int(c.LocalPort)),
"pid": fmt.Sprintf("%d", c.Proc.PID),
}
r.Endpoint.NodeMetadatas[scopedLocal] = md
}
// Count the TCP connection.
edgeMeta := r.Endpoint.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
}
}

View File

@@ -1,4 +1,4 @@
package main
package endpoint_test
import (
"net"
@@ -6,6 +6,7 @@ import (
"testing"
"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/report"
)
@@ -70,7 +71,8 @@ func TestSpyNoProcesses(t *testing.T) {
nodeName = "frenchs-since-1904" // TODO rename to hostNmae
)
r := spy(nodeID, nodeName, false)
reporter := endpoint.NewReporter(nodeID, nodeName, false)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
@@ -106,7 +108,8 @@ func TestSpyWithProcesses(t *testing.T) {
nodeName = "fishermans-friend" // TODO rename to hostNmae
)
r := spy(nodeID, nodeName, true)
reporter := endpoint.NewReporter(nodeID, nodeName, false)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)
var (

View File

@@ -2,9 +2,10 @@ package main
import (
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/probe/endpoint"
)
var (
@@ -17,20 +18,10 @@ var (
},
[]string{},
)
spyDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "scope",
Subsystem: "probe",
Name: "spy_time_nanoseconds",
Help: "Total time spent spying on active connections.",
MaxAge: 10 * time.Second, // like statsd
},
[]string{},
)
)
func makePrometheusHandler() http.Handler {
prometheus.MustRegister(publishTicks)
prometheus.MustRegister(spyDuration)
prometheus.MustRegister(endpoint.SpyDuration)
return prometheus.Handler()
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/probe/tag"
"github.com/weaveworks/scope/report"
@@ -80,7 +81,7 @@ func main() {
)
taggers := []tag.Tagger{tag.NewTopologyTagger(), tag.NewOriginHostTagger(hostID)}
reporters := []tag.Reporter{}
reporters := []tag.Reporter{endpoint.NewReporter(hostID, hostName, *spyProcs)}
if *dockerEnabled && runtime.GOOS == linux {
if err = report.AddLocalBridge(*dockerBridge); err != nil {
@@ -130,8 +131,6 @@ func main() {
r = report.MakeReport()
case <-spyTick:
r.Merge(spy(hostID, hostName, *spyProcs))
// Do this every tick so it gets tagged by the OriginHostTagger
r.Host = hostTopology(hostID, hostName)

View File

@@ -1,93 +0,0 @@
package main
import (
"fmt"
"log"
"strconv"
"time"
"github.com/weaveworks/procspy"
"github.com/weaveworks/scope/report"
)
// spy invokes procspy.Connections to generate a report.Report that contains
// every discovered (spied) connection on the host machine, at the granularity
// of host and port. It optionally enriches that topology with process (PID)
// information.
func spy(
hostID, hostName string,
includeProcesses bool,
) report.Report {
defer func(begin time.Time) {
spyDuration.WithLabelValues().Observe(float64(time.Since(begin)))
}(time.Now())
r := report.MakeReport()
conns, err := procspy.Connections(includeProcesses)
if err != nil {
log.Printf("spy connections: %v", err)
return r
}
for conn := conns.Next(); conn != nil; conn = conns.Next() {
addConnection(&r, conn, hostID, hostName)
}
return r
}
func addConnection(
r *report.Report,
c *procspy.Connection,
hostID, hostName string,
) {
var (
scopedLocal = report.MakeAddressNodeID(hostID, c.LocalAddress.String())
scopedRemote = report.MakeAddressNodeID(hostID, c.RemoteAddress.String())
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)
r.Address.Adjacency[key] = r.Address.Adjacency[key].Add(scopedRemote)
if _, ok := r.Address.NodeMetadatas[scopedLocal]; !ok {
r.Address.NodeMetadatas[scopedLocal] = report.NodeMetadata{
"name": hostName,
"addr": c.LocalAddress.String(),
}
}
// Count the TCP connection.
edgeMeta := r.Address.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Address.EdgeMetadatas[edgeKey] = edgeMeta
if c.Proc.PID > 0 {
var (
scopedLocal = report.MakeEndpointNodeID(hostID, c.LocalAddress.String(), strconv.Itoa(int(c.LocalPort)))
scopedRemote = report.MakeEndpointNodeID(hostID, c.RemoteAddress.String(), strconv.Itoa(int(c.RemotePort)))
key = report.MakeAdjacencyID(scopedLocal)
edgeKey = report.MakeEdgeID(scopedLocal, scopedRemote)
)
r.Endpoint.Adjacency[key] = r.Endpoint.Adjacency[key].Add(scopedRemote)
if _, ok := r.Endpoint.NodeMetadatas[scopedLocal]; !ok {
// First hit establishes NodeMetadata for scoped local address + port
md := report.NodeMetadata{
"addr": c.LocalAddress.String(),
"port": strconv.Itoa(int(c.LocalPort)),
"pid": fmt.Sprintf("%d", c.Proc.PID),
}
r.Endpoint.NodeMetadatas[scopedLocal] = md
}
// Count the TCP connection.
edgeMeta := r.Endpoint.EdgeMetadatas[edgeKey]
edgeMeta.WithConnCountTCP = true
edgeMeta.MaxConnCountTCP++
r.Endpoint.EdgeMetadatas[edgeKey] = edgeMeta
}
}