mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-04-06 02:38:15 +00:00
Compare commits
3 Commits
32.0-dev9
...
32.0-dev11
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1de50b0572 | ||
|
|
0881dad17f | ||
|
|
cade960b9b |
@@ -426,7 +426,7 @@ type TcpStream interface {
|
|||||||
SetProtocol(protocol *Protocol)
|
SetProtocol(protocol *Protocol)
|
||||||
GetOrigin() Capture
|
GetOrigin() Capture
|
||||||
GetProtoIdentifier() *ProtoIdentifier
|
GetProtoIdentifier() *ProtoIdentifier
|
||||||
GetReqResMatcher() RequestResponseMatcher
|
GetReqResMatchers() []RequestResponseMatcher
|
||||||
GetIsTapTarget() bool
|
GetIsTapTarget() bool
|
||||||
GetIsClosed() bool
|
GetIsClosed() bool
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,12 +34,14 @@ func (cl *Cleaner) clean() {
|
|||||||
cl.assemblerMutex.Unlock()
|
cl.assemblerMutex.Unlock()
|
||||||
|
|
||||||
cl.streamsMap.Range(func(k, v interface{}) bool {
|
cl.streamsMap.Range(func(k, v interface{}) bool {
|
||||||
reqResMatcher := v.(api.TcpStream).GetReqResMatcher()
|
reqResMatchers := v.(api.TcpStream).GetReqResMatchers()
|
||||||
if reqResMatcher == nil {
|
for _, reqResMatcher := range reqResMatchers {
|
||||||
return true
|
if reqResMatcher == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
|
||||||
|
cl.stats.deleted += deleted
|
||||||
}
|
}
|
||||||
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
|
|
||||||
cl.stats.deleted += deleted
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type tcpStream struct {
|
|||||||
protoIdentifier *api.ProtoIdentifier
|
protoIdentifier *api.ProtoIdentifier
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
|||||||
return t.protoIdentifier
|
return t.protoIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatcher
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetIsTapTarget() bool {
|
func (t *tcpStream) GetIsTapTarget() bool {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type tcpStream struct {
|
|||||||
protoIdentifier *api.ProtoIdentifier
|
protoIdentifier *api.ProtoIdentifier
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
|||||||
return t.protoIdentifier
|
return t.protoIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatcher
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetIsTapTarget() bool {
|
func (t *tcpStream) GetIsTapTarget() bool {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type tcpStream struct {
|
|||||||
protoIdentifier *api.ProtoIdentifier
|
protoIdentifier *api.ProtoIdentifier
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
|||||||
return t.protoIdentifier
|
return t.protoIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatcher
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetIsTapTarget() bool {
|
func (t *tcpStream) GetIsTapTarget() bool {
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type tcpStream struct {
|
|||||||
protoIdentifier *api.ProtoIdentifier
|
protoIdentifier *api.ProtoIdentifier
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
|||||||
return t.protoIdentifier
|
return t.protoIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatcher
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetIsTapTarget() bool {
|
func (t *tcpStream) GetIsTapTarget() bool {
|
||||||
|
|||||||
@@ -69,10 +69,12 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
|||||||
extensions = extensionsRef
|
extensions = extensionsRef
|
||||||
filteringOptions = options
|
filteringOptions = options
|
||||||
|
|
||||||
|
streamsMap := NewTcpStreamMap()
|
||||||
|
|
||||||
if *tls {
|
if *tls {
|
||||||
for _, e := range extensions {
|
for _, e := range extensions {
|
||||||
if e.Protocol.Name == "http" {
|
if e.Protocol.Name == "http" {
|
||||||
tlsTapperInstance = startTlsTapper(e, outputItems, options)
|
tlsTapperInstance = startTlsTapper(e, outputItems, options, streamsMap)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -82,7 +84,7 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
|||||||
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
||||||
}
|
}
|
||||||
|
|
||||||
streamsMap, assembler := initializePassiveTapper(opts, outputItems)
|
assembler := initializePassiveTapper(opts, outputItems, streamsMap)
|
||||||
go startPassiveTapper(streamsMap, assembler)
|
go startPassiveTapper(streamsMap, assembler)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -181,9 +183,7 @@ func initializePacketSources() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) (api.TcpStreamMap, *tcpAssembler) {
|
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, streamsMap api.TcpStreamMap) *tcpAssembler {
|
||||||
streamsMap := NewTcpStreamMap()
|
|
||||||
|
|
||||||
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
||||||
diagnose.InitializeTapperInternalStats()
|
diagnose.InitializeTapperInternalStats()
|
||||||
|
|
||||||
@@ -195,7 +195,7 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
|
|||||||
|
|
||||||
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
||||||
|
|
||||||
return streamsMap, assembler
|
return assembler
|
||||||
}
|
}
|
||||||
|
|
||||||
func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
|
func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
|
||||||
@@ -232,7 +232,8 @@ func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
|
|||||||
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
|
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
|
||||||
}
|
}
|
||||||
|
|
||||||
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem, options *api.TrafficFilteringOptions) *tlstapper.TlsTapper {
|
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem,
|
||||||
|
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) *tlstapper.TlsTapper {
|
||||||
tls := tlstapper.TlsTapper{}
|
tls := tlstapper.TlsTapper{}
|
||||||
chunksBufferSize := os.Getpagesize() * 100
|
chunksBufferSize := os.Getpagesize() * 100
|
||||||
logBufferSize := os.Getpagesize()
|
logBufferSize := os.Getpagesize()
|
||||||
@@ -262,7 +263,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
|
|||||||
}
|
}
|
||||||
|
|
||||||
go tls.PollForLogging()
|
go tls.PollForLogging()
|
||||||
go tls.Poll(emitter, options)
|
go tls.Poll(emitter, options, streamsMap)
|
||||||
|
|
||||||
return &tls
|
return &tls
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ type tcpReader struct {
|
|||||||
data []byte
|
data []byte
|
||||||
progress *api.ReadProgress
|
progress *api.ReadProgress
|
||||||
captureTime time.Time
|
captureTime time.Time
|
||||||
parent api.TcpStream
|
parent *tcpStream
|
||||||
packetsSeen uint
|
packetsSeen uint
|
||||||
extension *api.Extension
|
extension *api.Extension
|
||||||
emitter api.Emitter
|
emitter api.Emitter
|
||||||
@@ -35,7 +35,7 @@ type tcpReader struct {
|
|||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
|
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader {
|
||||||
return &tcpReader{
|
return &tcpReader{
|
||||||
msgQueue: msgQueue,
|
msgQueue: msgQueue,
|
||||||
progress: progress,
|
progress: progress,
|
||||||
|
|||||||
@@ -148,12 +148,12 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
|
|||||||
stream := t.tcpStream.(*tcpStream)
|
stream := t.tcpStream.(*tcpStream)
|
||||||
if dir == reassembly.TCPDirClientToServer {
|
if dir == reassembly.TCPDirClientToServer {
|
||||||
for i := range stream.getClients() {
|
for i := range stream.getClients() {
|
||||||
reader := stream.getClient(i).(*tcpReader)
|
reader := stream.getClient(i)
|
||||||
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for i := range stream.getServers() {
|
for i := range stream.getServers() {
|
||||||
reader := stream.getServer(i).(*tcpReader)
|
reader := stream.getServer(i)
|
||||||
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,16 +17,16 @@ type tcpStream struct {
|
|||||||
isClosed bool
|
isClosed bool
|
||||||
protoIdentifier *api.ProtoIdentifier
|
protoIdentifier *api.ProtoIdentifier
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
clients []api.TcpReader
|
clients []*tcpReader
|
||||||
servers []api.TcpReader
|
servers []*tcpReader
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
createdAt time.Time
|
createdAt time.Time
|
||||||
streamsMap api.TcpStreamMap
|
streamsMap api.TcpStreamMap
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) api.TcpStream {
|
func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) *tcpStream {
|
||||||
return &tcpStream{
|
return &tcpStream{
|
||||||
isTapTarget: isTapTarget,
|
isTapTarget: isTapTarget,
|
||||||
protoIdentifier: &api.ProtoIdentifier{},
|
protoIdentifier: &api.ProtoIdentifier{},
|
||||||
@@ -57,38 +57,42 @@ func (t *tcpStream) close() {
|
|||||||
|
|
||||||
for i := range t.clients {
|
for i := range t.clients {
|
||||||
reader := t.clients[i]
|
reader := t.clients[i]
|
||||||
reader.(*tcpReader).close()
|
reader.close()
|
||||||
}
|
}
|
||||||
for i := range t.servers {
|
for i := range t.servers {
|
||||||
reader := t.servers[i]
|
reader := t.servers[i]
|
||||||
reader.(*tcpReader).close()
|
reader.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) addClient(reader api.TcpReader) {
|
func (t *tcpStream) addClient(reader *tcpReader) {
|
||||||
t.clients = append(t.clients, reader)
|
t.clients = append(t.clients, reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) addServer(reader api.TcpReader) {
|
func (t *tcpStream) addServer(reader *tcpReader) {
|
||||||
t.servers = append(t.servers, reader)
|
t.servers = append(t.servers, reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) getClients() []api.TcpReader {
|
func (t *tcpStream) getClients() []*tcpReader {
|
||||||
return t.clients
|
return t.clients
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) getServers() []api.TcpReader {
|
func (t *tcpStream) getServers() []*tcpReader {
|
||||||
return t.servers
|
return t.servers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) getClient(index int) api.TcpReader {
|
func (t *tcpStream) getClient(index int) *tcpReader {
|
||||||
return t.clients[index]
|
return t.clients[index]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) getServer(index int) api.TcpReader {
|
func (t *tcpStream) getServer(index int) *tcpReader {
|
||||||
return t.servers[index]
|
return t.servers[index]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
|
||||||
|
t.reqResMatchers = append(t.reqResMatchers, reqResMatcher)
|
||||||
|
}
|
||||||
|
|
||||||
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
|
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
defer t.Unlock()
|
||||||
@@ -102,13 +106,13 @@ func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
|
|||||||
for i := range t.clients {
|
for i := range t.clients {
|
||||||
reader := t.clients[i]
|
reader := t.clients[i]
|
||||||
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
|
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
|
||||||
reader.(*tcpReader).close()
|
reader.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := range t.servers {
|
for i := range t.servers {
|
||||||
reader := t.servers[i]
|
reader := t.servers[i]
|
||||||
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
|
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
|
||||||
reader.(*tcpReader).close()
|
reader.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -123,8 +127,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
|||||||
return t.protoIdentifier
|
return t.protoIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatcher
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetIsTapTarget() bool {
|
func (t *tcpStream) GetIsTapTarget() bool {
|
||||||
|
|||||||
@@ -61,15 +61,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
|
|||||||
stream := NewTcpStream(isTapTarget, factory.streamsMap, getPacketOrigin(ac))
|
stream := NewTcpStream(isTapTarget, factory.streamsMap, getPacketOrigin(ac))
|
||||||
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
|
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
|
||||||
if stream.GetIsTapTarget() {
|
if stream.GetIsTapTarget() {
|
||||||
_stream := stream.(*tcpStream)
|
stream.setId(factory.streamsMap.NextId())
|
||||||
_stream.setId(factory.streamsMap.NextId())
|
|
||||||
for i, extension := range extensions {
|
for i, extension := range extensions {
|
||||||
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
||||||
|
stream.addReqResMatcher(reqResMatcher)
|
||||||
counterPair := &api.CounterPair{
|
counterPair := &api.CounterPair{
|
||||||
Request: 0,
|
Request: 0,
|
||||||
Response: 0,
|
Response: 0,
|
||||||
}
|
}
|
||||||
_stream.addClient(
|
stream.addClient(
|
||||||
NewTcpReader(
|
NewTcpReader(
|
||||||
make(chan api.TcpReaderDataMsg),
|
make(chan api.TcpReaderDataMsg),
|
||||||
&api.ReadProgress{},
|
&api.ReadProgress{},
|
||||||
@@ -90,7 +90,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
|
|||||||
reqResMatcher,
|
reqResMatcher,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
_stream.addServer(
|
stream.addServer(
|
||||||
NewTcpReader(
|
NewTcpReader(
|
||||||
make(chan api.TcpReaderDataMsg),
|
make(chan api.TcpReaderDataMsg),
|
||||||
&api.ReadProgress{},
|
&api.ReadProgress{},
|
||||||
@@ -112,11 +112,11 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
factory.streamsMap.Store(stream.(*tcpStream).getId(), stream)
|
factory.streamsMap.Store(stream.getId(), stream)
|
||||||
|
|
||||||
factory.wg.Add(2)
|
factory.wg.Add(2)
|
||||||
go _stream.getClient(i).(*tcpReader).run(filteringOptions, &factory.wg)
|
go stream.getClient(i).run(filteringOptions, &factory.wg)
|
||||||
go _stream.getServer(i).(*tcpReader).run(filteringOptions, &factory.wg)
|
go stream.getServer(i).run(filteringOptions, &factory.wg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return reassemblyStream
|
return reassemblyStream
|
||||||
|
|||||||
@@ -48,7 +48,13 @@ func (streamMap *tcpStreamMap) CloseTimedoutTcpStreamChannels() {
|
|||||||
<-ticker.C
|
<-ticker.C
|
||||||
|
|
||||||
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
|
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
|
||||||
stream := value.(*tcpStream)
|
// `*tlsStream` is not yet applicable to this routine.
|
||||||
|
// So, we cast into `(*tcpStream)` and ignore `*tlsStream`
|
||||||
|
stream, ok := value.(*tcpStream)
|
||||||
|
if !ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
if stream.protoIdentifier.Protocol == nil {
|
if stream.protoIdentifier.Protocol == nil {
|
||||||
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
|
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
|
||||||
stream.close()
|
stream.close()
|
||||||
|
|||||||
@@ -21,34 +21,25 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type tlsPoller struct {
|
type tlsPoller struct {
|
||||||
tls *TlsTapper
|
tls *TlsTapper
|
||||||
readers map[string]api.TcpReader
|
readers map[string]*tlsReader
|
||||||
closedReaders chan string
|
closedReaders chan string
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatcher api.RequestResponseMatcher
|
||||||
chunksReader *perf.Reader
|
chunksReader *perf.Reader
|
||||||
extension *api.Extension
|
extension *api.Extension
|
||||||
procfs string
|
procfs string
|
||||||
pidToNamespace sync.Map
|
pidToNamespace sync.Map
|
||||||
isClosed bool
|
|
||||||
protoIdentifier *api.ProtoIdentifier
|
|
||||||
isTapTarget bool
|
|
||||||
origin api.Capture
|
|
||||||
createdAt time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||||
return &tlsPoller{
|
return &tlsPoller{
|
||||||
tls: tls,
|
tls: tls,
|
||||||
readers: make(map[string]api.TcpReader),
|
readers: make(map[string]*tlsReader),
|
||||||
closedReaders: make(chan string, 100),
|
closedReaders: make(chan string, 100),
|
||||||
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
||||||
extension: extension,
|
extension: extension,
|
||||||
chunksReader: nil,
|
chunksReader: nil,
|
||||||
procfs: procfs,
|
procfs: procfs,
|
||||||
protoIdentifier: &api.ProtoIdentifier{},
|
|
||||||
isTapTarget: true,
|
|
||||||
origin: api.Ebpf,
|
|
||||||
createdAt: time.Now(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -68,7 +59,7 @@ func (p *tlsPoller) close() error {
|
|||||||
return p.chunksReader.Close()
|
return p.chunksReader.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
|
||||||
chunks := make(chan *tlsChunk)
|
chunks := make(chan *tlsChunk)
|
||||||
|
|
||||||
go p.pollChunksPerfBuffer(chunks)
|
go p.pollChunksPerfBuffer(chunks)
|
||||||
@@ -80,7 +71,7 @@ func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptio
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil {
|
if err := p.handleTlsChunk(chunk, p.extension, emitter, options, streamsMap); err != nil {
|
||||||
LogError(err)
|
LogError(err)
|
||||||
}
|
}
|
||||||
case key := <-p.closedReaders:
|
case key := <-p.closedReaders:
|
||||||
@@ -124,8 +115,8 @@ func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension, emitter api.Emitter,
|
||||||
emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) error {
|
||||||
ip, port, err := chunk.getAddress()
|
ip, port, err := chunk.getAddress()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -135,24 +126,13 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
|||||||
key := buildTlsKey(chunk, ip, port)
|
key := buildTlsKey(chunk, ip, port)
|
||||||
reader, exists := p.readers[key]
|
reader, exists := p.readers[key]
|
||||||
|
|
||||||
newReader := NewTlsReader(
|
|
||||||
key,
|
|
||||||
func(r *tlsReader) {
|
|
||||||
p.closeReader(key, r)
|
|
||||||
},
|
|
||||||
chunk.isRequest(),
|
|
||||||
p,
|
|
||||||
)
|
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
reader = p.startNewTlsReader(chunk, ip, port, key, extension, newReader, options)
|
reader = p.startNewTlsReader(chunk, ip, port, key, emitter, extension, options, streamsMap)
|
||||||
p.readers[key] = reader
|
p.readers[key] = reader
|
||||||
}
|
}
|
||||||
|
|
||||||
tlsReader := reader.(*tlsReader)
|
reader.captureTime = time.Now()
|
||||||
|
reader.chunks <- chunk
|
||||||
tlsReader.setCaptureTime(time.Now())
|
|
||||||
tlsReader.sendChunk(chunk)
|
|
||||||
|
|
||||||
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
||||||
p.logTls(chunk, ip, port)
|
p.logTls(chunk, ip, port)
|
||||||
@@ -161,25 +141,48 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string, extension *api.Extension,
|
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string,
|
||||||
reader api.TcpReader, options *api.TrafficFilteringOptions) api.TcpReader {
|
emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions,
|
||||||
|
streamsMap api.TcpStreamMap) *tlsReader {
|
||||||
|
|
||||||
tcpid := p.buildTcpId(chunk, ip, port)
|
tcpid := p.buildTcpId(chunk, ip, port)
|
||||||
|
|
||||||
tlsReader := reader.(*tlsReader)
|
doneHandler := func(r *tlsReader) {
|
||||||
tlsReader.setTcpID(&tcpid)
|
p.closeReader(key, r)
|
||||||
|
}
|
||||||
|
|
||||||
tlsReader.setEmitter(&tlsEmitter{
|
tlsEmitter := &tlsEmitter{
|
||||||
delegate: reader.GetEmitter(),
|
delegate: emitter,
|
||||||
namespace: p.getNamespace(chunk.Pid),
|
namespace: p.getNamespace(chunk.Pid),
|
||||||
})
|
}
|
||||||
|
|
||||||
|
reader := &tlsReader{
|
||||||
|
key: key,
|
||||||
|
chunks: make(chan *tlsChunk, 1),
|
||||||
|
doneHandler: doneHandler,
|
||||||
|
progress: &api.ReadProgress{},
|
||||||
|
tcpID: &tcpid,
|
||||||
|
isClient: chunk.isRequest(),
|
||||||
|
captureTime: time.Now(),
|
||||||
|
extension: extension,
|
||||||
|
emitter: tlsEmitter,
|
||||||
|
counterPair: &api.CounterPair{},
|
||||||
|
reqResMatcher: p.reqResMatcher,
|
||||||
|
}
|
||||||
|
|
||||||
|
stream := &tlsStream{
|
||||||
|
reader: reader,
|
||||||
|
protoIdentifier: &api.ProtoIdentifier{},
|
||||||
|
}
|
||||||
|
streamsMap.Store(streamsMap.NextId(), stream)
|
||||||
|
|
||||||
|
reader.parent = stream
|
||||||
|
|
||||||
go dissect(extension, reader, options)
|
go dissect(extension, reader, options)
|
||||||
return reader
|
return reader
|
||||||
}
|
}
|
||||||
|
|
||||||
func dissect(extension *api.Extension, reader api.TcpReader,
|
func dissect(extension *api.Extension, reader *tlsReader, options *api.TrafficFilteringOptions) {
|
||||||
options *api.TrafficFilteringOptions) {
|
|
||||||
b := bufio.NewReader(reader)
|
b := bufio.NewReader(reader)
|
||||||
|
|
||||||
err := extension.Dissector.Dissect(b, reader, options)
|
err := extension.Dissector.Dissect(b, reader, options)
|
||||||
@@ -279,27 +282,3 @@ func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
|||||||
srcIp, srcPort, dstIp, dstPort,
|
srcIp, srcPort, dstIp, dstPort,
|
||||||
chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *tlsPoller) SetProtocol(protocol *api.Protocol) {
|
|
||||||
// TODO: Implement
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *tlsPoller) GetOrigin() api.Capture {
|
|
||||||
return p.origin
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *tlsPoller) GetProtoIdentifier() *api.ProtoIdentifier {
|
|
||||||
return p.protoIdentifier
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *tlsPoller) GetReqResMatcher() api.RequestResponseMatcher {
|
|
||||||
return p.reqResMatcher
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *tlsPoller) GetIsTapTarget() bool {
|
|
||||||
return p.isTapTarget
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *tlsPoller) GetIsClosed() bool {
|
|
||||||
return p.isClosed
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -14,41 +14,15 @@ type tlsReader struct {
|
|||||||
doneHandler func(r *tlsReader)
|
doneHandler func(r *tlsReader)
|
||||||
progress *api.ReadProgress
|
progress *api.ReadProgress
|
||||||
tcpID *api.TcpID
|
tcpID *api.TcpID
|
||||||
isClosed bool
|
|
||||||
isClient bool
|
isClient bool
|
||||||
captureTime time.Time
|
captureTime time.Time
|
||||||
parent api.TcpStream
|
|
||||||
extension *api.Extension
|
extension *api.Extension
|
||||||
emitter api.Emitter
|
emitter api.Emitter
|
||||||
counterPair *api.CounterPair
|
counterPair *api.CounterPair
|
||||||
|
parent *tlsStream
|
||||||
reqResMatcher api.RequestResponseMatcher
|
reqResMatcher api.RequestResponseMatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTlsReader(key string, doneHandler func(r *tlsReader), isClient bool, stream api.TcpStream) api.TcpReader {
|
|
||||||
return &tlsReader{
|
|
||||||
key: key,
|
|
||||||
chunks: make(chan *tlsChunk, 1),
|
|
||||||
doneHandler: doneHandler,
|
|
||||||
parent: stream,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *tlsReader) sendChunk(chunk *tlsChunk) {
|
|
||||||
r.chunks <- chunk
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *tlsReader) setTcpID(tcpID *api.TcpID) {
|
|
||||||
r.tcpID = tcpID
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *tlsReader) setCaptureTime(captureTime time.Time) {
|
|
||||||
r.captureTime = captureTime
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *tlsReader) setEmitter(emitter api.Emitter) {
|
|
||||||
r.emitter = emitter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *tlsReader) Read(p []byte) (int, error) {
|
func (r *tlsReader) Read(p []byte) (int, error) {
|
||||||
var chunk *tlsChunk
|
var chunk *tlsChunk
|
||||||
|
|
||||||
@@ -111,7 +85,7 @@ func (r *tlsReader) GetEmitter() api.Emitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *tlsReader) GetIsClosed() bool {
|
func (r *tlsReader) GetIsClosed() bool {
|
||||||
return r.isClosed
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *tlsReader) GetExtension() *api.Extension {
|
func (r *tlsReader) GetExtension() *api.Extension {
|
||||||
|
|||||||
32
tap/tlstapper/tls_stream.go
Normal file
32
tap/tlstapper/tls_stream.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package tlstapper
|
||||||
|
|
||||||
|
import "github.com/up9inc/mizu/tap/api"
|
||||||
|
|
||||||
|
type tlsStream struct {
|
||||||
|
reader *tlsReader
|
||||||
|
protoIdentifier *api.ProtoIdentifier
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tlsStream) GetOrigin() api.Capture {
|
||||||
|
return api.Ebpf
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tlsStream) GetProtoIdentifier() *api.ProtoIdentifier {
|
||||||
|
return t.protoIdentifier
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
|
||||||
|
t.protoIdentifier.Protocol = protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tlsStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
|
return []api.RequestResponseMatcher{t.reader.reqResMatcher}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tlsStream) GetIsTapTarget() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tlsStream) GetIsClosed() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
@@ -50,8 +50,8 @@ func (t *TlsTapper) Init(chunksBufferSize int, logBufferSize int, procfs string,
|
|||||||
return t.poller.init(&t.bpfObjects, chunksBufferSize)
|
return t.poller.init(&t.bpfObjects, chunksBufferSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
|
||||||
t.poller.poll(emitter, options)
|
t.poller.poll(emitter, options, streamsMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TlsTapper) PollForLogging() {
|
func (t *TlsTapper) PollForLogging() {
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ module.exports = {
|
|||||||
instanceOfMiniCssExtractPlugin.options.ignoreOrder = true;
|
instanceOfMiniCssExtractPlugin.options.ignoreOrder = true;
|
||||||
|
|
||||||
webpackConfig.resolve.alias['react']= path.resolve(__dirname, 'node_modules/react'); // solve 2 react instances
|
webpackConfig.resolve.alias['react']= path.resolve(__dirname, 'node_modules/react'); // solve 2 react instances
|
||||||
|
webpackConfig.resolve.alias['@material-ui/styles']= path.resolve("node_modules", "@material-ui/styles");
|
||||||
|
|
||||||
return webpackConfig;
|
return webpackConfig;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user