From 5c02dfcbd2cd30d019a6eb065425c0aba375b460 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 15 Dec 2016 15:50:12 +0000 Subject: [PATCH] Complete hacky manual parser --- probe/endpoint/conntrack.go | 101 ++++++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index 31d53984e..9944e6445 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "time" @@ -230,36 +231,82 @@ func makeEmptyFlow() flow { return f } +func getUntaggedLine(reader *bufio.Reader) (string, error) { + // TODO: read bytes? + line, err := reader.ReadString('\n') + if err != nil { + return "", err + } + // Remove [ASSURED]/[UNREPLIED] tags inplace + // TODO: replace in-place? + line = strings.Replace(line, "[ASSURED] ", "", -1) + line = strings.Replace(line, "[UNREPLIED] ", "", -1) + return line, nil +} + func decodeStreamedFlow(reader *bufio.Reader) (flow, error) { var ( // TODO: use ints where possible? omit [10]string f = makeEmptyFlow() ) - l, _ := reader.ReadString('\n') - // " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776\n" - n, err := fmt.Sscanf(l, " %s tcp %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s id=%x\n", - &omit[0], - &omit[1], - &omit[2], - &f.Independent.State, - &f.Original.Layer3.SrcIP, - &f.Original.Layer3.DstIP, - &f.Original.Layer4.SrcPort, - &f.Original.Layer4.DstPort, - &f.Reply.Layer3.SrcIP, - &f.Reply.Layer3.DstIP, - &f.Reply.Layer4.SrcPort, - &f.Reply.Layer4.DstPort, - &omit[3], - &f.Independent.ID, - ) + // Examples: + // " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767" + // " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776" + // " [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160" + // " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing) + + // Remove tags since they are optional and make parsing harder + line, err := getUntaggedLine(reader) if err != nil { - log.Infof("Streamed Error: %#v, n=%d, line = %#q", err, n, l) return flow{}, err } - log.Infof("Streamed flow: %v", f) + + // TODO: refactor and probably create a fully-fledged parser, this is just good enough for performance testing + if strings.Contains(line, "[DESTROY]") { + // Destroy events don't have a timeout or state field + _, err = fmt.Sscanf(line, "%s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n", + &f.Type, + &f.Original.Layer4.Proto, + &omit[0], + &f.Original.Layer3.SrcIP, + &f.Original.Layer3.DstIP, + &f.Original.Layer4.SrcPort, + &f.Original.Layer4.DstPort, + &f.Reply.Layer3.SrcIP, + &f.Reply.Layer3.DstIP, + &f.Reply.Layer4.SrcPort, + &f.Reply.Layer4.DstPort, + &f.Independent.ID, + ) + } else { + _, err = fmt.Sscanf(line, "%s %s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n", + &f.Type, + &f.Original.Layer4.Proto, + &omit[0], + &omit[1], + &f.Independent.State, + &f.Original.Layer3.SrcIP, + &f.Original.Layer3.DstIP, + &f.Original.Layer4.SrcPort, + &f.Original.Layer4.DstPort, + &f.Reply.Layer3.SrcIP, + &f.Reply.Layer3.DstIP, + &f.Reply.Layer4.SrcPort, + &f.Reply.Layer4.DstPort, + &f.Independent.ID, + ) + } + + if err != nil { + return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err) + } + if len(f.Type) < 3 || f.Type[0] != '[' || f.Type[len(f.Type)-1] != ']' { + return flow{}, fmt.Errorf("Unexpected type format: %q", f.Type) + } + f.Type = strings.ToLower(f.Type[1 : len(f.Type)-1]) + f.Reply.Layer4.Proto = f.Original.Layer4.Proto return f, nil } @@ -301,7 +348,16 @@ func readDumpedFlow(reader *bufio.Reader) (flow, error) { omit [10]string f = makeEmptyFlow() ) - n, err := fmt.Fscanf(reader, "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s %s id=%x\n", + + // Example: + // " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c" + // remove tags since they are optional and make parsing harder + line, err := getUntaggedLine(reader) + if err != nil { + return flow{}, err + } + + _, err = fmt.Sscanf(line, "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s id=%x\n", &f.Original.Layer4.Proto, &omit[0], &omit[1], @@ -316,12 +372,11 @@ func readDumpedFlow(reader *bufio.Reader) (flow, error) { &f.Reply.Layer4.DstPort, &omit[2], &omit[3], - &omit[4], &f.Independent.ID, ) if err != nil { - return flow{}, err + return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err) } f.Reply.Layer4.Proto = f.Original.Layer4.Proto