Files
kubeshark/tap/tcp_assembler.go
M. Mert Yıldıran d3e6a69d82 Refactor tap module to achieve synchronously closing other protocol dissectors upon identification (#1026)
* Remove `tcpStreamWrapper` struct

* Refactor `tap` module and move some of the code to `tap/api` module

* Move `TrafficFilteringOptions` struct to `shared` module

* Change the `Dissect` method signature to have `*TcpReader` as an argument

* Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors

* Run `go mod tidy` in `cli` module

* Rename `SuperIdentifier` struct to `ProtoIdentifier`

* Remove `SuperTimer` struct

* Bring back `CloseTimedoutTcpStreamChannels` method

* Run `go mod tidy` everywhere

* Remove `GOGC` environment variable from tapper

* Fix the tests

* Bring back `debug.FreeOSMemory()` call

* Make `CloseOtherProtocolDissectors` method mutexed

* Revert "Remove `GOGC` environment variable from tapper"

This reverts commit cfc2484bbb.

* Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags

* Define a bunch of interfaces and don't export any new structs from `tap/api`

* Keep the interfaces in `tap/api` but move the structs to `tap/tcp`

* Fix the unit tests by depending on `github.com/up9inc/mizu/tap`

* Use the modified `tlsEmitter`

* Define `TlsChunk` interface and make `tlsReader` implement `TcpReader`

* Remove unused fields in `tlsReader`

* Define `ReassemblyStream` interface and separate `gopacket` specififc fields to `tcpReassemblyStream` struct

Such that make `tap/api` don't depend on `gopacket`

* Remove the unused fields

* Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method

* Remove unused fields from `tlsPoller`

* Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface

* Revert "Revert "Remove `GOGC` environment variable from tapper""

This reverts commit ab2b9a803b.

* Revert "Bring back `debug.FreeOSMemory()` call"

This reverts commit 1cce863bbb.

* Remove excess comment

* Fix acceptance tests (`logger` module) #run_acceptance_tests

* Bring back `github.com/patrickmn/go-cache`

* Fix `NewTcpStream` method signature

* Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency

* Fix AMQP tests

* Revert 960ba644cd

* Revert `go.mod` and `go.sum` files in protocol dissectors

* Fix the comment position

* Revert `AppStatsInst` change

* Fix indent

* Fix CLI build

* Fix linter error

* Fix error msg

* Revert some of the changes in `chunk.go`
2022-04-28 17:19:14 +03:00

135 lines
3.6 KiB
Go

package tap
import (
"encoding/hex"
"os"
"os/signal"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
)
const PACKETS_SEEN_LOG_THRESHOLD = 1000
type tcpAssembler struct {
*reassembly.Assembler
streamPool *reassembly.StreamPool
streamFactory *tcpStreamFactory
assemblerMutex sync.Mutex
}
// Context
// The assembler context
type context struct {
CaptureInfo gopacket.CaptureInfo
Origin api.Capture
}
func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.TcpStreamMap, opts *TapOpts) *tcpAssembler {
var emitter api.Emitter = &api.Emitting{
AppStats: &diagnose.AppStats,
OutputChannel: outputItems,
}
streamFactory := NewTcpStreamFactory(emitter, streamsMap, opts)
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
return &tcpAssembler{
Assembler: assembler,
streamPool: streamPool,
streamFactory: streamFactory,
}
}
func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
for packetInfo := range packets {
packetsCount := diagnose.AppStats.IncPacketsCount()
if packetsCount%PACKETS_SEEN_LOG_THRESHOLD == 0 {
logger.Log.Debugf("Packets seen: #%d", packetsCount)
}
packet := packetInfo.Packet
data := packet.Data()
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
if dumpPacket {
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
diagnose.AppStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
Origin: packetInfo.Source.Origin,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
if done {
errorMapLen, _ := diagnose.TapErrors.GetErrorsSummary()
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
diagnose.AppStats.PacketsCount,
diagnose.AppStats.ProcessedBytes,
time.Since(diagnose.AppStats.StartTime),
diagnose.TapErrors.ErrorsCount,
errorMapLen)
}
select {
case <-signalChan:
logger.Log.Infof("Caught SIGINT: aborting")
done = true
default:
// NOP: continue
}
if done {
break
}
}
a.assemblerMutex.Lock()
closed := a.FlushAll()
a.assemblerMutex.Unlock()
logger.Log.Debugf("Final flush: %d closed", closed)
}
func (a *tcpAssembler) dumpStreamPool() {
a.streamPool.Dump()
}
func (a *tcpAssembler) waitAndDump() {
a.streamFactory.WaitGoRoutines()
a.assemblerMutex.Lock()
logger.Log.Debugf("%s", a.Dump())
a.assemblerMutex.Unlock()
}