Merge pull request #2095 from weaveworks/1991-conntrack-parsing

Disable XML in conntrack parsing
This commit is contained in:
Alfonso Acosta
2016-12-22 11:00:51 +01:00
committed by GitHub
3 changed files with 408 additions and 238 deletions

View File

@@ -2,7 +2,8 @@ package endpoint
import (
"bufio"
"encoding/xml"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
@@ -18,49 +19,45 @@ import (
const (
// From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
// Check a tcp-related file for existence since we need tcp tracking
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
conntrackOpenTag = "<conntrack>\n"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "new"
updateType = "update"
destroyType = "destroy"
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
timeWait = "TIME_WAIT"
tcpProto = "tcp"
newType = "[NEW]"
updateType = "[UPDATE]"
destroyType = "[DESTROY]"
)
var (
// destroyTypeB is destroyType as a byte slice, so it can be matched against
// a raw line with bytes.HasPrefix without converting the line to a string.
destroyTypeB = []byte(destroyType)
// assured and unreplied are the optional tags stripped from each line
// before parsing. Each literal includes the trailing space that follows
// the tag.
assured = []byte("[ASSURED] ")
unreplied = []byte("[UNREPLIED] ")
)
type layer3 struct {
XMLName xml.Name `xml:"layer3"`
SrcIP string `xml:"src"`
DstIP string `xml:"dst"`
SrcIP string
DstIP string
}
type layer4 struct {
XMLName xml.Name `xml:"layer4"`
SrcPort int `xml:"sport"`
DstPort int `xml:"dport"`
Proto string `xml:"protoname,attr"`
SrcPort int
DstPort int
Proto string
}
type meta struct {
XMLName xml.Name `xml:"meta"`
Direction string `xml:"direction,attr"`
Layer3 layer3 `xml:"layer3"`
Layer4 layer4 `xml:"layer4"`
ID int64 `xml:"id"`
State string `xml:"state"`
Layer3 layer3
Layer4 layer4
ID int64
State string
}
type flow struct {
XMLName xml.Name `xml:"flow"`
Metas []meta `xml:"meta"`
Type string `xml:"type,attr"`
Original, Reply, Independent *meta `xml:"-"`
Type string
Original, Reply, Independent meta
}
type conntrack struct {
XMLName xml.Name `xml:"conntrack"`
Flows []flow `xml:"flow"`
Flows []flow
}
// flowWalker is something that maintains flows, and provides an accessor
@@ -165,7 +162,7 @@ func (c *conntrackWalker) run() {
args := append([]string{
"--buffer-size", strconv.Itoa(c.bufferSize), "-E",
"-o", "xml", "-p", "tcp"}, c.args...,
"-o", "id", "-p", "tcp"}, c.args...,
)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
@@ -204,30 +201,13 @@ func (c *conntrackWalker) run() {
c.cmd = cmd
c.Unlock()
// Swallow the first two lines
reader := bufio.NewReader(stdout)
if line, err := reader.ReadString('\n'); err != nil {
log.Errorf("conntrack error: %v", err)
return
} else if line != xmlHeader {
log.Errorf("conntrack invalid output: '%s'", line)
return
}
if line, err := reader.ReadString('\n'); err != nil {
log.Errorf("conntrack error: %v", err)
return
} else if line != conntrackOpenTag {
log.Errorf("conntrack invalid output: '%s'", line)
return
}
scanner := bufio.NewScanner(bufio.NewReader(stdout))
defer log.Infof("conntrack exiting")
defer log.Infof("contrack exiting")
// Now loop on the output stream
decoder := xml.NewDecoder(reader)
// Loop on the output stream
for {
var f flow
if err := decoder.Decode(&f); err != nil {
f, err := decodeStreamedFlow(scanner)
if err != nil {
log.Errorf("conntrack error: %v", err)
return
}
@@ -235,8 +215,99 @@ func (c *conntrackWalker) run() {
}
}
// getUntaggedLine advances scanner by one line and returns that line with the
// first "[ASSURED] " and the first "[UNREPLIED] " tag cut out, which keeps the
// layout of the remaining fields fixed for the parsers.
//
// When no further line is available it returns the scanner's error if there
// is one, and io.EOF otherwise. The returned slice is the scanner's own
// buffer (edited in place), so it is only valid until the next Scan.
func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) {
	if scanner.Scan() {
		untagged := removeInplace(scanner.Bytes(), assured)
		return removeInplace(untagged, unreplied), nil
	}
	if scanErr := scanner.Err(); scanErr != nil {
		return nil, scanErr
	}
	return nil, io.EOF
}
// removeInplace deletes the first occurrence of sep from s, reusing s's
// backing array, and returns the shortened slice. If sep does not occur, s is
// returned untouched. The caller's original slice contents are overwritten.
func removeInplace(s, sep []byte) []byte {
	// TODO: a single pass removing several substrings at once (e.g. with
	// index/suffixarray) might be faster, but is probably not worth it for
	// two substrings that occur at most once each.
	start := bytes.Index(s, sep)
	if start == -1 {
		return s
	}
	// Appending the tail onto the prefix shifts it left over the match
	// without allocating: the prefix already has room for it.
	return append(s[:start], s[start+len(sep):]...)
}
// decodeStreamedFlow reads the next line from scanner and parses it as one
// conntrack event in the "-o id" text format.
//
// Sample lines:
//
//	" [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776"
//	" [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160"
//	" [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984"
//
// The first src/dst/sport/dport group fills Original, the second fills Reply;
// the event type goes to Type, and the state and id go to Independent.
// Reply's protocol is copied from Original's.
//
// Errors from getUntaggedLine (including io.EOF) are returned unchanged; a
// line that does not match the expected layout yields a descriptive error.
func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
	// The [ASSURED]/[UNREPLIED] tags are optional, so they are stripped
	// first to leave a fixed field layout.
	raw, err := getUntaggedLine(scanner)
	if err != nil {
		return flow{}, err
	}
	raw = bytes.TrimLeft(raw, " ")

	// Numeric columns that have to be scanned past but are never used.
	// Plain ints keep that cheap.
	var protoNum, timeout int
	var event flow
	orig, reply, indep := &event.Original, &event.Reply, &event.Independent

	switch {
	case bytes.HasPrefix(raw, destroyTypeB):
		// Destroy events carry neither a timeout nor a state column.
		_, err = fmt.Sscanf(string(raw), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
			&event.Type,
			&orig.Layer4.Proto,
			&protoNum,
			&orig.Layer3.SrcIP,
			&orig.Layer3.DstIP,
			&orig.Layer4.SrcPort,
			&orig.Layer4.DstPort,
			&reply.Layer3.SrcIP,
			&reply.Layer3.DstIP,
			&reply.Layer4.SrcPort,
			&reply.Layer4.DstPort,
			&indep.ID,
		)
	default:
		_, err = fmt.Sscanf(string(raw), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
			&event.Type,
			&orig.Layer4.Proto,
			&protoNum,
			&timeout,
			&indep.State,
			&orig.Layer3.SrcIP,
			&orig.Layer3.DstIP,
			&orig.Layer4.SrcPort,
			&orig.Layer4.DstPort,
			&reply.Layer3.SrcIP,
			&reply.Layer3.DstIP,
			&reply.Layer4.SrcPort,
			&reply.Layer4.DstPort,
			&indep.ID,
		)
	}
	if err != nil {
		return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", raw, err)
	}

	reply.Layer4.Proto = orig.Layer4.Proto
	return event, nil
}
func (c *conntrackWalker) existingConnections() ([]flow, error) {
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
@@ -250,13 +321,63 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
log.Errorf("conntrack existingConnections exit error: %v", err)
}
}()
var result conntrack
if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF {
return []flow{}, nil
} else if err != nil {
return []flow{}, err
scanner := bufio.NewScanner(bufio.NewReader(stdout))
var result []flow
for {
f, err := decodeDumpedFlow(scanner)
if err != nil {
if err == io.EOF {
break
}
log.Errorf("conntrack error: %v", err)
return result, err
}
result = append(result, f)
}
return result.Flows, nil
return result, nil
}
// decodeDumpedFlow reads the next line from scanner and parses it as one
// entry of a conntrack table dump in the "-o id" text format.
//
// Sample line:
//
//	"tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
//
// The first src/dst/sport/dport group fills Original, the second fills Reply;
// the state and id go to Independent. Dumped entries have no event type, so
// Type is left empty. Reply's protocol is copied from Original's.
//
// Errors from getUntaggedLine (including io.EOF) are returned unchanged; a
// line that does not match the expected layout yields a descriptive error.
func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
	// The [ASSURED]/[UNREPLIED] tags are optional, so they are stripped
	// first to leave a fixed field layout.
	raw, err := getUntaggedLine(scanner)
	if err != nil {
		return flow{}, err
	}

	// Numeric columns that have to be scanned past but are never used.
	// Plain ints keep that cheap.
	var protoNum, timeout, mark, use int
	var entry flow
	orig, reply, indep := &entry.Original, &entry.Reply, &entry.Independent

	if _, err = fmt.Sscanf(string(raw), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
		&orig.Layer4.Proto,
		&protoNum,
		&timeout,
		&indep.State,
		&orig.Layer3.SrcIP,
		&orig.Layer3.DstIP,
		&orig.Layer4.SrcPort,
		&orig.Layer4.DstPort,
		&reply.Layer3.SrcIP,
		&reply.Layer3.DstIP,
		&reply.Layer4.SrcPort,
		&reply.Layer4.DstPort,
		&mark,
		&use,
		&indep.ID,
	); err != nil {
		return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", raw, err)
	}

	reply.Layer4.Proto = orig.Layer4.Proto
	return entry, nil
}
func (c *conntrackWalker) stop() {
@@ -269,21 +390,8 @@ func (c *conntrackWalker) stop() {
}
func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
// This code finds those metas, which are identified by a Direction
// attribute.
for i := range f.Metas {
meta := &f.Metas[i]
switch meta.Direction {
case "original":
f.Original = meta
case "reply":
f.Reply = meta
case "independent":
f.Independent = meta
}
}
c.Lock()
defer c.Unlock()
// For now, I'm only interested in tcp connections - there is too much udp
// traffic going on (every container talking to weave dns, for example) to
@@ -292,9 +400,6 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
return
}
c.Lock()
defer c.Unlock()
// Ignore flows for which we never saw an update; they are likely
// incomplete or wrong. See #1462.
switch {

View File

@@ -2,176 +2,192 @@ package endpoint
import (
"bufio"
"encoding/xml"
"io"
"strings"
"testing"
"time"
"github.com/weaveworks/common/exec"
testexec "github.com/weaveworks/common/test/exec"
"github.com/weaveworks/scope/test"
)
const conntrackCloseTag = "</conntrack>\n"
const bufferSize = 1024 * 1024
// Obtained through conntrack -E -p tcp -o id and then tweaked
const streamedFlowsSource = `[DESTROY] tcp 6 src=10.0.0.1 dst=127.0.0.1 sport=36826 dport=28106 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36826 [ASSURED] id=347275904
[NEW] tcp 6 120 SYN_SENT src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 [UNREPLIED] src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904
[UPDATE] tcp 6 60 SYN_RECV src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904
[UPDATE] tcp 6 432000 ESTABLISHED src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 [ASSURED] id=347275904`
func makeFlow(ty string) flow {
return flow{
XMLName: xml.Name{
Local: "flow",
var wantStreamedFlows = []flow{
{
Type: destroyType,
Original: meta{
Layer3: layer3{
SrcIP: "10.0.0.1",
DstIP: "127.0.0.1",
},
Layer4: layer4{
SrcPort: 36826,
DstPort: 28106,
Proto: "tcp",
},
},
Type: ty,
Reply: meta{
Layer3: layer3{
SrcIP: "10.0.0.2",
DstIP: "127.0.0.2",
},
Layer4: layer4{
SrcPort: 28107,
DstPort: 36826,
Proto: "tcp",
},
},
Independent: meta{
ID: 347275904,
},
},
{
Type: newType,
Original: meta{
Layer3: layer3{
SrcIP: "10.0.0.1",
DstIP: "127.0.0.1",
},
Layer4: layer4{
SrcPort: 36898,
DstPort: 28107,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "10.0.0.2",
DstIP: "127.0.0.2",
},
Layer4: layer4{
SrcPort: 28107,
DstPort: 36898,
Proto: "tcp",
},
},
Independent: meta{
ID: 347275904,
State: "SYN_SENT",
},
},
{
Type: updateType,
Original: meta{
Layer3: layer3{
SrcIP: "10.0.0.1",
DstIP: "127.0.0.1",
},
Layer4: layer4{
SrcPort: 36898,
DstPort: 28107,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "10.0.0.2",
DstIP: "127.0.0.2",
},
Layer4: layer4{
SrcPort: 28107,
DstPort: 36898,
Proto: "tcp",
},
},
Independent: meta{
ID: 347275904,
State: "SYN_RECV",
},
},
{
Type: updateType,
Original: meta{
Layer3: layer3{
SrcIP: "10.0.0.1",
DstIP: "127.0.0.1",
},
Layer4: layer4{
SrcPort: 36898,
DstPort: 28107,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "10.0.0.2",
DstIP: "127.0.0.2",
},
Layer4: layer4{
SrcPort: 28107,
DstPort: 36898,
Proto: "tcp",
},
},
Independent: meta{
ID: 347275904,
State: "ESTABLISHED",
},
},
}
// testFlowDecoding is the shared driver for the decoder tests. It feeds
// source to decoder through a line scanner and checks that:
//
//   - each successive call yields the next entry of want, in order;
//   - once want is exhausted, one more call reports io.EOF.
//
// decoder is the function under test (decodeStreamedFlow or
// decodeDumpedFlow).
func testFlowDecoding(t *testing.T, source string, want []flow, decoder func(scanner *bufio.Scanner) (flow, error)) {
	scanner := bufio.NewScanner(strings.NewReader(source))
	d := time.Millisecond * 100
	for _, wantFlow := range want {
		haveFlow, err := decoder(scanner)
		if err != nil {
			t.Fatalf("Unexpected decoding error: %v", err)
		}
		// NOTE(review): haveFlow is already computed, so Poll presumably
		// acts as a plain equality check here - confirm against test.Poll.
		test.Poll(t, d, wantFlow, func() interface{} { return haveFlow })
	}
	// Check end-of-input handling with the decoder under test rather than a
	// hard-coded one, so every decoder's EOF path is exercised.
	if _, err := decoder(scanner); err != io.EOF {
		t.Fatalf("Unexpected error value on empty input: %v", err)
	}
}
func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta {
meta := meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: dir,
Layer3: layer3{
XMLName: xml.Name{
Local: "layer3",
},
SrcIP: srcIP,
DstIP: dstIP,
},
Layer4: layer4{
XMLName: xml.Name{
Local: "layer4",
},
SrcPort: srcPort,
DstPort: dstPort,
Proto: tcpProto,
},
}
f.Metas = append(f.Metas, meta)
return &meta
// TestStreamedFlowDecoding runs decodeStreamedFlow over streamedFlowsSource
// and expects it to produce wantStreamedFlows.
func TestStreamedFlowDecoding(t *testing.T) {
testFlowDecoding(t, streamedFlowsSource, wantStreamedFlows, decodeStreamedFlow)
}
func addIndependant(f *flow, id int64, state string) *meta {
meta := meta{
XMLName: xml.Name{
Local: "meta",
},
Direction: "independent",
ID: id,
State: state,
Layer3: layer3{
XMLName: xml.Name{
Local: "layer3",
// Obtained through conntrack -L -p tcp -o id
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208`
var wantDumpedFlows = []flow{
{
Original: meta{
Layer3: layer3{
SrcIP: "10.0.2.2",
DstIP: "10.0.2.15",
},
Layer4: layer4{
SrcPort: 49911,
DstPort: 22,
Proto: "tcp",
},
},
Layer4: layer4{
XMLName: xml.Name{
Local: "layer4",
Reply: meta{
Layer3: layer3{
SrcIP: "10.0.2.15",
DstIP: "10.0.2.2",
},
Layer4: layer4{
SrcPort: 22,
DstPort: 49911,
Proto: "tcp",
},
},
}
f.Metas = append(f.Metas, meta)
return &meta
Independent: meta{
ID: 2993966208,
State: "ESTABLISHED",
},
},
}
func TestConntracker(t *testing.T) {
oldExecCmd, oldIsConntrackSupported := exec.Command, IsConntrackSupported
defer func() { exec.Command, IsConntrackSupported = oldExecCmd, oldIsConntrackSupported }()
IsConntrackSupported = func(_ string) error {
return nil
}
first := true
existingConnectionsReader, existingConnectionsWriter := io.Pipe()
reader, writer := io.Pipe()
exec.Command = func(name string, args ...string) exec.Cmd {
if first {
first = false
return testexec.NewMockCmd(existingConnectionsReader)
}
return testexec.NewMockCmd(reader)
}
flowWalker := newConntrackFlowWalker(true, "", bufferSize)
defer flowWalker.stop()
// First write out some empty xml for the existing connections
ecbw := bufio.NewWriter(existingConnectionsWriter)
if _, err := ecbw.WriteString(xmlHeader); err != nil {
t.Fatal(err)
}
if _, err := ecbw.WriteString(conntrackOpenTag); err != nil {
t.Fatal(err)
}
if _, err := ecbw.WriteString(conntrackCloseTag); err != nil {
t.Fatal(err)
}
if err := ecbw.Flush(); err != nil {
t.Fatal(err)
}
// Then write out events
bw := bufio.NewWriter(writer)
if _, err := bw.WriteString(xmlHeader); err != nil {
t.Fatal(err)
}
if _, err := bw.WriteString(conntrackOpenTag); err != nil {
t.Fatal(err)
}
if err := bw.Flush(); err != nil {
t.Fatal(err)
}
have := func() interface{} {
result := []flow{}
flowWalker.walkFlows(func(f flow) {
f.Original = nil
f.Reply = nil
f.Independent = nil
result = append(result, f)
})
return result
}
ts := 100 * time.Millisecond
// First, assert we have no flows
test.Poll(t, ts, []flow{}, have)
// Now add some flows
xmlEncoder := xml.NewEncoder(bw)
writeFlow := func(f flow) {
if err := xmlEncoder.Encode(f); err != nil {
t.Fatal(err)
}
if _, err := bw.WriteString("\n"); err != nil {
t.Fatal(err)
}
if err := bw.Flush(); err != nil {
t.Fatal(err)
}
}
flow1 := makeFlow(updateType)
addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3)
addIndependant(&flow1, 1, "")
writeFlow(flow1)
test.Poll(t, ts, []flow{flow1}, have)
// Now check when we remove the flow, we still get it in the next Walk
flow2 := makeFlow(destroyType)
addMeta(&flow2, "original", "1.2.3.4", "2.3.4.5", 2, 3)
addIndependant(&flow2, 1, "")
writeFlow(flow2)
test.Poll(t, ts, []flow{flow1}, have)
test.Poll(t, ts, []flow{}, have)
// This time we're not going to remove it, but put it in state TIME_WAIT
flow1.Type = updateType
writeFlow(flow1)
test.Poll(t, ts, []flow{flow1}, have)
flow1.Metas[1].State = timeWait
writeFlow(flow1)
test.Poll(t, ts, []flow{flow1}, have)
test.Poll(t, ts, []flow{}, have)
// TestDumpedFlowDecoding runs decodeDumpedFlow over dumpedFlowsSource and
// expects it to produce wantDumpedFlows.
func TestDumpedFlowDecoding(t *testing.T) {
testFlowDecoding(t, dumpedFlowsSource, wantDumpedFlows, decodeDumpedFlow)
}

View File

@@ -29,15 +29,40 @@ func TestNat(t *testing.T) {
// correctly.
// the setup is this:
//
// container2 (10.0.47.2:222222), host2 (2.3.4.5:22223) ->
// container2 (10.0.47.2:22222), host2 (2.3.4.5:22223) ->
// host1 (1.2.3.4:80), container1 (10.0.47.1:80)
// from the PoV of host1
{
f := makeFlow(updateType)
addIndependant(&f, 1, "")
f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
f := flow{
Type: updateType,
Original: meta{
Layer3: layer3{
SrcIP: "2.3.4.5",
DstIP: "1.2.3.4",
},
Layer4: layer4{
SrcPort: 22222,
DstPort: 80,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "10.0.47.1",
DstIP: "2.3.4.5",
},
Layer4: layer4{
SrcPort: 80,
DstPort: 22222,
Proto: "tcp",
},
},
Independent: meta{
ID: 1,
},
}
ct := &mockFlowWalker{
flows: []flow{f},
}
@@ -69,10 +94,34 @@ func TestNat(t *testing.T) {
// from the PoV of host2
{
f := makeFlow(updateType)
addIndependant(&f, 2, "")
f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
f := flow{
Type: updateType,
Original: meta{
Layer3: layer3{
SrcIP: "10.0.47.2",
DstIP: "1.2.3.4",
},
Layer4: layer4{
SrcPort: 22222,
DstPort: 80,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "1.2.3.4",
DstIP: "2.3.4.5",
},
Layer4: layer4{
SrcPort: 80,
DstPort: 22223,
Proto: "tcp",
},
},
Independent: meta{
ID: 2,
},
}
ct := &mockFlowWalker{
flows: []flow{f},
}