Files
kubeshark/tap/stats_tracker.go
M. Mert Yıldıran 858a64687d Stop the hanging Goroutines by dropping the old, unidentified TCP streams (#260)
* Close the hanging TCP message channels after a dynamically aligned timeout (base `10000` milliseconds)

* Bring back `source.Lazy`

* Add a one more `sync.Map.Delete` call

* Improve the formula by taking base Goroutine count into account

* Reduce duplication

* Include the dropped TCP streams count into the stats tracker and print a debug log whenever it happens

* Add `superIdentifier` field to `tcpStream` to check if it has identified

Also stop the other protocol dissectors if a TCP stream identified by a protocol.

* Take one step forward in fixing the channel closing issue (WIP)

Add `sync.Mutex` to `tcpReader` and make the loops reference based.

* Fix the channel closing issue

* Improve the accuracy of the formula, log better and multiply `baseStreamChannelTimeoutMs` by 100

* Remove `fmt.Printf`

* Replace `runtime.Gosched()` with `time.Sleep(1 * time.Millisecond)`

* Close the channels of other protocols in case of an identification

* Simplify the logic

* Replace the formula with hard timeout 5000 milliseconds and 4000 maximum number of Goroutines
2021-09-12 08:26:48 +03:00

118 lines
3.5 KiB
Go

package tap
import (
"sync"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes int64 `json:"processedBytes"`
PacketsCount int64 `json:"packetsCount"`
TcpPacketsCount int64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
}
type StatsTracker struct {
appStats AppStats
processedBytesMutex sync.Mutex
packetsCountMutex sync.Mutex
tcpPacketsCountMutex sync.Mutex
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
droppedTcpStreamsMutex sync.Mutex
}
func (st *StatsTracker) incMatchedPairs() {
st.matchedPairsMutex.Lock()
st.appStats.MatchedPairs++
st.matchedPairsMutex.Unlock()
}
func (st *StatsTracker) incDroppedTcpStreams() {
st.droppedTcpStreamsMutex.Lock()
st.appStats.DroppedTcpStreams++
st.droppedTcpStreamsMutex.Unlock()
}
func (st *StatsTracker) incPacketsCount() int64 {
st.packetsCountMutex.Lock()
st.appStats.PacketsCount++
currentPacketsCount := st.appStats.PacketsCount
st.packetsCountMutex.Unlock()
return currentPacketsCount
}
func (st *StatsTracker) incTcpPacketsCount() {
st.tcpPacketsCountMutex.Lock()
st.appStats.TcpPacketsCount++
st.tcpPacketsCountMutex.Unlock()
}
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
st.reassembledTcpPayloadsCountMutex.Lock()
st.appStats.ReassembledTcpPayloadsCount++
st.reassembledTcpPayloadsCountMutex.Unlock()
}
func (st *StatsTracker) incTlsConnectionsCount() {
st.tlsConnectionsCountMutex.Lock()
st.appStats.TlsConnectionsCount++
st.tlsConnectionsCountMutex.Unlock()
}
func (st *StatsTracker) updateProcessedBytes(size int64) {
st.processedBytesMutex.Lock()
st.appStats.ProcessedBytes += size
st.processedBytesMutex.Unlock()
}
func (st *StatsTracker) setStartTime(startTime time.Time) {
st.appStats.StartTime = startTime
}
func (st *StatsTracker) dumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
st.processedBytesMutex.Lock()
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
st.appStats.ProcessedBytes = 0
st.processedBytesMutex.Unlock()
st.packetsCountMutex.Lock()
currentAppStats.PacketsCount = st.appStats.PacketsCount
st.appStats.PacketsCount = 0
st.packetsCountMutex.Unlock()
st.tcpPacketsCountMutex.Lock()
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
st.appStats.TcpPacketsCount = 0
st.tcpPacketsCountMutex.Unlock()
st.reassembledTcpPayloadsCountMutex.Lock()
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
st.appStats.ReassembledTcpPayloadsCount = 0
st.reassembledTcpPayloadsCountMutex.Unlock()
st.tlsConnectionsCountMutex.Lock()
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
st.appStats.TlsConnectionsCount = 0
st.tlsConnectionsCountMutex.Unlock()
st.matchedPairsMutex.Lock()
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
st.droppedTcpStreamsMutex.Lock()
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
st.appStats.DroppedTcpStreams = 0
st.droppedTcpStreamsMutex.Unlock()
return currentAppStats
}