mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Cleanup
* Remove XML traces * Improve performance * Fix tests
This commit is contained in:
@@ -2,13 +2,12 @@ package endpoint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/xml"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -20,48 +19,45 @@ import (
|
||||
const (
|
||||
// From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
|
||||
// Check a tcp-related file for existence since we need tcp tracking
|
||||
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
|
||||
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
|
||||
conntrackOpenTag = "<conntrack>\n"
|
||||
timeWait = "TIME_WAIT"
|
||||
tcpProto = "tcp"
|
||||
newType = "new"
|
||||
updateType = "update"
|
||||
destroyType = "destroy"
|
||||
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
|
||||
timeWait = "TIME_WAIT"
|
||||
tcpProto = "tcp"
|
||||
newType = "[NEW]"
|
||||
updateType = "[UPDATE]"
|
||||
destroyType = "[DESTROY]"
|
||||
)
|
||||
|
||||
var (
|
||||
destroyTypeB = []byte(destroyType)
|
||||
assured = []byte("[ASSURED] ")
|
||||
unreplied = []byte("[UNREPLIED] ")
|
||||
)
|
||||
|
||||
type layer3 struct {
|
||||
XMLName xml.Name `xml:"layer3"`
|
||||
SrcIP string `xml:"src"`
|
||||
DstIP string `xml:"dst"`
|
||||
SrcIP string
|
||||
DstIP string
|
||||
}
|
||||
|
||||
type layer4 struct {
|
||||
XMLName xml.Name `xml:"layer4"`
|
||||
SrcPort int `xml:"sport"`
|
||||
DstPort int `xml:"dport"`
|
||||
Proto string `xml:"protoname,attr"`
|
||||
SrcPort int
|
||||
DstPort int
|
||||
Proto string
|
||||
}
|
||||
|
||||
type meta struct {
|
||||
XMLName xml.Name `xml:"meta"`
|
||||
Direction string `xml:"direction,attr"`
|
||||
Layer3 layer3 `xml:"layer3"`
|
||||
Layer4 layer4 `xml:"layer4"`
|
||||
ID int64 `xml:"id"`
|
||||
State string `xml:"state"`
|
||||
Layer3 layer3
|
||||
Layer4 layer4
|
||||
ID int64
|
||||
State string
|
||||
}
|
||||
|
||||
type flow struct {
|
||||
XMLName xml.Name `xml:"flow"`
|
||||
Type string `xml:"type,attr"`
|
||||
|
||||
Original, Reply, Independent meta `xml:"-"`
|
||||
Type string
|
||||
Original, Reply, Independent meta
|
||||
}
|
||||
|
||||
type conntrack struct {
|
||||
XMLName xml.Name `xml:"conntrack"`
|
||||
Flows []flow `xml:"flow"`
|
||||
Flows []flow
|
||||
}
|
||||
|
||||
// flowWalker is something that maintains flows, and provides an accessor
|
||||
@@ -205,12 +201,12 @@ func (c *conntrackWalker) run() {
|
||||
c.cmd = cmd
|
||||
c.Unlock()
|
||||
|
||||
reader := bufio.NewReader(stdout)
|
||||
scanner := bufio.NewScanner(bufio.NewReader(stdout))
|
||||
defer log.Infof("conntrack exiting")
|
||||
|
||||
// Lop on the output stream
|
||||
for {
|
||||
f, err := decodeStreamedFlow(reader)
|
||||
f, err := decodeStreamedFlow(scanner)
|
||||
if err != nil {
|
||||
log.Errorf("conntrack error: %v", err)
|
||||
return
|
||||
@@ -219,24 +215,40 @@ func (c *conntrackWalker) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func getUntaggedLine(reader *bufio.Reader) (string, error) {
|
||||
// TODO: read bytes?
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return "", err
|
||||
// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing)
|
||||
func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) {
|
||||
success := scanner.Scan()
|
||||
if !success {
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, io.EOF
|
||||
}
|
||||
// Remove [ASSURED]/[UNREPLIED] tags inplace
|
||||
// TODO: replace in-place?
|
||||
line = strings.Replace(line, "[ASSURED] ", "", -1)
|
||||
line = strings.Replace(line, "[UNREPLIED] ", "", -1)
|
||||
line := scanner.Bytes()
|
||||
// Remove [ASSURED]/[UNREPLIED] tags
|
||||
line = removeInplace(line, assured)
|
||||
line = removeInplace(line, unreplied)
|
||||
return line, nil
|
||||
}
|
||||
|
||||
func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
|
||||
func removeInplace(s, sep []byte) []byte {
|
||||
// TODO: See if we can get better performance
|
||||
// removing multiple substrings at once (with index/suffixarray New()+Lookup())
|
||||
// Probably not worth it for only two substrings occurring once.
|
||||
index := bytes.Index(s, sep)
|
||||
if index < 0 {
|
||||
return s
|
||||
}
|
||||
copy(s[index:], s[index+len(sep):])
|
||||
return s[:len(s)-len(sep)]
|
||||
}
|
||||
|
||||
func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
var (
|
||||
// TODO: use []byte/int where possible?
|
||||
omit [4]string
|
||||
f flow
|
||||
// Use ints for parsing unused fields since their allocations
|
||||
// are almost for free
|
||||
unused [2]int
|
||||
f flow
|
||||
)
|
||||
|
||||
// Examples:
|
||||
@@ -246,18 +258,18 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
|
||||
// " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing)
|
||||
|
||||
// Remove tags since they are optional and make parsing harder
|
||||
line, err := getUntaggedLine(reader)
|
||||
line, err := getUntaggedLine(scanner)
|
||||
if err != nil {
|
||||
return flow{}, err
|
||||
}
|
||||
|
||||
// TODO: refactor and probably create a fully-fledged parser, this is just good enough for performance testing
|
||||
if strings.Contains(line, "[DESTROY]") {
|
||||
line = bytes.TrimLeft(line, " ")
|
||||
if bytes.HasPrefix(line, destroyTypeB) {
|
||||
// Destroy events don't have a timeout or state field
|
||||
_, err = fmt.Sscanf(line, "%s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n",
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
|
||||
&f.Type,
|
||||
&f.Original.Layer4.Proto,
|
||||
&omit[0],
|
||||
&unused[0],
|
||||
&f.Original.Layer3.SrcIP,
|
||||
&f.Original.Layer3.DstIP,
|
||||
&f.Original.Layer4.SrcPort,
|
||||
@@ -269,11 +281,11 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
|
||||
&f.Independent.ID,
|
||||
)
|
||||
} else {
|
||||
_, err = fmt.Sscanf(line, "%s %s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n",
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
|
||||
&f.Type,
|
||||
&f.Original.Layer4.Proto,
|
||||
&omit[0],
|
||||
&omit[1],
|
||||
&unused[0],
|
||||
&unused[1],
|
||||
&f.Independent.State,
|
||||
&f.Original.Layer3.SrcIP,
|
||||
&f.Original.Layer3.DstIP,
|
||||
@@ -290,10 +302,6 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
|
||||
}
|
||||
if len(f.Type) < 3 || f.Type[0] != '[' || f.Type[len(f.Type)-1] != ']' {
|
||||
return flow{}, fmt.Errorf("Unexpected type format: %q", f.Type)
|
||||
}
|
||||
f.Type = strings.ToLower(f.Type[1 : len(f.Type)-1])
|
||||
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
|
||||
return f, nil
|
||||
}
|
||||
@@ -314,10 +322,10 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
|
||||
}
|
||||
}()
|
||||
|
||||
reader := bufio.NewReader(stdout)
|
||||
scanner := bufio.NewScanner(bufio.NewReader(stdout))
|
||||
var result []flow
|
||||
for {
|
||||
f, err := decodeDumpedFlow(reader)
|
||||
f, err := decodeDumpedFlow(scanner)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
@@ -330,25 +338,26 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func decodeDumpedFlow(reader *bufio.Reader) (flow, error) {
|
||||
func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
var (
|
||||
// TODO: use int/[]byte where possible?
|
||||
omit [4]string
|
||||
f flow
|
||||
// Use ints for parsing unused fields since allocations
|
||||
// are almost for free
|
||||
unused [4]int
|
||||
f flow
|
||||
)
|
||||
|
||||
// Example:
|
||||
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
|
||||
// remove tags since they are optional and make parsing harder
|
||||
line, err := getUntaggedLine(reader)
|
||||
line, err := getUntaggedLine(scanner)
|
||||
if err != nil {
|
||||
return flow{}, err
|
||||
}
|
||||
|
||||
_, err = fmt.Sscanf(line, "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s id=%x\n",
|
||||
_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
|
||||
&f.Original.Layer4.Proto,
|
||||
&omit[0],
|
||||
&omit[1],
|
||||
&unused[0],
|
||||
&unused[1],
|
||||
&f.Independent.State,
|
||||
&f.Original.Layer3.SrcIP,
|
||||
&f.Original.Layer3.DstIP,
|
||||
@@ -358,8 +367,8 @@ func decodeDumpedFlow(reader *bufio.Reader) (flow, error) {
|
||||
&f.Reply.Layer3.DstIP,
|
||||
&f.Reply.Layer4.SrcPort,
|
||||
&f.Reply.Layer4.DstPort,
|
||||
&omit[2],
|
||||
&omit[3],
|
||||
&unused[2],
|
||||
&unused[3],
|
||||
&f.Independent.ID,
|
||||
)
|
||||
|
||||
|
||||
@@ -2,176 +2,192 @@ package endpoint
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/common/exec"
|
||||
testexec "github.com/weaveworks/common/test/exec"
|
||||
"github.com/weaveworks/scope/test"
|
||||
)
|
||||
|
||||
const conntrackCloseTag = "</conntrack>\n"
|
||||
const bufferSize = 1024 * 1024
|
||||
// Obtained though conntrack -E -p tcp -o id and then tweaked
|
||||
const streamedFlowsSource = `[DESTROY] tcp 6 src=10.0.0.1 dst=127.0.0.1 sport=36826 dport=28106 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36826 [ASSURED] id=347275904
|
||||
[NEW] tcp 6 120 SYN_SENT src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 [UNREPLIED] src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904
|
||||
[UPDATE] tcp 6 60 SYN_RECV src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 id=347275904
|
||||
[UPDATE] tcp 6 432000 ESTABLISHED src=10.0.0.1 dst=127.0.0.1 sport=36898 dport=28107 src=10.0.0.2 dst=127.0.0.2 sport=28107 dport=36898 [ASSURED] id=347275904`
|
||||
|
||||
func makeFlow(ty string) flow {
|
||||
return flow{
|
||||
XMLName: xml.Name{
|
||||
Local: "flow",
|
||||
var wantStreamedFlows = []flow{
|
||||
{
|
||||
Type: destroyType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36826,
|
||||
DstPort: 28106,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Type: ty,
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36826,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: newType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36898,
|
||||
DstPort: 28107,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36898,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
State: "SYN_SENT",
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36898,
|
||||
DstPort: 28107,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36898,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
State: "SYN_RECV",
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.1",
|
||||
DstIP: "127.0.0.1",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 36898,
|
||||
DstPort: 28107,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.0.2",
|
||||
DstIP: "127.0.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 28107,
|
||||
DstPort: 36898,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 347275904,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func testFlowDecoding(t *testing.T, source string, want []flow, decoder func(scanner *bufio.Scanner) (flow, error)) {
|
||||
scanner := bufio.NewScanner(strings.NewReader(source))
|
||||
d := time.Millisecond * 100
|
||||
for _, wantFlow := range want {
|
||||
haveFlow, err := decoder(scanner)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected decoding error: %v", err)
|
||||
}
|
||||
test.Poll(t, d, wantFlow, func() interface{} { return haveFlow })
|
||||
}
|
||||
|
||||
if _, err := decodeStreamedFlow(scanner); err != io.EOF {
|
||||
t.Fatalf("Unexpected error value on empty input: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func addMeta(f *flow, dir, srcIP, dstIP string, srcPort, dstPort int) *meta {
|
||||
meta := meta{
|
||||
XMLName: xml.Name{
|
||||
Local: "meta",
|
||||
},
|
||||
Direction: dir,
|
||||
Layer3: layer3{
|
||||
XMLName: xml.Name{
|
||||
Local: "layer3",
|
||||
},
|
||||
SrcIP: srcIP,
|
||||
DstIP: dstIP,
|
||||
},
|
||||
Layer4: layer4{
|
||||
XMLName: xml.Name{
|
||||
Local: "layer4",
|
||||
},
|
||||
SrcPort: srcPort,
|
||||
DstPort: dstPort,
|
||||
Proto: tcpProto,
|
||||
},
|
||||
}
|
||||
f.Metas = append(f.Metas, meta)
|
||||
return &meta
|
||||
func TestStreamedFlowDecoding(t *testing.T) {
|
||||
testFlowDecoding(t, streamedFlowsSource, wantStreamedFlows, decodeStreamedFlow)
|
||||
}
|
||||
|
||||
func addIndependant(f *flow, id int64, state string) *meta {
|
||||
meta := meta{
|
||||
XMLName: xml.Name{
|
||||
Local: "meta",
|
||||
},
|
||||
Direction: "independent",
|
||||
ID: id,
|
||||
State: state,
|
||||
Layer3: layer3{
|
||||
XMLName: xml.Name{
|
||||
Local: "layer3",
|
||||
// Obtained through conntrack -L -p tcp -o id
|
||||
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208`
|
||||
|
||||
var wantDumpedFlows = []flow{
|
||||
{
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.2.2",
|
||||
DstIP: "10.0.2.15",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 49911,
|
||||
DstPort: 22,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Layer4: layer4{
|
||||
XMLName: xml.Name{
|
||||
Local: "layer4",
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.2.15",
|
||||
DstIP: "10.0.2.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 22,
|
||||
DstPort: 49911,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
}
|
||||
f.Metas = append(f.Metas, meta)
|
||||
return &meta
|
||||
Independent: meta{
|
||||
ID: 2993966208,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestConntracker(t *testing.T) {
|
||||
oldExecCmd, oldIsConntrackSupported := exec.Command, IsConntrackSupported
|
||||
defer func() { exec.Command, IsConntrackSupported = oldExecCmd, oldIsConntrackSupported }()
|
||||
|
||||
IsConntrackSupported = func(_ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
first := true
|
||||
existingConnectionsReader, existingConnectionsWriter := io.Pipe()
|
||||
reader, writer := io.Pipe()
|
||||
exec.Command = func(name string, args ...string) exec.Cmd {
|
||||
if first {
|
||||
first = false
|
||||
return testexec.NewMockCmd(existingConnectionsReader)
|
||||
}
|
||||
return testexec.NewMockCmd(reader)
|
||||
}
|
||||
|
||||
flowWalker := newConntrackFlowWalker(true, "", bufferSize)
|
||||
defer flowWalker.stop()
|
||||
|
||||
// First write out some empty xml for the existing connections
|
||||
ecbw := bufio.NewWriter(existingConnectionsWriter)
|
||||
if _, err := ecbw.WriteString(xmlHeader); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := ecbw.WriteString(conntrackOpenTag); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := ecbw.WriteString(conntrackCloseTag); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := ecbw.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Then write out eventa
|
||||
bw := bufio.NewWriter(writer)
|
||||
if _, err := bw.WriteString(xmlHeader); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := bw.WriteString(conntrackOpenTag); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := bw.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
have := func() interface{} {
|
||||
result := []flow{}
|
||||
flowWalker.walkFlows(func(f flow) {
|
||||
f.Original = nil
|
||||
f.Reply = nil
|
||||
f.Independent = nil
|
||||
result = append(result, f)
|
||||
})
|
||||
return result
|
||||
}
|
||||
ts := 100 * time.Millisecond
|
||||
|
||||
// First, assert we have no flows
|
||||
test.Poll(t, ts, []flow{}, have)
|
||||
|
||||
// Now add some flows
|
||||
xmlEncoder := xml.NewEncoder(bw)
|
||||
writeFlow := func(f flow) {
|
||||
if err := xmlEncoder.Encode(f); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := bw.WriteString("\n"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := bw.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
flow1 := makeFlow(updateType)
|
||||
addMeta(&flow1, "original", "1.2.3.4", "2.3.4.5", 2, 3)
|
||||
addIndependant(&flow1, 1, "")
|
||||
writeFlow(flow1)
|
||||
test.Poll(t, ts, []flow{flow1}, have)
|
||||
|
||||
// Now check when we remove the flow, we still get it in the next Walk
|
||||
flow2 := makeFlow(destroyType)
|
||||
addMeta(&flow2, "original", "1.2.3.4", "2.3.4.5", 2, 3)
|
||||
addIndependant(&flow2, 1, "")
|
||||
writeFlow(flow2)
|
||||
test.Poll(t, ts, []flow{flow1}, have)
|
||||
test.Poll(t, ts, []flow{}, have)
|
||||
|
||||
// This time we're not going to remove it, but put it in state TIME_WAIT
|
||||
flow1.Type = updateType
|
||||
writeFlow(flow1)
|
||||
test.Poll(t, ts, []flow{flow1}, have)
|
||||
|
||||
flow1.Metas[1].State = timeWait
|
||||
writeFlow(flow1)
|
||||
test.Poll(t, ts, []flow{flow1}, have)
|
||||
test.Poll(t, ts, []flow{}, have)
|
||||
func TestDumpedFlowDecoding(t *testing.T) {
|
||||
testFlowDecoding(t, dumpedFlowsSource, wantDumpedFlows, decodeDumpedFlow)
|
||||
}
|
||||
|
||||
@@ -29,15 +29,40 @@ func TestNat(t *testing.T) {
|
||||
// correctly.
|
||||
// the setup is this:
|
||||
//
|
||||
// container2 (10.0.47.2:222222), host2 (2.3.4.5:22223) ->
|
||||
// container2 (10.0.47.2:22222), host2 (2.3.4.5:22223) ->
|
||||
// host1 (1.2.3.4:80), container1 (10.0.47.1:80)
|
||||
|
||||
// from the PoV of host1
|
||||
{
|
||||
f := makeFlow(updateType)
|
||||
addIndependant(&f, 1, "")
|
||||
f.Original = addMeta(&f, "original", "2.3.4.5", "1.2.3.4", 222222, 80)
|
||||
f.Reply = addMeta(&f, "reply", "10.0.47.1", "2.3.4.5", 80, 222222)
|
||||
f := flow{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "2.3.4.5",
|
||||
DstIP: "1.2.3.4",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 22222,
|
||||
DstPort: 80,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.47.1",
|
||||
DstIP: "2.3.4.5",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 80,
|
||||
DstPort: 22222,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 1,
|
||||
},
|
||||
}
|
||||
|
||||
ct := &mockFlowWalker{
|
||||
flows: []flow{f},
|
||||
}
|
||||
@@ -69,10 +94,34 @@ func TestNat(t *testing.T) {
|
||||
|
||||
// form the PoV of host2
|
||||
{
|
||||
f := makeFlow(updateType)
|
||||
addIndependant(&f, 2, "")
|
||||
f.Original = addMeta(&f, "original", "10.0.47.2", "1.2.3.4", 22222, 80)
|
||||
f.Reply = addMeta(&f, "reply", "1.2.3.4", "2.3.4.5", 80, 22223)
|
||||
f := flow{
|
||||
Type: updateType,
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "10.0.47.2",
|
||||
DstIP: "1.2.3.4",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 22222,
|
||||
DstPort: 80,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "1.2.3.4",
|
||||
DstIP: "2.3.4.5",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 80,
|
||||
DstPort: 22223,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 2,
|
||||
},
|
||||
}
|
||||
ct := &mockFlowWalker{
|
||||
flows: []flow{f},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user