From ac63937df7202ac06f4530911974e37cb3367fd8 Mon Sep 17 00:00:00 2001 From: Joseph Glanville Date: Sun, 15 Jan 2017 08:49:40 +1100 Subject: [PATCH] Switch to new conntrack library --- probe/endpoint/connection_tracker.go | 17 +- probe/endpoint/conntrack.go | 334 +++------------------- probe/endpoint/conntrack_internal_test.go | 253 ---------------- probe/endpoint/nat.go | 24 +- probe/endpoint/nat_internal_test.go | 80 +++--- 5 files changed, 97 insertions(+), 611 deletions(-) delete mode 100644 probe/endpoint/conntrack_internal_test.go diff --git a/probe/endpoint/connection_tracker.go b/probe/endpoint/connection_tracker.go index 6f1facf24..db0e885c9 100644 --- a/probe/endpoint/connection_tracker.go +++ b/probe/endpoint/connection_tracker.go @@ -5,6 +5,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/weaveworks/scope/probe/endpoint/conntrack" "github.com/weaveworks/scope/probe/endpoint/procspy" "github.com/weaveworks/scope/probe/process" "github.com/weaveworks/scope/report" @@ -53,18 +54,18 @@ func newConnectionTracker(conf connectionTrackerConfig) connectionTracker { return ct } -func flowToTuple(f flow) (ft fourTuple) { +func flowToTuple(f conntrack.Flow) (ft fourTuple) { ft = fourTuple{ - f.Original.Layer3.SrcIP, - f.Original.Layer3.DstIP, + f.Original.Layer3.SrcIP.String(), + f.Original.Layer3.DstIP.String(), uint16(f.Original.Layer4.SrcPort), uint16(f.Original.Layer4.DstPort), } // Handle DNAT-ed connections in the initial state - if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP { + if !f.Original.Layer3.DstIP.Equal(f.Reply.Layer3.SrcIP) { ft = fourTuple{ - f.Reply.Layer3.DstIP, - f.Reply.Layer3.SrcIP, + f.Reply.Layer3.DstIP.String(), + f.Reply.Layer3.SrcIP.String(), uint16(f.Reply.Layer4.DstPort), uint16(f.Reply.Layer4.SrcPort), } @@ -118,7 +119,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) { // consult the flowWalker for short-lived (conntracked) connections seenTuples := map[string]fourTuple{} - t.flowWalker.walkFlows(func(f flow, alive bool) { + t.flowWalker.walkFlows(func(f conntrack.Flow, alive bool) { tuple := flowToTuple(f) seenTuples[tuple.key()] = tuple t.addConnection(rpt, false, tuple, "", nil, nil) @@ -135,7 +136,7 @@ func (t *connectionTracker) existingFlows() map[string]fourTuple { // log.Warnf("Not using conntrack: disabled") } else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil { log.Warnf("Not using conntrack: not supported by the kernel: %s", err) - } else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil { + } else if existingFlows, err := conntrack.Established(t.conf.BufferSize); err != nil { // TODO: worry about --any-nat log.Errorf("conntrack existingConnections error: %v", err) } else { for _, f := range existingFlows { diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 95f6f5531..0363ea186 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -2,85 +2,41 @@ package endpoint import ( "bufio" - "bytes" "fmt" "io" "io/ioutil" "path/filepath" - "strconv" - "strings" "sync" "time" - "unicode" log "github.com/sirupsen/logrus" - "github.com/weaveworks/common/exec" + "github.com/weaveworks/scope/probe/endpoint/conntrack" ) const ( // From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt eventsPath = "sys/net/netfilter/nf_conntrack_events" - - timeWait = "TIME_WAIT" - tcpProto = "tcp" - newType = "[NEW]" - updateType = "[UPDATE]" - destroyType = "[DESTROY]" ) -var ( - destroyTypeB = []byte(destroyType) - assured = []byte("[ASSURED] ") - unreplied = []byte("[UNREPLIED] ") -) - -type layer3 struct { - SrcIP string - DstIP string -} - -type layer4 struct { - SrcPort int - DstPort int - Proto string -} - -type meta struct { - Layer3 layer3 - Layer4 layer4 - ID int64 - State string -} - -type flow struct { - Type string - Original, Reply, Independent meta -} - -type conntrack struct { - Flows []flow -} - // flowWalker is something that maintains flows, and provides an accessor // method to walk them. type flowWalker interface { - walkFlows(f func(f flow, active bool)) + walkFlows(f func(conntrack.Flow, bool)) stop() } type nilFlowWalker struct{} -func (n nilFlowWalker) stop() {} -func (n nilFlowWalker) walkFlows(f func(flow, bool)) {} +func (n nilFlowWalker) stop() {} +func (n nilFlowWalker) walkFlows(f func(conntrack.Flow, bool)) {} // conntrackWalker uses the conntrack command to track network connections and // implement flowWalker. type conntrackWalker struct { sync.Mutex - cmd exec.Cmd - activeFlows map[int64]flow // active flows in state != TIME_WAIT - bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here + activeFlows map[uint32]conntrack.Flow // active flows in state != TIME_WAIT + bufferedFlows []conntrack.Flow // flows coming out of activeFlows spend 1 walk cycle here bufferSize int args []string quit chan struct{} @@ -95,7 +51,7 @@ func newConntrackFlowWalker(useConntrack bool, procRoot string, bufferSize int, return nilFlowWalker{} } result := &conntrackWalker{ - activeFlows: map[int64]flow{}, + activeFlows: map[uint32]conntrack.Flow{}, bufferSize: bufferSize, args: args, quit: make(chan struct{}), @@ -144,7 +100,7 @@ func (c *conntrackWalker) clearFlows() { c.bufferedFlows = append(c.bufferedFlows, f) } - c.activeFlows = map[int64]flow{} + c.activeFlows = map[uint32]conntrack.Flow{} } func logPipe(prefix string, reader io.Reader) { @@ -158,9 +114,7 @@ func logPipe(prefix string, reader io.Reader) { } func (c *conntrackWalker) run() { - // Fork another conntrack, just to capture existing connections - // for which we don't get events - existingFlows, err := existingConnections(c.args) + existingFlows, err := c.existingConnections() if err != nil { log.Errorf("conntrack existingConnections error: %v", err) return @@ -169,35 +123,12 @@ func (c *conntrackWalker) run() { c.handleFlow(flow, true) } - args := append([]string{ - "--buffer-size", strconv.Itoa(c.bufferSize), "-E", - "-o", "id", "-p", "tcp"}, c.args..., - ) - cmd := exec.Command("conntrack", args...) - stdout, err := cmd.StdoutPipe() + events, stop, err := conntrack.Follow(c.bufferSize) if err != nil { - log.Errorf("conntrack error: %v", err) + log.Errorf("conntract Follow error: %v", err) return } - stderr, err := cmd.StderrPipe() - if err != nil { - log.Errorf("conntrack error: %v", err) - return - } - go logPipe("conntrack stderr:", stderr) - - if err := cmd.Start(); err != nil { - log.Errorf("conntrack error: %v", err) - return - } - - defer func() { - if err := cmd.Wait(); err != nil { - log.Errorf("conntrack error: %v", err) - } - }() - c.Lock() // We may have stopped in the mean time, // so check to see if the channel is open @@ -207,255 +138,56 @@ func (c *conntrackWalker) run() { case <-c.quit: return } - c.cmd = cmd c.Unlock() - scanner := bufio.NewScanner(bufio.NewReader(stdout)) defer log.Infof("conntrack exiting") - // Loop on the output stream + // Handle conntrack events from netlink socket for { - f, err := decodeStreamedFlow(scanner) - if err != nil { - log.Errorf("conntrack error: %v", err) + select { + case <-c.quit: + stop() return - } - c.handleFlow(f, false) - } -} - -// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing) -func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) { - success := scanner.Scan() - if !success { - if err := scanner.Err(); err != nil { - return nil, err - } - return nil, io.EOF - } - line := scanner.Bytes() - // Remove [ASSURED]/[UNREPLIED] tags - line = removeInplace(line, assured) - line = removeInplace(line, unreplied) - return line, nil -} - -func removeInplace(s, sep []byte) []byte { - // TODO: See if we can get better performance - // removing multiple substrings at once (with index/suffixarray New()+Lookup()) - // Probably not worth it for only two substrings occurring once. - index := bytes.Index(s, sep) - if index < 0 { - return s - } - copy(s[index:], s[index+len(sep):]) - return s[:len(s)-len(sep)] -} - -// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow -// It only considers the following key-values: -// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776 -// Keys can be present twice, so the order is important. -// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored. -func decodeFlowKeyValues(line []byte, f *flow) error { - var err error - for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) { - kv := strings.SplitN(field, "=", 2) - if len(kv) != 2 { - continue - } - key := kv[0] - value := kv[1] - firstTupleSet := f.Original.Layer4.DstPort != 0 - switch { - case key == "src": - if !firstTupleSet { - f.Original.Layer3.SrcIP = value - } else { - f.Reply.Layer3.SrcIP = value + case f, ok := <-events: + if !ok { + return } - - case key == "dst": - if !firstTupleSet { - f.Original.Layer3.DstIP = value - } else { - f.Reply.Layer3.DstIP = value - } - - case key == "sport": - if !firstTupleSet { - f.Original.Layer4.SrcPort, err = strconv.Atoi(value) - } else { - f.Reply.Layer4.SrcPort, err = strconv.Atoi(value) - } - - case key == "dport": - if !firstTupleSet { - f.Original.Layer4.DstPort, err = strconv.Atoi(value) - } else { - f.Reply.Layer4.DstPort, err = strconv.Atoi(value) - } - - case key == "id": - f.Independent.ID, err = strconv.ParseInt(value, 10, 64) + c.handleFlow(f, false) } } - - return err } -func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) { - var ( - // Use ints for parsing unused fields since their allocations - // are almost for free - unused [2]int - f flow - ) - - // Examples: - // " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767" - // " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776" - // " [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160" - // " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing) - - // Remove tags since they are optional and make parsing harder - line, err := getUntaggedLine(scanner) +func (c *conntrackWalker) existingConnections() ([]conntrack.Flow, error) { + flows, err := conntrack.Established(c.bufferSize) if err != nil { - return flow{}, err + return []conntrack.Flow{}, err } - - line = bytes.TrimLeft(line, " ") - if bytes.HasPrefix(line, destroyTypeB) { - // Destroy events don't have a timeout or state field - _, err = fmt.Sscanf(string(line), "%s %s %d", - &f.Type, - &f.Original.Layer4.Proto, - &unused[0], - ) - } else { - _, err = fmt.Sscanf(string(line), "%s %s %d %d %s", - &f.Type, - &f.Original.Layer4.Proto, - &unused[0], - &unused[1], - &f.Independent.State, - ) - } - if err != nil { - return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err) - } - - err = decodeFlowKeyValues(line, &f) - if err != nil { - return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err) - } - - f.Reply.Layer4.Proto = f.Original.Layer4.Proto - return f, nil -} - -func existingConnections(conntrackWalkerArgs []string) ([]flow, error) { - args := append([]string{"-L", "-o", "id", "-p", "tcp"}, conntrackWalkerArgs...) - cmd := exec.Command("conntrack", args...) - stdout, err := cmd.StdoutPipe() - if err != nil { - return []flow{}, err - } - if err := cmd.Start(); err != nil { - return []flow{}, err - } - defer func() { - if err := cmd.Wait(); err != nil { - log.Errorf("conntrack existingConnections exit error: %v", err) - } - }() - - scanner := bufio.NewScanner(bufio.NewReader(stdout)) - var result []flow - for { - f, err := decodeDumpedFlow(scanner) - if err != nil { - if err == io.EOF { - break - } - log.Errorf("conntrack error: %v", err) - return result, err - } - result = append(result, f) - } - return result, nil -} - -func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) { - var ( - // Use ints for parsing unused fields since allocations - // are almost for free - unused [4]int - f flow - ) - - // Examples with different formats: - // With SELinux, there is a "secctx=" - // After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes=" - // - // "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088" - // "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208" - // "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880" - // "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840" - - // remove tags since they are optional and make parsing harder - line, err := getUntaggedLine(scanner) - if err != nil { - return flow{}, err - } - - _, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State) - if err != nil { - return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err) - } - - err = decodeFlowKeyValues(line, &f) - if err != nil { - return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err) - } - - f.Reply.Layer4.Proto = f.Original.Layer4.Proto - return f, nil + return flows, nil } func (c *conntrackWalker) stop() { c.Lock() defer c.Unlock() close(c.quit) - if c.cmd != nil { - c.cmd.Kill() - } } -func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { +func (c *conntrackWalker) handleFlow(f conntrack.Flow, forceAdd bool) { c.Lock() defer c.Unlock() - // For not, I'm only interested in tcp connections - there is too much udp - // traffic going on (every container talking to weave dns, for example) to - // render nicely. TODO: revisit this. - if f.Original.Layer4.Proto != tcpProto { - return - } - // Ignore flows for which we never saw an update; they are likely // incomplete or wrong. See #1462. switch { - case forceAdd || f.Type == updateType: - if f.Independent.State != timeWait { - c.activeFlows[f.Independent.ID] = f - } else if _, ok := c.activeFlows[f.Independent.ID]; ok { - delete(c.activeFlows, f.Independent.ID) + case forceAdd || f.MsgType == conntrack.NfctMsgUpdate: + if f.State != conntrack.TCPStateTimeWait { + c.activeFlows[f.ID] = f + } else if _, ok := c.activeFlows[f.ID]; ok { + delete(c.activeFlows, f.ID) c.bufferedFlows = append(c.bufferedFlows, f) } - case f.Type == destroyType: - if active, ok := c.activeFlows[f.Independent.ID]; ok { - delete(c.activeFlows, f.Independent.ID) + case f.MsgType == conntrack.NfctMsgDestroy: + if active, ok := c.activeFlows[f.ID]; ok { + delete(c.activeFlows, f.ID) c.bufferedFlows = append(c.bufferedFlows, active) } } @@ -463,7 +195,7 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) { // walkFlows calls f with all active flows and flows that have come and gone // since the last call to walkFlows -func (c *conntrackWalker) walkFlows(f func(flow, bool)) { +func (c *conntrackWalker) walkFlows(f func(conntrack.Flow, bool)) { c.Lock() defer c.Unlock() for _, flow := range c.activeFlows { diff --git a/probe/endpoint/conntrack_internal_test.go b/probe/endpoint/conntrack_internal_test.go deleted file mode 100644 index a2aa25397..000000000 --- a/probe/endpoint/conntrack_internal_test.go +++ /dev/null @@ -1,253 +0,0 @@ -package endpoint - -import ( - "bufio" - "io" - "strings" - "testing" - "time" - - "github.com/weaveworks/scope/test" -) - -// Obtained though conntrack -E -p tcp -o id and then tweaked -const streamedFlowsSource = `[DESTROY] tcp 6 src=10.0.0.1 dst=127.0.0.1 sport=36826 dport=28106 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36826 [ASSURED] id=347275904 - [NEW] tcp 6 120 SYN_SENT src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 [UNREPLIED] src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904 - [UPDATE] tcp 6 60 SYN_RECV src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904 - [UPDATE] tcp 6 432000 ESTABLISHED src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 [ASSURED] id=347275904` - -var wantStreamedFlows = []flow{ - { - Type: destroyType, - Original: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.1", - DstIP: "127.0.0.1", - }, - Layer4: layer4{ - SrcPort: 36826, - DstPort: 28106, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.2", - DstIP: "127.0.0.2", - }, - Layer4: layer4{ - SrcPort: 28107, - DstPort: 36826, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 347275904, - }, - }, - { - Type: newType, - Original: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.1", - DstIP: "127.0.0.1", - }, - Layer4: layer4{ - SrcPort: 36898, - DstPort: 28107, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.2", - DstIP: "127.0.0.2", - }, - Layer4: layer4{ - SrcPort: 28107, - DstPort: 36898, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 347275904, - State: "SYN_SENT", - }, - }, - { - Type: updateType, - Original: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.1", - DstIP: "127.0.0.1", - }, - Layer4: layer4{ - SrcPort: 36898, - DstPort: 28107, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.2", - DstIP: "127.0.0.2", - }, - Layer4: layer4{ - SrcPort: 28107, - DstPort: 36898, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 347275904, - State: "SYN_RECV", - }, - }, - { - Type: updateType, - Original: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.1", - DstIP: "127.0.0.1", - }, - Layer4: layer4{ - SrcPort: 36898, - DstPort: 28107, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "10.0.0.2", - DstIP: "127.0.0.2", - }, - Layer4: layer4{ - SrcPort: 28107, - DstPort: 36898, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 347275904, - State: "ESTABLISHED", - }, - }, -} - -func testFlowDecoding(t *testing.T, source string, want []flow, decoder func(scanner *bufio.Scanner) (flow, error)) { - scanner := bufio.NewScanner(strings.NewReader(source)) - d := time.Millisecond * 100 - for _, wantFlow := range want { - haveFlow, err := decoder(scanner) - if err != nil { - t.Fatalf("Unexpected decoding error: %v", err) - } - test.Poll(t, d, wantFlow, func() interface{} { return haveFlow }) - } - - if _, err := decodeStreamedFlow(scanner); err != io.EOF { - t.Fatalf("Unexpected error value on empty input: %v", err) - } -} - -func TestStreamedFlowDecoding(t *testing.T) { - testFlowDecoding(t, streamedFlowsSource, wantStreamedFlows, decodeStreamedFlow) -} - -// Obtained through conntrack -L -p tcp -o id -// With SELinux, there is a "secctx=" -// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes=" -const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208 -tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880 -tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840` - -var wantDumpedFlows = []flow{ - { - Original: meta{ - Layer3: layer3{ - SrcIP: "10.0.2.2", - DstIP: "10.0.2.15", - }, - Layer4: layer4{ - SrcPort: 49911, - DstPort: 22, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "10.0.2.15", - DstIP: "10.0.2.2", - }, - Layer4: layer4{ - SrcPort: 22, - DstPort: 49911, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 2993966208, - State: "ESTABLISHED", - }, - }, - { - Original: meta{ - Layer3: layer3{ - SrcIP: "172.17.0.5", - DstIP: "172.17.0.2", - }, - Layer4: layer4{ - SrcPort: 47010, - DstPort: 80, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "172.17.0.2", - DstIP: "172.17.0.5", - }, - Layer4: layer4{ - SrcPort: 80, - DstPort: 47010, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 4001098880, - State: "ESTABLISHED", - }, - }, - { - Original: meta{ - Layer3: layer3{ - SrcIP: "192.168.35.116", - DstIP: "216.58.213.227", - }, - Layer4: layer4{ - SrcPort: 49862, - DstPort: 443, - Proto: "tcp", - }, - }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "216.58.213.227", - DstIP: "192.168.35.116", - }, - Layer4: layer4{ - SrcPort: 443, - DstPort: 49862, - Proto: "tcp", - }, - }, - Independent: meta{ - ID: 943643840, - State: "ESTABLISHED", - }, - }, -} - -func TestDumpedFlowDecoding(t *testing.T) { - testFlowDecoding(t, dumpedFlowsSource, wantDumpedFlows, decodeDumpedFlow) -} diff --git a/probe/endpoint/nat.go b/probe/endpoint/nat.go index 52073a209..aa87ab838 100644 --- a/probe/endpoint/nat.go +++ b/probe/endpoint/nat.go @@ -1,19 +1,21 @@ package endpoint import ( + "net" "strconv" + "github.com/weaveworks/scope/probe/endpoint/conntrack" "github.com/weaveworks/scope/report" ) // This is our 'abstraction' of the endpoint that have been rewritten by NAT. // Original is the private IP that has been rewritten. type endpointMapping struct { - originalIP string - originalPort int + originalIP net.IP + originalPort uint16 - rewrittenIP string - rewrittenPort int + rewrittenIP net.IP + rewrittenPort uint16 } // natMapper rewrites a report to deal with NAT'd connections. @@ -25,9 +27,9 @@ func makeNATMapper(fw flowWalker) natMapper { return natMapper{fw} } -func toMapping(f flow) *endpointMapping { +func toMapping(f conntrack.Flow) *endpointMapping { var mapping endpointMapping - if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP { + if f.Original.Layer3.SrcIP.Equal(f.Reply.Layer3.DstIP) { mapping = endpointMapping{ originalIP: f.Reply.Layer3.SrcIP, originalPort: f.Reply.Layer4.SrcPort, @@ -49,13 +51,13 @@ func toMapping(f flow) *endpointMapping { // applyNAT duplicates Nodes in the endpoint topology of a report, based on // the NAT table. func (n natMapper) applyNAT(rpt report.Report, scope string) { - n.flowWalker.walkFlows(func(f flow, active bool) { + n.flowWalker.walkFlows(func(f conntrack.Flow, _ bool) { mapping := toMapping(f) - realEndpointPort := strconv.Itoa(mapping.originalPort) - copyEndpointPort := strconv.Itoa(mapping.rewrittenPort) - realEndpointID := report.MakeEndpointNodeID(scope, "", mapping.originalIP, realEndpointPort) - copyEndpointID := report.MakeEndpointNodeID(scope, "", mapping.rewrittenIP, copyEndpointPort) + realEndpointPort := strconv.Itoa(int(mapping.originalPort)) + copyEndpointPort := strconv.Itoa(int(mapping.rewrittenPort)) + realEndpointID := report.MakeEndpointNodeID(scope, "", mapping.originalIP.String(), realEndpointPort) + copyEndpointID := report.MakeEndpointNodeID(scope, "", mapping.rewrittenIP.String(), copyEndpointPort) node, ok := rpt.Endpoint.Nodes[realEndpointID] if !ok { diff --git a/probe/endpoint/nat_internal_test.go b/probe/endpoint/nat_internal_test.go index 51fda2605..57a09cb9c 100644 --- a/probe/endpoint/nat_internal_test.go +++ b/probe/endpoint/nat_internal_test.go @@ -1,19 +1,22 @@ package endpoint import ( + "net" + "syscall" "testing" "github.com/weaveworks/common/mtime" "github.com/weaveworks/common/test" + "github.com/weaveworks/scope/probe/endpoint/conntrack" "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/test/reflect" ) type mockFlowWalker struct { - flows []flow + flows []conntrack.Flow } -func (m *mockFlowWalker) walkFlows(f func(f flow, active bool)) { +func (m *mockFlowWalker) walkFlows(f func(f conntrack.Flow, active bool)) { for _, flow := range m.flows { f(flow, true) } @@ -32,39 +35,42 @@ func TestNat(t *testing.T) { // container2 (10.0.47.2:22222), host2 (2.3.4.5:22223) -> // host1 (1.2.3.4:80), container1 (10.0.47.1:80) + c1 := net.ParseIP("10.0.47.1") + c2 := net.ParseIP("10.0.47.2") + host2 := net.ParseIP("2.3.4.5") + host1 := net.ParseIP("1.2.3.4") + // from the PoV of host1 { - f := flow{ - Type: updateType, - Original: meta{ - Layer3: layer3{ - SrcIP: "2.3.4.5", - DstIP: "1.2.3.4", + f := conntrack.Flow{ + MsgType: conntrack.NfctMsgUpdate, + Original: conntrack.Meta{ + Layer3: conntrack.Layer3{ + SrcIP: host2, + DstIP: host1, }, - Layer4: layer4{ + Layer4: conntrack.Layer4{ SrcPort: 22222, DstPort: 80, - Proto: "tcp", + Proto: syscall.IPPROTO_TCP, }, }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "10.0.47.1", - DstIP: "2.3.4.5", + Reply: conntrack.Meta{ + Layer3: conntrack.Layer3{ + SrcIP: c1, + DstIP: host2, }, - Layer4: layer4{ + Layer4: conntrack.Layer4{ SrcPort: 80, DstPort: 22222, - Proto: "tcp", + Proto: syscall.IPPROTO_TCP, }, }, - Independent: meta{ - ID: 1, - }, + ID: 1, } ct := &mockFlowWalker{ - flows: []flow{f}, + flows: []conntrack.Flow{f}, } have := report.MakeReport() @@ -88,36 +94,34 @@ func TestNat(t *testing.T) { // form the PoV of host2 { - f := flow{ - Type: updateType, - Original: meta{ - Layer3: layer3{ - SrcIP: "10.0.47.2", - DstIP: "1.2.3.4", + f := conntrack.Flow{ + MsgType: conntrack.NfctMsgUpdate, + Original: conntrack.Meta{ + Layer3: conntrack.Layer3{ + SrcIP: c2, + DstIP: host1, }, - Layer4: layer4{ + Layer4: conntrack.Layer4{ SrcPort: 22222, DstPort: 80, - Proto: "tcp", + Proto: syscall.IPPROTO_TCP, }, }, - Reply: meta{ - Layer3: layer3{ - SrcIP: "1.2.3.4", - DstIP: "2.3.4.5", + Reply: conntrack.Meta{ + Layer3: conntrack.Layer3{ + SrcIP: host1, + DstIP: host2, }, - Layer4: layer4{ + Layer4: conntrack.Layer4{ SrcPort: 80, DstPort: 22223, - Proto: "tcp", + Proto: syscall.IPPROTO_TCP, }, }, - Independent: meta{ - ID: 2, - }, + ID: 2, } ct := &mockFlowWalker{ - flows: []flow{f}, + flows: []conntrack.Flow{f}, } have := report.MakeReport()