mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-05-06 09:17:22 +00:00
* Remove `tcpStreamWrapper` struct * Refactor `tap` module and move some of the code to `tap/api` module * Move `TrafficFilteringOptions` struct to `shared` module * Change the `Dissect` method signature to have `*TcpReader` as an argument * Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors * Run `go mod tidy` in `cli` module * Rename `SuperIdentifier` struct to `ProtoIdentifier` * Remove `SuperTimer` struct * Bring back `CloseTimedoutTcpStreamChannels` method * Run `go mod tidy` everywhere * Remove `GOGC` environment variable from tapper * Fix the tests * Bring back `debug.FreeOSMemory()` call * Make `CloseOtherProtocolDissectors` method mutexed * Revert "Remove `GOGC` environment variable from tapper" This reverts commitcfc2484bbb. * Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags * Define a bunch of interfaces and don't export any new structs from `tap/api` * Keep the interfaces in `tap/api` but move the structs to `tap/tcp` * Fix the unit tests by depending on `github.com/up9inc/mizu/tap` * Use the modified `tlsEmitter` * Define `TlsChunk` interface and make `tlsReader` implement `TcpReader` * Remove unused fields in `tlsReader` * Define `ReassemblyStream` interface and separate `gopacket` specififc fields to `tcpReassemblyStream` struct Such that make `tap/api` don't depend on `gopacket` * Remove the unused fields * Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method * Remove unused fields from `tlsPoller` * Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface * Revert "Revert "Remove `GOGC` environment variable from tapper"" This reverts commitab2b9a803b. * Revert "Bring back `debug.FreeOSMemory()` call" This reverts commit1cce863bbb. * Remove excess comment * Fix acceptance tests (`logger` module) #run_acceptance_tests * Bring back `github.com/patrickmn/go-cache` * Fix `NewTcpStream` method signature * Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency * Fix AMQP tests * Revert960ba644cd* Revert `go.mod` and `go.sum` files in protocol dissectors * Fix the comment position * Revert `AppStatsInst` change * Fix indent * Fix CLI build * Fix linter error * Fix error msg * Revert some of the changes in `chunk.go`
135 lines
3.6 KiB
Go
135 lines
3.6 KiB
Go
package tap
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/gopacket"
|
|
"github.com/google/gopacket/layers"
|
|
"github.com/google/gopacket/reassembly"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
"github.com/up9inc/mizu/tap/diagnose"
|
|
"github.com/up9inc/mizu/tap/source"
|
|
)
|
|
|
|
const PACKETS_SEEN_LOG_THRESHOLD = 1000
|
|
|
|
type tcpAssembler struct {
|
|
*reassembly.Assembler
|
|
streamPool *reassembly.StreamPool
|
|
streamFactory *tcpStreamFactory
|
|
assemblerMutex sync.Mutex
|
|
}
|
|
|
|
// Context
|
|
// The assembler context
|
|
type context struct {
|
|
CaptureInfo gopacket.CaptureInfo
|
|
Origin api.Capture
|
|
}
|
|
|
|
func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
|
|
return c.CaptureInfo
|
|
}
|
|
|
|
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.TcpStreamMap, opts *TapOpts) *tcpAssembler {
|
|
var emitter api.Emitter = &api.Emitting{
|
|
AppStats: &diagnose.AppStats,
|
|
OutputChannel: outputItems,
|
|
}
|
|
|
|
streamFactory := NewTcpStreamFactory(emitter, streamsMap, opts)
|
|
streamPool := reassembly.NewStreamPool(streamFactory)
|
|
assembler := reassembly.NewAssembler(streamPool)
|
|
|
|
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
|
|
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
|
|
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
|
|
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
|
|
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
|
|
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
|
|
|
|
return &tcpAssembler{
|
|
Assembler: assembler,
|
|
streamPool: streamPool,
|
|
streamFactory: streamFactory,
|
|
}
|
|
}
|
|
|
|
func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) {
|
|
signalChan := make(chan os.Signal, 1)
|
|
signal.Notify(signalChan, os.Interrupt)
|
|
|
|
for packetInfo := range packets {
|
|
packetsCount := diagnose.AppStats.IncPacketsCount()
|
|
|
|
if packetsCount%PACKETS_SEEN_LOG_THRESHOLD == 0 {
|
|
logger.Log.Debugf("Packets seen: #%d", packetsCount)
|
|
}
|
|
|
|
packet := packetInfo.Packet
|
|
data := packet.Data()
|
|
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
|
|
if dumpPacket {
|
|
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
|
}
|
|
|
|
tcp := packet.Layer(layers.LayerTypeTCP)
|
|
if tcp != nil {
|
|
diagnose.AppStats.IncTcpPacketsCount()
|
|
tcp := tcp.(*layers.TCP)
|
|
|
|
c := context{
|
|
CaptureInfo: packet.Metadata().CaptureInfo,
|
|
Origin: packetInfo.Source.Origin,
|
|
}
|
|
diagnose.InternalStats.Totalsz += len(tcp.Payload)
|
|
a.assemblerMutex.Lock()
|
|
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
|
a.assemblerMutex.Unlock()
|
|
}
|
|
|
|
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
|
|
if done {
|
|
errorMapLen, _ := diagnose.TapErrors.GetErrorsSummary()
|
|
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
|
diagnose.AppStats.PacketsCount,
|
|
diagnose.AppStats.ProcessedBytes,
|
|
time.Since(diagnose.AppStats.StartTime),
|
|
diagnose.TapErrors.ErrorsCount,
|
|
errorMapLen)
|
|
}
|
|
|
|
select {
|
|
case <-signalChan:
|
|
logger.Log.Infof("Caught SIGINT: aborting")
|
|
done = true
|
|
default:
|
|
// NOP: continue
|
|
}
|
|
if done {
|
|
break
|
|
}
|
|
}
|
|
|
|
a.assemblerMutex.Lock()
|
|
closed := a.FlushAll()
|
|
a.assemblerMutex.Unlock()
|
|
logger.Log.Debugf("Final flush: %d closed", closed)
|
|
}
|
|
|
|
func (a *tcpAssembler) dumpStreamPool() {
|
|
a.streamPool.Dump()
|
|
}
|
|
|
|
func (a *tcpAssembler) waitAndDump() {
|
|
a.streamFactory.WaitGoRoutines()
|
|
a.assemblerMutex.Lock()
|
|
logger.Log.Debugf("%s", a.Dump())
|
|
a.assemblerMutex.Unlock()
|
|
}
|