From 6a7b940965ae4be2be8cf07748b06160f84fd664 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Fri, 27 Aug 2021 02:33:00 +0300 Subject: [PATCH] Fix the performance in TCP stream factory. Make it create two `tcpReader`(s) per extension --- tap/extensions/amqp/main.go | 2 - tap/extensions/kafka/main.go | 2 - tap/tcp_reader.go | 16 ++------ tap/tcp_stream.go | 20 ++++++--- tap/tcp_stream_factory.go | 78 +++++++++++++++++++----------------- 5 files changed, 59 insertions(+), 59 deletions(-) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 85d942d13..c45a53b0f 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -206,8 +206,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co // log.Printf("unexpected frame: %+v\n", f) } } - - return errors.New("AMQP EOF") } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index f63c8cbf5..55c7bce3f 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -50,8 +50,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } } - - return nil } func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index ac3f7bdd6..829a2eab1 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -2,10 +2,8 @@ package tap import ( "bufio" - "bytes" "fmt" "io" - "log" "sync" "time" @@ -56,6 +54,7 @@ type tcpReader struct { messageCount uint packetsSeen uint outboundLinkWriter *OutboundLinkWriter + Extension *api.Extension Emitter api.Emitter } @@ -93,16 +92,9 @@ func (h *tcpReader) Read(p []byte) (int, error) { func (h *tcpReader) run(wg *sync.WaitGroup, counterPair *api.CounterPair) { defer wg.Done() - - data, err := io.ReadAll(h) + b := bufio.NewReader(h) + err := h.Extension.Dissector.Dissect(b, h.isClient, h.tcpID, counterPair, h.Emitter) if err != nil { - log.Printf("Corrupted TCP stream, unable to read!") - return - } - r := bytes.NewReader(data) - - for _, extension := range extensions { - r.Reset(data) - extension.Dissector.Dissect(bufio.NewReader(r), h.isClient, h.tcpID, counterPair, h.Emitter) + io.ReadAll(b) } } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index b442d070f..55ab74558 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -22,8 +22,8 @@ type tcpStream struct { net, transport gopacket.Flow isDNS bool isTapTarget bool - client tcpReader - server tcpReader + clients []tcpReader + servers []tcpReader urls []string ident string sync.Mutex @@ -145,9 +145,13 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass // This channel is read by an tcpReader object statsTracker.incReassembledTcpPayloadsCount() if dir == reassembly.TCPDirClientToServer { - t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + for _, reader := range t.clients { + reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + } } else { - t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + for _, reader := range t.servers { + reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + } } } } @@ -156,8 +160,12 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { Trace("%s: Connection closed", t.ident) if t.isTapTarget { - close(t.client.msgQueue) - close(t.server.msgQueue) + for _, reader := range t.clients { + close(reader.msgQueue) + } + for _, reader := range t.servers { + close(reader.msgQueue) + } } // do not remove the connection to allow last ACK return false diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index b3f39bc42..cf71d21a7 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -48,44 +48,48 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T optchecker: reassembly.NewTCPOptionCheck(), } if stream.isTapTarget { - counterPair := &api.CounterPair{ - Request: 0, - Response: 0, + for i, extension := range extensions { + counterPair := &api.CounterPair{ + Request: 0, + Response: 0, + } + stream.clients = append(stream.clients, tcpReader{ + msgQueue: make(chan tcpReaderDataMsg), + ident: fmt.Sprintf("%s %s", net, transport), + tcpID: &api.TcpID{ + SrcIP: srcIp, + DstIP: dstIp, + SrcPort: srcPort, + DstPort: dstPort, + }, + parent: stream, + isClient: true, + isOutgoing: props.isOutgoing, + outboundLinkWriter: factory.outboundLinkWriter, + Extension: extension, + Emitter: factory.Emitter, + }) + stream.servers = append(stream.servers, tcpReader{ + msgQueue: make(chan tcpReaderDataMsg), + ident: fmt.Sprintf("%s %s", net, transport), + tcpID: &api.TcpID{ + SrcIP: net.Dst().String(), + DstIP: net.Src().String(), + SrcPort: transport.Dst().String(), + DstPort: transport.Src().String(), + }, + parent: stream, + isClient: false, + isOutgoing: props.isOutgoing, + outboundLinkWriter: factory.outboundLinkWriter, + Extension: extension, + Emitter: factory.Emitter, + }) + factory.wg.Add(2) + // Start reading from channel stream.reader.bytes + go stream.clients[i].run(&factory.wg, counterPair) + go stream.servers[i].run(&factory.wg, counterPair) } - stream.client = tcpReader{ - msgQueue: make(chan tcpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net, transport), - tcpID: &api.TcpID{ - SrcIP: srcIp, - DstIP: dstIp, - SrcPort: srcPort, - DstPort: dstPort, - }, - parent: stream, - isClient: true, - isOutgoing: props.isOutgoing, - outboundLinkWriter: factory.outboundLinkWriter, - Emitter: factory.Emitter, - } - stream.server = tcpReader{ - msgQueue: make(chan tcpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net, transport), - tcpID: &api.TcpID{ - SrcIP: net.Dst().String(), - DstIP: net.Src().String(), - SrcPort: transport.Dst().String(), - DstPort: transport.Src().String(), - }, - parent: stream, - isClient: false, - isOutgoing: props.isOutgoing, - outboundLinkWriter: factory.outboundLinkWriter, - Emitter: factory.Emitter, - } - factory.wg.Add(2) - // Start reading from channel stream.reader.bytes - go stream.client.run(&factory.wg, counterPair) - go stream.server.run(&factory.wg, counterPair) } return stream }