Fix the performance in TCP stream factory. Make it create two tcpReader(s) per extension

This commit is contained in:
M. Mert Yildiran
2021-08-27 02:33:00 +03:00
parent b583108c26
commit 6a7b940965
5 changed files with 59 additions and 59 deletions

View File

@@ -206,8 +206,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
// log.Printf("unexpected frame: %+v\n", f)
}
}
return errors.New("AMQP EOF")
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@@ -50,8 +50,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
}
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry {

View File

@@ -2,10 +2,8 @@ package tap
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"sync"
"time"
@@ -56,6 +54,7 @@ type tcpReader struct {
messageCount uint
packetsSeen uint
outboundLinkWriter *OutboundLinkWriter
Extension *api.Extension
Emitter api.Emitter
}
@@ -93,16 +92,9 @@ func (h *tcpReader) Read(p []byte) (int, error) {
func (h *tcpReader) run(wg *sync.WaitGroup, counterPair *api.CounterPair) {
defer wg.Done()
data, err := io.ReadAll(h)
b := bufio.NewReader(h)
err := h.Extension.Dissector.Dissect(b, h.isClient, h.tcpID, counterPair, h.Emitter)
if err != nil {
log.Printf("Corrupted TCP stream, unable to read!")
return
}
r := bytes.NewReader(data)
for _, extension := range extensions {
r.Reset(data)
extension.Dissector.Dissect(bufio.NewReader(r), h.isClient, h.tcpID, counterPair, h.Emitter)
io.ReadAll(b)
}
}

View File

@@ -22,8 +22,8 @@ type tcpStream struct {
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
client tcpReader
server tcpReader
clients []tcpReader
servers []tcpReader
urls []string
ident string
sync.Mutex
@@ -145,9 +145,13 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
if dir == reassembly.TCPDirClientToServer {
t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
for _, reader := range t.clients {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
} else {
t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
for _, reader := range t.servers {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
}
}
}
@@ -156,8 +160,12 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isTapTarget {
close(t.client.msgQueue)
close(t.server.msgQueue)
for _, reader := range t.clients {
close(reader.msgQueue)
}
for _, reader := range t.servers {
close(reader.msgQueue)
}
}
// do not remove the connection to allow last ACK
return false

View File

@@ -48,44 +48,48 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
optchecker: reassembly.NewTCPOptionCheck(),
}
if stream.isTapTarget {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
for i, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outboundLinkWriter,
Extension: extension,
Emitter: factory.Emitter,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
parent: stream,
isClient: false,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outboundLinkWriter,
Extension: extension,
Emitter: factory.Emitter,
})
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.clients[i].run(&factory.wg, counterPair)
go stream.servers[i].run(&factory.wg, counterPair)
}
stream.client = tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outboundLinkWriter,
Emitter: factory.Emitter,
}
stream.server = tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
parent: stream,
isClient: false,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outboundLinkWriter,
Emitter: factory.Emitter,
}
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.client.run(&factory.wg, counterPair)
go stream.server.run(&factory.wg, counterPair)
}
return stream
}