mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
probe: conntrack: fix output parsing
With net.netfilter.nf_conntrack_acct = 1, conntrack adds the following fields in the output: packets=3 bytes=164 And with SELinux (e.g. Fedora), conntrack adds: secctx=... The parsing with fmt.Sscanf introduced in #2095 was unfortunately rejecting lines with those fields. This patch fixes that by adding more complicated parsing in decodeFlowKeyValues() with FieldsFunc and SplitN. Fixes #2117 Regression from #2095
This commit is contained in:
@@ -8,8 +8,10 @@ import (
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
|
||||
@@ -250,6 +252,58 @@ func removeInplace(s, sep []byte) []byte {
|
||||
return s[:len(s)-len(sep)]
|
||||
}
|
||||
|
||||
// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow
|
||||
// It only considers the following key-values:
|
||||
// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776
|
||||
// Keys can be present twice, so the order is important.
|
||||
// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored.
|
||||
func decodeFlowKeyValues(line []byte, f *flow) error {
|
||||
var err error
|
||||
for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
|
||||
kv := strings.SplitN(field, "=", 2)
|
||||
if len(kv) != 2 {
|
||||
continue
|
||||
}
|
||||
key := kv[0]
|
||||
value := kv[1]
|
||||
firstTupleSet := f.Original.Layer4.DstPort != 0
|
||||
switch {
|
||||
case key == "src":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer3.SrcIP = value
|
||||
} else {
|
||||
f.Reply.Layer3.SrcIP = value
|
||||
}
|
||||
|
||||
case key == "dst":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer3.DstIP = value
|
||||
} else {
|
||||
f.Reply.Layer3.DstIP = value
|
||||
}
|
||||
|
||||
case key == "sport":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
|
||||
} else {
|
||||
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
|
||||
}
|
||||
|
||||
case key == "dport":
|
||||
if !firstTupleSet {
|
||||
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
|
||||
} else {
|
||||
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
|
||||
}
|
||||
|
||||
case key == "id":
|
||||
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
var (
|
||||
// Use ints for parsing unused fields since their allocations
|
||||
@@ -273,42 +327,29 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
line = bytes.TrimLeft(line, " ")
|
||||
if bytes.HasPrefix(line, destroyTypeB) {
|
||||
// Destroy events don't have a timeout or state field
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d",
|
||||
&f.Type,
|
||||
&f.Original.Layer4.Proto,
|
||||
&unused[0],
|
||||
&f.Original.Layer3.SrcIP,
|
||||
&f.Original.Layer3.DstIP,
|
||||
&f.Original.Layer4.SrcPort,
|
||||
&f.Original.Layer4.DstPort,
|
||||
&f.Reply.Layer3.SrcIP,
|
||||
&f.Reply.Layer3.DstIP,
|
||||
&f.Reply.Layer4.SrcPort,
|
||||
&f.Reply.Layer4.DstPort,
|
||||
&f.Independent.ID,
|
||||
)
|
||||
} else {
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
|
||||
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s",
|
||||
&f.Type,
|
||||
&f.Original.Layer4.Proto,
|
||||
&unused[0],
|
||||
&unused[1],
|
||||
&f.Independent.State,
|
||||
&f.Original.Layer3.SrcIP,
|
||||
&f.Original.Layer3.DstIP,
|
||||
&f.Original.Layer4.SrcPort,
|
||||
&f.Original.Layer4.DstPort,
|
||||
&f.Reply.Layer3.SrcIP,
|
||||
&f.Reply.Layer3.DstIP,
|
||||
&f.Reply.Layer4.SrcPort,
|
||||
&f.Reply.Layer4.DstPort,
|
||||
&f.Independent.ID,
|
||||
)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
err = decodeFlowKeyValues(line, &f)
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
|
||||
return f, nil
|
||||
}
|
||||
@@ -353,32 +394,27 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
|
||||
f flow
|
||||
)
|
||||
|
||||
// Example:
|
||||
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
|
||||
// Examples with different formats:
|
||||
// With SELinux, there is a "secctx="
|
||||
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
|
||||
//
|
||||
// "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088"
|
||||
// "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
|
||||
// "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880"
|
||||
// "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840"
|
||||
|
||||
// remove tags since they are optional and make parsing harder
|
||||
line, err := getUntaggedLine(scanner)
|
||||
if err != nil {
|
||||
return flow{}, err
|
||||
}
|
||||
|
||||
_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
|
||||
&f.Original.Layer4.Proto,
|
||||
&unused[0],
|
||||
&unused[1],
|
||||
&f.Independent.State,
|
||||
&f.Original.Layer3.SrcIP,
|
||||
&f.Original.Layer3.DstIP,
|
||||
&f.Original.Layer4.SrcPort,
|
||||
&f.Original.Layer4.DstPort,
|
||||
&f.Reply.Layer3.SrcIP,
|
||||
&f.Reply.Layer3.DstIP,
|
||||
&f.Reply.Layer4.SrcPort,
|
||||
&f.Reply.Layer4.DstPort,
|
||||
&unused[2],
|
||||
&unused[3],
|
||||
&f.Independent.ID,
|
||||
)
|
||||
_, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State)
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
err = decodeFlowKeyValues(line, &f)
|
||||
if err != nil {
|
||||
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
|
||||
}
|
||||
|
||||
@@ -155,7 +155,11 @@ func TestStreamedFlowDecoding(t *testing.T) {
|
||||
}
|
||||
|
||||
// Obtained through conntrack -L -p tcp -o id
|
||||
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208`
|
||||
// With SELinux, there is a "secctx="
|
||||
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
|
||||
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208
|
||||
tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880
|
||||
tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840`
|
||||
|
||||
var wantDumpedFlows = []flow{
|
||||
{
|
||||
@@ -186,6 +190,62 @@ var wantDumpedFlows = []flow{
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
{
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "172.17.0.5",
|
||||
DstIP: "172.17.0.2",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 47010,
|
||||
DstPort: 80,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "172.17.0.2",
|
||||
DstIP: "172.17.0.5",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 80,
|
||||
DstPort: 47010,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 4001098880,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
{
|
||||
Original: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "192.168.35.116",
|
||||
DstIP: "216.58.213.227",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 49862,
|
||||
DstPort: 443,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Reply: meta{
|
||||
Layer3: layer3{
|
||||
SrcIP: "216.58.213.227",
|
||||
DstIP: "192.168.35.116",
|
||||
},
|
||||
Layer4: layer4{
|
||||
SrcPort: 443,
|
||||
DstPort: 49862,
|
||||
Proto: "tcp",
|
||||
},
|
||||
},
|
||||
Independent: meta{
|
||||
ID: 943643840,
|
||||
State: "ESTABLISHED",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestDumpedFlowDecoding(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user