Files
kubeshark/tap/tcp_reader.go
M. Mert Yıldıran 858a64687d Stop the hanging Goroutines by dropping the old, unidentified TCP streams (#260)
* Close the hanging TCP message channels after a dynamically aligned timeout (base `10000` milliseconds)

* Bring back `source.Lazy`

* Add a one more `sync.Map.Delete` call

* Improve the formula by taking base Goroutine count into account

* Reduce duplication

* Include the dropped TCP streams count into the stats tracker and print a debug log whenever it happens

* Add `superIdentifier` field to `tcpStream` to check if it has identified

Also stop the other protocol dissectors if a TCP stream identified by a protocol.

* Take one step forward in fixing the channel closing issue (WIP)

Add `sync.Mutex` to `tcpReader` and make the loops reference based.

* Fix the channel closing issue

* Improve the accuracy of the formula, log better and multiply `baseStreamChannelTimeoutMs` by 100

* Remove `fmt.Printf`

* Replace `runtime.Gosched()` with `time.Sleep(1 * time.Millisecond)`

* Close the channels of other protocols in case of an identification

* Simplify the logic

* Replace the formula with hard timeout 5000 milliseconds and 4000 maximum number of Goroutines
2021-09-12 08:26:48 +03:00

115 lines
2.8 KiB
Go

package tap
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"sync"
"time"
"github.com/bradleyfalzon/tlsx"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
)
const checkTLSPacketAmount = 100
type tcpReaderDataMsg struct {
bytes []byte
timestamp time.Time
}
type tcpID struct {
srcIP string
dstIP string
srcPort string
dstPort string
}
type ConnectionInfo struct {
ClientIP string
ClientPort string
ServerIP string
ServerPort string
IsOutgoing bool
}
func (tid *tcpID) String() string {
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
}
/* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses.
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
* An tcpReader object is unidirectional: it parses either a client stream or a server stream.
* Implements io.Reader interface (Read)
*/
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
superTimer *api.SuperTimer
parent *tcpStream
messageCount uint
packetsSeen uint
outboundLinkWriter *OutboundLinkWriter
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
sync.Mutex
}
func (h *tcpReader) Read(p []byte) (int, error) {
var msg tcpReaderDataMsg
ok := true
for ok && len(h.data) == 0 {
msg, ok = <-h.msgQueue
h.data = msg.bytes
h.superTimer.CaptureTime = msg.timestamp
if len(h.data) > 0 {
h.packetsSeen += 1
}
if h.packetsSeen < checkTLSPacketAmount && len(msg.bytes) > 5 { // packets with less than 5 bytes cause tlsx to panic
clientHello := tlsx.ClientHello{}
err := clientHello.Unmarshall(msg.bytes)
if err == nil {
rlog.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)
}
}
}
if !ok || len(h.data) == 0 {
return 0, io.EOF
}
l := copy(p, h.data)
h.data = h.data[l:]
return l, nil
}
func (h *tcpReader) Close() {
h.Lock()
if !h.isClosed {
h.isClosed = true
close(h.msgQueue)
}
h.Unlock()
}
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
if err != nil {
io.Copy(ioutil.Discard, b)
}
}