mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-05 11:11:13 +00:00
Switch to new conntrack library
This commit is contained in:
committed by
Bryan Boreham
parent
853196f6d1
commit
ac63937df7
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/weaveworks/scope/probe/endpoint/conntrack"
|
||||
"github.com/weaveworks/scope/probe/endpoint/procspy"
|
||||
"github.com/weaveworks/scope/probe/process"
|
||||
"github.com/weaveworks/scope/report"
|
||||
@@ -53,18 +54,18 @@ func newConnectionTracker(conf connectionTrackerConfig) connectionTracker {
|
||||
return ct
|
||||
}
|
||||
|
||||
func flowToTuple(f flow) (ft fourTuple) {
|
||||
func flowToTuple(f conntrack.Flow) (ft fourTuple) {
|
||||
ft = fourTuple{
|
||||
f.Original.Layer3.SrcIP,
|
||||
f.Original.Layer3.DstIP,
|
||||
f.Original.Layer3.SrcIP.String(),
|
||||
f.Original.Layer3.DstIP.String(),
|
||||
uint16(f.Original.Layer4.SrcPort),
|
||||
uint16(f.Original.Layer4.DstPort),
|
||||
}
|
||||
// Handle DNAT-ed connections in the initial state
|
||||
if f.Original.Layer3.DstIP != f.Reply.Layer3.SrcIP {
|
||||
if !f.Original.Layer3.DstIP.Equal(f.Reply.Layer3.SrcIP) {
|
||||
ft = fourTuple{
|
||||
f.Reply.Layer3.DstIP,
|
||||
f.Reply.Layer3.SrcIP,
|
||||
f.Reply.Layer3.DstIP.String(),
|
||||
f.Reply.Layer3.SrcIP.String(),
|
||||
uint16(f.Reply.Layer4.DstPort),
|
||||
uint16(f.Reply.Layer4.SrcPort),
|
||||
}
|
||||
@@ -118,7 +119,7 @@ func (t *connectionTracker) ReportConnections(rpt *report.Report) {
|
||||
|
||||
// consult the flowWalker for short-lived (conntracked) connections
|
||||
seenTuples := map[string]fourTuple{}
|
||||
t.flowWalker.walkFlows(func(f flow, alive bool) {
|
||||
t.flowWalker.walkFlows(func(f conntrack.Flow, alive bool) {
|
||||
tuple := flowToTuple(f)
|
||||
seenTuples[tuple.key()] = tuple
|
||||
t.addConnection(rpt, false, tuple, "", nil, nil)
|
||||
@@ -135,7 +136,7 @@ func (t *connectionTracker) existingFlows() map[string]fourTuple {
|
||||
// log.Warnf("Not using conntrack: disabled")
|
||||
} else if err := IsConntrackSupported(t.conf.ProcRoot); err != nil {
|
||||
log.Warnf("Not using conntrack: not supported by the kernel: %s", err)
|
||||
} else if existingFlows, err := existingConnections([]string{"--any-nat"}); err != nil {
|
||||
} else if existingFlows, err := conntrack.Established(t.conf.BufferSize); err != nil { // TODO: worry about --any-nat
|
||||
log.Errorf("conntrack existingConnections error: %v", err)
|
||||
} else {
|
||||
for _, f := range existingFlows {
|
||||
|
||||
@@ -2,85 +2,41 @@ package endpoint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/weaveworks/common/exec"
|
||||
"github.com/weaveworks/scope/probe/endpoint/conntrack"
|
||||
)
|
||||
|
||||
const (
|
||||
// From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
|
||||
eventsPath = "sys/net/netfilter/nf_conntrack_events"
|
||||
|
||||
timeWait = "TIME_WAIT"
|
||||
tcpProto = "tcp"
|
||||
newType = "[NEW]"
|
||||
updateType = "[UPDATE]"
|
||||
destroyType = "[DESTROY]"
|
||||
)
|
||||
|
||||
var (
|
||||
destroyTypeB = []byte(destroyType)
|
||||
assured = []byte("[ASSURED] ")
|
||||
unreplied = []byte("[UNREPLIED] ")
|
||||
)
|
||||
|
||||
type layer3 struct {
|
||||
SrcIP string
|
||||
DstIP string
|
||||
}
|
||||
|
||||
type layer4 struct {
|
||||
SrcPort int
|
||||
DstPort int
|
||||
Proto string
|
||||
}
|
||||
|
||||
type meta struct {
|
||||
Layer3 layer3
|
||||
Layer4 layer4
|
||||
ID int64
|
||||
State string
|
||||
}
|
||||
|
||||
type flow struct {
|
||||
Type string
|
||||
Original, Reply, Independent meta
|
||||
}
|
||||
|
||||
type conntrack struct {
|
||||
Flows []flow
|
||||
}
|
||||
|
||||
// flowWalker is something that maintains flows, and provides an accessor
|
||||
// method to walk them.
|
||||
type flowWalker interface {
|
||||
walkFlows(f func(f flow, active bool))
|
||||
walkFlows(f func(conntrack.Flow, bool))
|
||||
stop()
|
||||
}
|
||||
|
||||
type nilFlowWalker struct{}
|
||||
|
||||
func (n nilFlowWalker) stop() {}
|
||||
func (n nilFlowWalker) walkFlows(f func(flow, bool)) {}
|
||||
func (n nilFlowWalker) stop() {}
|
||||
func (n nilFlowWalker) walkFlows(f func(conntrack.Flow, bool)) {}
|
||||
|
||||
// conntrackWalker uses the conntrack command to track network connections and
|
||||
// implement flowWalker.
|
||||
type conntrackWalker struct {
|
||||
sync.Mutex
|
||||
cmd exec.Cmd
|
||||
activeFlows map[int64]flow // active flows in state != TIME_WAIT
|
||||
bufferedFlows []flow // flows coming out of activeFlows spend 1 walk cycle here
|
||||
activeFlows map[uint32]conntrack.Flow // active flows in state != TIME_WAIT
|
||||
bufferedFlows []conntrack.Flow // flows coming out of activeFlows spend 1 walk cycle here
|
||||
bufferSize int
|
||||
args []string
|
||||
quit chan struct{}
|
||||
@@ -95,7 +51,7 @@ func newConntrackFlowWalker(useConntrack bool, procRoot string, bufferSize int,
|
||||
return nilFlowWalker{}
|
||||
}
|
||||
result := &conntrackWalker{
|
||||
activeFlows: map[int64]flow{},
|
||||
activeFlows: map[uint32]conntrack.Flow{},
|
||||
bufferSize: bufferSize,
|
||||
args: args,
|
||||
quit: make(chan struct{}),
|
||||
@@ -144,7 +100,7 @@ func (c *conntrackWalker) clearFlows() {
|
||||
c.bufferedFlows = append(c.bufferedFlows, f)
|
||||
}
|
||||
|
||||
c.activeFlows = map[int64]flow{}
|
||||
c.activeFlows = map[uint32]conntrack.Flow{}
|
||||
}
|
||||
|
||||
func logPipe(prefix string, reader io.Reader) {
|
||||
@@ -158,9 +114,7 @@ func logPipe(prefix string, reader io.Reader) {
|
||||
}
|
||||
|
||||
func (c *conntrackWalker) run() {
|
||||
// Fork another conntrack, just to capture existing connections
|
||||
// for which we don't get events
|
||||
existingFlows, err := existingConnections(c.args)
|
||||
existingFlows, err := c.existingConnections()
|
||||
if err != nil {
|
||||
log.Errorf("conntrack existingConnections error: %v", err)
|
||||
return
|
||||
@@ -169,35 +123,12 @@ func (c *conntrackWalker) run() {
|
||||
c.handleFlow(flow, true)
|
||||
}
|
||||
|
||||
args := append([]string{
|
||||
"--buffer-size", strconv.Itoa(c.bufferSize), "-E",
|
||||
"-o", "id", "-p", "tcp"}, c.args...,
|
||||
)
|
||||
cmd := exec.Command("conntrack", args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
events, stop, err := conntrack.Follow(c.bufferSize)
|
||||
if err != nil {
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
log.Errorf("conntract Follow error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
return
|
||||
}
|
||||
go logPipe("conntrack stderr:", stderr)
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := cmd.Wait(); err != nil {
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
c.Lock()
|
||||
// We may have stopped in the mean time,
|
||||
// so check to see if the channel is open
|
||||
@@ -207,255 +138,56 @@ func (c *conntrackWalker) run() {
|
||||
case <-c.quit:
|
||||
return
|
||||
}
|
||||
c.cmd = cmd
|
||||
c.Unlock()
|
||||
|
||||
scanner := bufio.NewScanner(bufio.NewReader(stdout))
|
||||
defer log.Infof("conntrack exiting")
|
||||
|
||||
// Loop on the output stream
|
||||
// Handle conntrack events from netlink socket
|
||||
for {
|
||||
f, err := decodeStreamedFlow(scanner)
|
||||
if err != nil {
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
select {
|
||||
case <-c.quit:
|
||||
stop()
|
||||
return
|
||||
}
|
||||
c.handleFlow(f, false)
|
||||
}
|
||||
}
|
||||
|
||||
// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing)
|
||||
func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) {
|
||||
success := scanner.Scan()
|
||||
if !success {
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, io.EOF
|
||||
}
|
||||
line := scanner.Bytes()
|
||||
// Remove [ASSURED]/[UNREPLIED] tags
|
||||
line = removeInplace(line, assured)
|
||||
line = removeInplace(line, unreplied)
|
||||
return line, nil
|
||||
}
|
||||
|
||||
func removeInplace(s, sep []byte) []byte {
|
||||
// TODO: See if we can get better performance
|
||||
// removing multiple substrings at once (with index/suffixarray New()+Lookup())
|
||||
// Probably not worth it for only two substrings occurring once.
|
||||
index := bytes.Index(s, sep)
|
||||
if index < 0 {
|
||||
return s
|
||||
}
|
||||
copy(s[index:], s[index+len(sep):])
|
||||
return s[:len(s)-len(sep)]
|
||||
}
|
||||
|
||||
// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow
|
||||
// It only considers the following key-values:
|
||||
// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776
|
||||
// Keys can be present twice, so the order is important.
|
||||
// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored.
|
||||
func decodeFlowKeyValues(line []byte, f *flow) error {
|
||||
var err error
|
||||
for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
|
||||
kv := strings.SplitN(field, "=", 2)
|
||||
if len(kv) != 2 {
|
||||
continue
|
||||
}
|
||||
key := kv[0]
|
||||
value := kv[1]
|
||||
firstTupleSet := f.Original.Layer4.DstPort != 0
|
||||
switch {
|
||||
case key == "src":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer3.SrcIP = value
|
||||
} else {
|
||||
f.Reply.Layer3.SrcIP = value
|
||||
case f, ok := <-events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
case key == "dst":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer3.DstIP = value
|
||||
} else {
|
||||
f.Reply.Layer3.DstIP = value
|
||||
}
|
||||
|
||||
case key == "sport":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
|
||||
} else {
|
||||
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
|
||||
}
|
||||
|
||||
case key == "dport":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
|
||||
} else {
|
||||
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
|
||||
}
|
||||
|
||||
case key == "id":
|
||||
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
|
||||
c.handleFlow(f, false)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
var (
|
||||
// Use ints for parsing unused fields since their allocations
|
||||
// are almost for free
|
||||
unused [2]int
|
||||
f flow
|
||||
)
|
||||
|
||||
// Examples:
|
||||
// " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767"
|
||||
// " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776"
|
||||
// " [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160"
|
||||
// " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing)
|
||||
|
||||
// Remove tags since they are optional and make parsing harder
|
||||
line, err := getUntaggedLine(scanner)
|
||||
func (c *conntrackWalker) existingConnections() ([]conntrack.Flow, error) {
|
||||
flows, err := conntrack.Established(c.bufferSize)
|
||||
if err != nil {
|
||||
return flow{}, err
|
||||
return []conntrack.Flow{}, err
|
||||
}
|
||||
|
||||
line = bytes.TrimLeft(line, " ")
|
||||
if bytes.HasPrefix(line, destroyTypeB) {
|
||||
// Destroy events don't have a timeout or state field
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d",
|
||||
&f.Type,
|
||||
&f.Original.Layer4.Proto,
|
||||
&unused[0],
|
||||
)
|
||||
} else {
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s",
|
||||
&f.Type,
|
||||
&f.Original.Layer4.Proto,
|
||||
&unused[0],
|
||||
&unused[1],
|
||||
&f.Independent.State,
|
||||
)
|
||||
}
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
err = decodeFlowKeyValues(line, &f)
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func existingConnections(conntrackWalkerArgs []string) ([]flow, error) {
|
||||
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, conntrackWalkerArgs...)
|
||||
cmd := exec.Command("conntrack", args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return []flow{}, err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return []flow{}, err
|
||||
}
|
||||
defer func() {
|
||||
if err := cmd.Wait(); err != nil {
|
||||
log.Errorf("conntrack existingConnections exit error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(bufio.NewReader(stdout))
|
||||
var result []flow
|
||||
for {
|
||||
f, err := decodeDumpedFlow(scanner)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
return result, err
|
||||
}
|
||||
result = append(result, f)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
var (
|
||||
// Use ints for parsing unused fields since allocations
|
||||
// are almost for free
|
||||
unused [4]int
|
||||
f flow
|
||||
)
|
||||
|
||||
// Examples with different formats:
|
||||
// With SELinux, there is a "secctx="
|
||||
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
|
||||
//
|
||||
// "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088"
|
||||
// "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
|
||||
// "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880"
|
||||
// "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840"
|
||||
|
||||
// remove tags since they are optional and make parsing harder
|
||||
line, err := getUntaggedLine(scanner)
|
||||
if err != nil {
|
||||
return flow{}, err
|
||||
}
|
||||
|
||||
_, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State)
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
err = decodeFlowKeyValues(line, &f)
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
|
||||
return f, nil
|
||||
return flows, nil
|
||||
}
|
||||
|
||||
func (c *conntrackWalker) stop() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
close(c.quit)
|
||||
if c.cmd != nil {
|
||||
c.cmd.Kill()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
|
||||
func (c *conntrackWalker) handleFlow(f conntrack.Flow, forceAdd bool) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// For not, I'm only interested in tcp connections - there is too much udp
|
||||
// traffic going on (every container talking to weave dns, for example) to
|
||||
// render nicely. TODO: revisit this.
|
||||
if f.Original.Layer4.Proto != tcpProto {
|
||||
return
|
||||
}
|
||||
|
||||
// Ignore flows for which we never saw an update; they are likely
|
||||
// incomplete or wrong. See #1462.
|
||||
switch {
|
||||
case forceAdd || f.Type == updateType:
|
||||
if f.Independent.State != timeWait {
|
||||
c.activeFlows[f.Independent.ID] = f
|
||||
} else if _, ok := c.activeFlows[f.Independent.ID]; ok {
|
||||
delete(c.activeFlows, f.Independent.ID)
|
||||
case forceAdd || f.MsgType == conntrack.NfctMsgUpdate:
|
||||
if f.State != conntrack.TCPStateTimeWait {
|
||||
c.activeFlows[f.ID] = f
|
||||
} else if _, ok := c.activeFlows[f.ID]; ok {
|
||||
delete(c.activeFlows, f.ID)
|
||||
c.bufferedFlows = append(c.bufferedFlows, f)
|
||||
}
|
||||
case f.Type == destroyType:
|
||||
if active, ok := c.activeFlows[f.Independent.ID]; ok {
|
||||
delete(c.activeFlows, f.Independent.ID)
|
||||
case f.MsgType == conntrack.NfctMsgDestroy:
|
||||
if active, ok := c.activeFlows[f.ID]; ok {
|
||||
delete(c.activeFlows, f.ID)
|
||||
c.bufferedFlows = append(c.bufferedFlows, active)
|
||||
}
|
||||
}
|
||||
@@ -463,7 +195,7 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
|
||||
|
||||
// walkFlows calls f with all active flows and flows that have come and gone
|
||||
// since the last call to walkFlows
|
||||
func (c *conntrackWalker) walkFlows(f func(flow, bool)) {
|
||||
func (c *conntrackWalker) walkFlows(f func(conntrack.Flow, bool)) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for _, flow := range c.activeFlows {
|
||||
|
||||
@@ -1,253 +0,0 @@
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
// Obtained though conntrack -E -p tcp -o id and then tweaked
|
||||
const streamedFlowsSource = `[DESTROY] tcp 6 src=10.0.0.1 dst=127.0.0.1 sport=36826 dport=28106 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36826 [ASSURED] id=347275904
|
||||
[NEW] tcp 6 120 SYN_SENT src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 [UNREPLIED] src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904
|
||||
[UPDATE] tcp 6 60 SYN_RECV src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904
|
||||
[UPDATE] tcp 6 432000 ESTABLISHED src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 [ASSURED] id=347275904`
|
||||
|
||||
var wantStreamedFlows = []flow{
|
||||
{
|
||||
Type: destroyType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36826,
|
||||
DstPort: 28106,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36826,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: newType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36898,
|
||||
DstPort: 28107,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36898,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
State: "SYN_SENT",
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36898,
|
||||
DstPort: 28107,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36898,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
State: "SYN_RECV",
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36898,
|
||||
DstPort: 28107,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36898,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func testFlowDecoding(t *testing.T, source string, want []flow, decoder func(scanner *bufio.Scanner) (flow, error)) {
|
||||
scanner := bufio.NewScanner(strings.NewReader(source))
|
||||
d := time.Millisecond * 100
|
||||
for _, wantFlow := range want {
|
||||
haveFlow, err := decoder(scanner)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected decoding error: %v", err)
|
||||
}
|
||||
test.Poll(t, d, wantFlow, func() interface{} { return haveFlow })
|
||||
}
|
||||
|
||||
if _, err := decodeStreamedFlow(scanner); err != io.EOF {
|
||||
t.Fatalf("Unexpected error value on empty input: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamedFlowDecoding(t *testing.T) {
|
||||
testFlowDecoding(t, streamedFlowsSource, wantStreamedFlows, decodeStreamedFlow)
|
||||
}
|
||||
|
||||
// Obtained through conntrack -L -p tcp -o id
|
||||
// With SELinux, there is a "secctx="
|
||||
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
|
||||
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208
|
||||
tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880
|
||||
tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840`
|
||||
|
||||
var wantDumpedFlows = []flow{
|
||||
{
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.2.2",
|
||||
DstIP: "10.0.2.15",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 49911,
|
||||
DstPort: 22,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.2.15",
|
||||
DstIP: "10.0.2.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 22,
|
||||
DstPort: 49911,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 2993966208,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
{
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "172.17.0.5",
|
||||
DstIP: "172.17.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 47010,
|
||||
DstPort: 80,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "172.17.0.2",
|
||||
DstIP: "172.17.0.5",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 80,
|
||||
DstPort: 47010,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 4001098880,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
{
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "192.168.35.116",
|
||||
DstIP: "216.58.213.227",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 49862,
|
||||
DstPort: 443,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "216.58.213.227",
|
||||
DstIP: "192.168.35.116",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 443,
|
||||
DstPort: 49862,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 943643840,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestDumpedFlowDecoding(t *testing.T) {
|
||||
testFlowDecoding(t, dumpedFlowsSource, wantDumpedFlows, decodeDumpedFlow)
|
||||
}
|
||||
@@ -1,19 +1,21 @@
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/weaveworks/scope/probe/endpoint/conntrack"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
// This is our 'abstraction' of the endpoint that have been rewritten by NAT.
|
||||
// Original is the private IP that has been rewritten.
|
||||
type endpointMapping struct {
|
||||
originalIP string
|
||||
originalPort int
|
||||
originalIP net.IP
|
||||
originalPort uint16
|
||||
|
||||
rewrittenIP string
|
||||
rewrittenPort int
|
||||
rewrittenIP net.IP
|
||||
rewrittenPort uint16
|
||||
}
|
||||
|
||||
// natMapper rewrites a report to deal with NAT'd connections.
|
||||
@@ -25,9 +27,9 @@ func makeNATMapper(fw flowWalker) natMapper {
|
||||
return natMapper{fw}
|
||||
}
|
||||
|
||||
func toMapping(f flow) *endpointMapping {
|
||||
func toMapping(f conntrack.Flow) *endpointMapping {
|
||||
var mapping endpointMapping
|
||||
if f.Original.Layer3.SrcIP == f.Reply.Layer3.DstIP {
|
||||
if f.Original.Layer3.SrcIP.Equal(f.Reply.Layer3.DstIP) {
|
||||
mapping = endpointMapping{
|
||||
originalIP: f.Reply.Layer3.SrcIP,
|
||||
originalPort: f.Reply.Layer4.SrcPort,
|
||||
@@ -49,13 +51,13 @@ func toMapping(f flow) *endpointMapping {
|
||||
// applyNAT duplicates Nodes in the endpoint topology of a report, based on
|
||||
// the NAT table.
|
||||
func (n natMapper) applyNAT(rpt report.Report, scope string) {
|
||||
n.flowWalker.walkFlows(func(f flow, active bool) {
|
||||
n.flowWalker.walkFlows(func(f conntrack.Flow, _ bool) {
|
||||
mapping := toMapping(f)
|
||||
|
||||
realEndpointPort := strconv.Itoa(mapping.originalPort)
|
||||
copyEndpointPort := strconv.Itoa(mapping.rewrittenPort)
|
||||
realEndpointID := report.MakeEndpointNodeID(scope, "", mapping.originalIP, realEndpointPort)
|
||||
copyEndpointID := report.MakeEndpointNodeID(scope, "", mapping.rewrittenIP, copyEndpointPort)
|
||||
realEndpointPort := strconv.Itoa(int(mapping.originalPort))
|
||||
copyEndpointPort := strconv.Itoa(int(mapping.rewrittenPort))
|
||||
realEndpointID := report.MakeEndpointNodeID(scope, "", mapping.originalIP.String(), realEndpointPort)
|
||||
copyEndpointID := report.MakeEndpointNodeID(scope, "", mapping.rewrittenIP.String(), copyEndpointPort)
|
||||
|
||||
node, ok := rpt.Endpoint.Nodes[realEndpointID]
|
||||
if !ok {
|
||||
|
||||
@@ -1,19 +1,22 @@
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"net"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/weaveworks/common/mtime"
|
||||
"github.com/weaveworks/common/test"
|
||||
"github.com/weaveworks/scope/probe/endpoint/conntrack"
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/test/reflect"
|
||||
)
|
||||
|
||||
type mockFlowWalker struct {
|
||||
flows []flow
|
||||
flows []conntrack.Flow
|
||||
}
|
||||
|
||||
func (m *mockFlowWalker) walkFlows(f func(f flow, active bool)) {
|
||||
func (m *mockFlowWalker) walkFlows(f func(f conntrack.Flow, active bool)) {
|
||||
for _, flow := range m.flows {
|
||||
f(flow, true)
|
||||
}
|
||||
@@ -32,39 +35,42 @@ func TestNat(t *testing.T) {
|
||||
// container2 (10.0.47.2:22222), host2 (2.3.4.5:22223) ->
|
||||
// host1 (1.2.3.4:80), container1 (10.0.47.1:80)
|
||||
|
||||
c1 := net.ParseIP("10.0.47.1")
|
||||
c2 := net.ParseIP("10.0.47.2")
|
||||
host2 := net.ParseIP("2.3.4.5")
|
||||
host1 := net.ParseIP("1.2.3.4")
|
||||
|
||||
// from the PoV of host1
|
||||
{
|
||||
f := flow{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "2.3.4.5",
|
||||
DstIP: "1.2.3.4",
|
||||
f := conntrack.Flow{
|
||||
MsgType: conntrack.NfctMsgUpdate,
|
||||
Original: conntrack.Meta{
|
||||
Layer3: conntrack.Layer3{
|
||||
SrcIP: host2,
|
||||
DstIP: host1,
|
||||
},
|
||||
Layer4: layer4{
|
||||
Layer4: conntrack.Layer4{
|
||||
SrcPort: 22222,
|
||||
DstPort: 80,
|
||||
Proto: "tcp",
|
||||
Proto: syscall.IPPROTO_TCP,
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.47.1",
|
||||
DstIP: "2.3.4.5",
|
||||
Reply: conntrack.Meta{
|
||||
Layer3: conntrack.Layer3{
|
||||
SrcIP: c1,
|
||||
DstIP: host2,
|
||||
},
|
||||
Layer4: layer4{
|
||||
Layer4: conntrack.Layer4{
|
||||
SrcPort: 80,
|
||||
DstPort: 22222,
|
||||
Proto: "tcp",
|
||||
Proto: syscall.IPPROTO_TCP,
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 1,
|
||||
},
|
||||
ID: 1,
|
||||
}
|
||||
|
||||
ct := &mockFlowWalker{
|
||||
flows: []flow{f},
|
||||
flows: []conntrack.Flow{f},
|
||||
}
|
||||
|
||||
have := report.MakeReport()
|
||||
@@ -88,36 +94,34 @@ func TestNat(t *testing.T) {
|
||||
|
||||
// form the PoV of host2
|
||||
{
|
||||
f := flow{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.47.2",
|
||||
DstIP: "1.2.3.4",
|
||||
f := conntrack.Flow{
|
||||
MsgType: conntrack.NfctMsgUpdate,
|
||||
Original: conntrack.Meta{
|
||||
Layer3: conntrack.Layer3{
|
||||
SrcIP: c2,
|
||||
DstIP: host1,
|
||||
},
|
||||
Layer4: layer4{
|
||||
Layer4: conntrack.Layer4{
|
||||
SrcPort: 22222,
|
||||
DstPort: 80,
|
||||
Proto: "tcp",
|
||||
Proto: syscall.IPPROTO_TCP,
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "1.2.3.4",
|
||||
DstIP: "2.3.4.5",
|
||||
Reply: conntrack.Meta{
|
||||
Layer3: conntrack.Layer3{
|
||||
SrcIP: host1,
|
||||
DstIP: host2,
|
||||
},
|
||||
Layer4: layer4{
|
||||
Layer4: conntrack.Layer4{
|
||||
SrcPort: 80,
|
||||
DstPort: 22223,
|
||||
Proto: "tcp",
|
||||
Proto: syscall.IPPROTO_TCP,
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 2,
|
||||
},
|
||||
ID: 2,
|
||||
}
|
||||
ct := &mockFlowWalker{
|
||||
flows: []flow{f},
|
||||
flows: []conntrack.Flow{f},
|
||||
}
|
||||
|
||||
have := report.MakeReport()
|
||||
|
||||
Reference in New Issue
Block a user