Compare commits

...

3 Commits

Author SHA1 Message Date
M. Mert Yıldıran
1de50b0572 Fix the request-response matcher maps iteration in clean() method and share the streams map with the TLS tapper (#1059)
* Fix `panic: interface conversion: api.RequestResponseMatcher is nil, not *http.requestResponseMatcher` error

Also fix the request-response matcher maps iteration in `clean()` method.

* Fix the mocks in the unit tests

* Remove unnecessary fields from `tlsPoller` and implement `SetProtocol` method

* Use concrete types in `tap` package

* Share the streams map with the TLS tapper

* Check interface conversion error
2022-05-01 16:16:22 +03:00
AmitUp9
0881dad17f update craco config to resolve material ui instances (#1060) 2022-05-01 16:05:53 +03:00
David Levanon
cade960b9b Fix tls + creating tls_stream (#1058) 2022-05-01 14:46:31 +03:00
17 changed files with 158 additions and 160 deletions

View File

@@ -426,7 +426,7 @@ type TcpStream interface {
SetProtocol(protocol *Protocol) SetProtocol(protocol *Protocol)
GetOrigin() Capture GetOrigin() Capture
GetProtoIdentifier() *ProtoIdentifier GetProtoIdentifier() *ProtoIdentifier
GetReqResMatcher() RequestResponseMatcher GetReqResMatchers() []RequestResponseMatcher
GetIsTapTarget() bool GetIsTapTarget() bool
GetIsClosed() bool GetIsClosed() bool
} }

View File

@@ -34,12 +34,14 @@ func (cl *Cleaner) clean() {
cl.assemblerMutex.Unlock() cl.assemblerMutex.Unlock()
cl.streamsMap.Range(func(k, v interface{}) bool { cl.streamsMap.Range(func(k, v interface{}) bool {
reqResMatcher := v.(api.TcpStream).GetReqResMatcher() reqResMatchers := v.(api.TcpStream).GetReqResMatchers()
if reqResMatcher == nil { for _, reqResMatcher := range reqResMatchers {
return true if reqResMatcher == nil {
continue
}
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
} }
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
return true return true
}) })

View File

@@ -11,7 +11,7 @@ type tcpStream struct {
protoIdentifier *api.ProtoIdentifier protoIdentifier *api.ProtoIdentifier
isTapTarget bool isTapTarget bool
origin api.Capture origin api.Capture
reqResMatcher api.RequestResponseMatcher reqResMatchers []api.RequestResponseMatcher
sync.Mutex sync.Mutex
} }
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier return t.protoIdentifier
} }
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher { func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatcher return t.reqResMatchers
} }
func (t *tcpStream) GetIsTapTarget() bool { func (t *tcpStream) GetIsTapTarget() bool {

View File

@@ -11,7 +11,7 @@ type tcpStream struct {
protoIdentifier *api.ProtoIdentifier protoIdentifier *api.ProtoIdentifier
isTapTarget bool isTapTarget bool
origin api.Capture origin api.Capture
reqResMatcher api.RequestResponseMatcher reqResMatchers []api.RequestResponseMatcher
sync.Mutex sync.Mutex
} }
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier return t.protoIdentifier
} }
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher { func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatcher return t.reqResMatchers
} }
func (t *tcpStream) GetIsTapTarget() bool { func (t *tcpStream) GetIsTapTarget() bool {

View File

@@ -11,7 +11,7 @@ type tcpStream struct {
protoIdentifier *api.ProtoIdentifier protoIdentifier *api.ProtoIdentifier
isTapTarget bool isTapTarget bool
origin api.Capture origin api.Capture
reqResMatcher api.RequestResponseMatcher reqResMatchers []api.RequestResponseMatcher
sync.Mutex sync.Mutex
} }
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier return t.protoIdentifier
} }
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher { func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatcher return t.reqResMatchers
} }
func (t *tcpStream) GetIsTapTarget() bool { func (t *tcpStream) GetIsTapTarget() bool {

View File

@@ -11,7 +11,7 @@ type tcpStream struct {
protoIdentifier *api.ProtoIdentifier protoIdentifier *api.ProtoIdentifier
isTapTarget bool isTapTarget bool
origin api.Capture origin api.Capture
reqResMatcher api.RequestResponseMatcher reqResMatchers []api.RequestResponseMatcher
sync.Mutex sync.Mutex
} }
@@ -32,8 +32,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier return t.protoIdentifier
} }
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher { func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatcher return t.reqResMatchers
} }
func (t *tcpStream) GetIsTapTarget() bool { func (t *tcpStream) GetIsTapTarget() bool {

View File

@@ -69,10 +69,12 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
extensions = extensionsRef extensions = extensionsRef
filteringOptions = options filteringOptions = options
streamsMap := NewTcpStreamMap()
if *tls { if *tls {
for _, e := range extensions { for _, e := range extensions {
if e.Protocol.Name == "http" { if e.Protocol.Name == "http" {
tlsTapperInstance = startTlsTapper(e, outputItems, options) tlsTapperInstance = startTlsTapper(e, outputItems, options, streamsMap)
break break
} }
} }
@@ -82,7 +84,7 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds)) diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
} }
streamsMap, assembler := initializePassiveTapper(opts, outputItems) assembler := initializePassiveTapper(opts, outputItems, streamsMap)
go startPassiveTapper(streamsMap, assembler) go startPassiveTapper(streamsMap, assembler)
} }
@@ -181,9 +183,7 @@ func initializePacketSources() error {
return err return err
} }
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) (api.TcpStreamMap, *tcpAssembler) { func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, streamsMap api.TcpStreamMap) *tcpAssembler {
streamsMap := NewTcpStreamMap()
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet) diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats() diagnose.InitializeTapperInternalStats()
@@ -195,7 +195,7 @@ func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelI
assembler := NewTcpAssembler(outputItems, streamsMap, opts) assembler := NewTcpAssembler(outputItems, streamsMap, opts)
return streamsMap, assembler return assembler
} }
func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) { func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
@@ -232,7 +232,8 @@ func startPassiveTapper(streamsMap api.TcpStreamMap, assembler *tcpAssembler) {
logger.Log.Infof("AppStats: %v", diagnose.AppStats) logger.Log.Infof("AppStats: %v", diagnose.AppStats)
} }
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem, options *api.TrafficFilteringOptions) *tlstapper.TlsTapper { func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem,
options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) *tlstapper.TlsTapper {
tls := tlstapper.TlsTapper{} tls := tlstapper.TlsTapper{}
chunksBufferSize := os.Getpagesize() * 100 chunksBufferSize := os.Getpagesize() * 100
logBufferSize := os.Getpagesize() logBufferSize := os.Getpagesize()
@@ -262,7 +263,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
} }
go tls.PollForLogging() go tls.PollForLogging()
go tls.Poll(emitter, options) go tls.Poll(emitter, options, streamsMap)
return &tls return &tls
} }

View File

@@ -26,7 +26,7 @@ type tcpReader struct {
data []byte data []byte
progress *api.ReadProgress progress *api.ReadProgress
captureTime time.Time captureTime time.Time
parent api.TcpStream parent *tcpStream
packetsSeen uint packetsSeen uint
extension *api.Extension extension *api.Extension
emitter api.Emitter emitter api.Emitter
@@ -35,7 +35,7 @@ type tcpReader struct {
sync.Mutex sync.Mutex
} }
func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader { func NewTcpReader(msgQueue chan api.TcpReaderDataMsg, progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent *tcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) *tcpReader {
return &tcpReader{ return &tcpReader{
msgQueue: msgQueue, msgQueue: msgQueue,
progress: progress, progress: progress,

View File

@@ -148,12 +148,12 @@ func (t *tcpReassemblyStream) ReassembledSG(sg reassembly.ScatterGather, ac reas
stream := t.tcpStream.(*tcpStream) stream := t.tcpStream.(*tcpStream)
if dir == reassembly.TCPDirClientToServer { if dir == reassembly.TCPDirClientToServer {
for i := range stream.getClients() { for i := range stream.getClients() {
reader := stream.getClient(i).(*tcpReader) reader := stream.getClient(i)
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp)) reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
} }
} else { } else {
for i := range stream.getServers() { for i := range stream.getServers() {
reader := stream.getServer(i).(*tcpReader) reader := stream.getServer(i)
reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp)) reader.sendMsgIfNotClosed(NewTcpReaderDataMsg(data, timestamp))
} }
} }

View File

@@ -17,16 +17,16 @@ type tcpStream struct {
isClosed bool isClosed bool
protoIdentifier *api.ProtoIdentifier protoIdentifier *api.ProtoIdentifier
isTapTarget bool isTapTarget bool
clients []api.TcpReader clients []*tcpReader
servers []api.TcpReader servers []*tcpReader
origin api.Capture origin api.Capture
reqResMatcher api.RequestResponseMatcher reqResMatchers []api.RequestResponseMatcher
createdAt time.Time createdAt time.Time
streamsMap api.TcpStreamMap streamsMap api.TcpStreamMap
sync.Mutex sync.Mutex
} }
func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) api.TcpStream { func NewTcpStream(isTapTarget bool, streamsMap api.TcpStreamMap, capture api.Capture) *tcpStream {
return &tcpStream{ return &tcpStream{
isTapTarget: isTapTarget, isTapTarget: isTapTarget,
protoIdentifier: &api.ProtoIdentifier{}, protoIdentifier: &api.ProtoIdentifier{},
@@ -57,38 +57,42 @@ func (t *tcpStream) close() {
for i := range t.clients { for i := range t.clients {
reader := t.clients[i] reader := t.clients[i]
reader.(*tcpReader).close() reader.close()
} }
for i := range t.servers { for i := range t.servers {
reader := t.servers[i] reader := t.servers[i]
reader.(*tcpReader).close() reader.close()
} }
} }
func (t *tcpStream) addClient(reader api.TcpReader) { func (t *tcpStream) addClient(reader *tcpReader) {
t.clients = append(t.clients, reader) t.clients = append(t.clients, reader)
} }
func (t *tcpStream) addServer(reader api.TcpReader) { func (t *tcpStream) addServer(reader *tcpReader) {
t.servers = append(t.servers, reader) t.servers = append(t.servers, reader)
} }
func (t *tcpStream) getClients() []api.TcpReader { func (t *tcpStream) getClients() []*tcpReader {
return t.clients return t.clients
} }
func (t *tcpStream) getServers() []api.TcpReader { func (t *tcpStream) getServers() []*tcpReader {
return t.servers return t.servers
} }
func (t *tcpStream) getClient(index int) api.TcpReader { func (t *tcpStream) getClient(index int) *tcpReader {
return t.clients[index] return t.clients[index]
} }
func (t *tcpStream) getServer(index int) api.TcpReader { func (t *tcpStream) getServer(index int) *tcpReader {
return t.servers[index] return t.servers[index]
} }
func (t *tcpStream) addReqResMatcher(reqResMatcher api.RequestResponseMatcher) {
t.reqResMatchers = append(t.reqResMatchers, reqResMatcher)
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) { func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
@@ -102,13 +106,13 @@ func (t *tcpStream) SetProtocol(protocol *api.Protocol) {
for i := range t.clients { for i := range t.clients {
reader := t.clients[i] reader := t.clients[i]
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol { if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
reader.(*tcpReader).close() reader.close()
} }
} }
for i := range t.servers { for i := range t.servers {
reader := t.servers[i] reader := t.servers[i]
if reader.GetExtension().Protocol != t.protoIdentifier.Protocol { if reader.GetExtension().Protocol != t.protoIdentifier.Protocol {
reader.(*tcpReader).close() reader.close()
} }
} }
@@ -123,8 +127,8 @@ func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier return t.protoIdentifier
} }
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher { func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatcher return t.reqResMatchers
} }
func (t *tcpStream) GetIsTapTarget() bool { func (t *tcpStream) GetIsTapTarget() bool {

View File

@@ -61,15 +61,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
stream := NewTcpStream(isTapTarget, factory.streamsMap, getPacketOrigin(ac)) stream := NewTcpStream(isTapTarget, factory.streamsMap, getPacketOrigin(ac))
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream) reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
if stream.GetIsTapTarget() { if stream.GetIsTapTarget() {
_stream := stream.(*tcpStream) stream.setId(factory.streamsMap.NextId())
_stream.setId(factory.streamsMap.NextId())
for i, extension := range extensions { for i, extension := range extensions {
reqResMatcher := extension.Dissector.NewResponseRequestMatcher() reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
stream.addReqResMatcher(reqResMatcher)
counterPair := &api.CounterPair{ counterPair := &api.CounterPair{
Request: 0, Request: 0,
Response: 0, Response: 0,
} }
_stream.addClient( stream.addClient(
NewTcpReader( NewTcpReader(
make(chan api.TcpReaderDataMsg), make(chan api.TcpReaderDataMsg),
&api.ReadProgress{}, &api.ReadProgress{},
@@ -90,7 +90,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
reqResMatcher, reqResMatcher,
), ),
) )
_stream.addServer( stream.addServer(
NewTcpReader( NewTcpReader(
make(chan api.TcpReaderDataMsg), make(chan api.TcpReaderDataMsg),
&api.ReadProgress{}, &api.ReadProgress{},
@@ -112,11 +112,11 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *lay
), ),
) )
factory.streamsMap.Store(stream.(*tcpStream).getId(), stream) factory.streamsMap.Store(stream.getId(), stream)
factory.wg.Add(2) factory.wg.Add(2)
go _stream.getClient(i).(*tcpReader).run(filteringOptions, &factory.wg) go stream.getClient(i).run(filteringOptions, &factory.wg)
go _stream.getServer(i).(*tcpReader).run(filteringOptions, &factory.wg) go stream.getServer(i).run(filteringOptions, &factory.wg)
} }
} }
return reassemblyStream return reassemblyStream

View File

@@ -48,7 +48,13 @@ func (streamMap *tcpStreamMap) CloseTimedoutTcpStreamChannels() {
<-ticker.C <-ticker.C
streamMap.streams.Range(func(key interface{}, value interface{}) bool { streamMap.streams.Range(func(key interface{}, value interface{}) bool {
stream := value.(*tcpStream) // `*tlsStream` is not yet applicable to this routine.
// So, we cast into `(*tcpStream)` and ignore `*tlsStream`
stream, ok := value.(*tcpStream)
if !ok {
return true
}
if stream.protoIdentifier.Protocol == nil { if stream.protoIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) { if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeoutMs)) {
stream.close() stream.close()

View File

@@ -21,34 +21,25 @@ import (
) )
type tlsPoller struct { type tlsPoller struct {
tls *TlsTapper tls *TlsTapper
readers map[string]api.TcpReader readers map[string]*tlsReader
closedReaders chan string closedReaders chan string
reqResMatcher api.RequestResponseMatcher reqResMatcher api.RequestResponseMatcher
chunksReader *perf.Reader chunksReader *perf.Reader
extension *api.Extension extension *api.Extension
procfs string procfs string
pidToNamespace sync.Map pidToNamespace sync.Map
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
createdAt time.Time
} }
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller { func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
return &tlsPoller{ return &tlsPoller{
tls: tls, tls: tls,
readers: make(map[string]api.TcpReader), readers: make(map[string]*tlsReader),
closedReaders: make(chan string, 100), closedReaders: make(chan string, 100),
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(), reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
extension: extension, extension: extension,
chunksReader: nil, chunksReader: nil,
procfs: procfs, procfs: procfs,
protoIdentifier: &api.ProtoIdentifier{},
isTapTarget: true,
origin: api.Ebpf,
createdAt: time.Now(),
} }
} }
@@ -68,7 +59,7 @@ func (p *tlsPoller) close() error {
return p.chunksReader.Close() return p.chunksReader.Close()
} }
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) { func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
chunks := make(chan *tlsChunk) chunks := make(chan *tlsChunk)
go p.pollChunksPerfBuffer(chunks) go p.pollChunksPerfBuffer(chunks)
@@ -80,7 +71,7 @@ func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptio
return return
} }
if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil { if err := p.handleTlsChunk(chunk, p.extension, emitter, options, streamsMap); err != nil {
LogError(err) LogError(err)
} }
case key := <-p.closedReaders: case key := <-p.closedReaders:
@@ -124,8 +115,8 @@ func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
} }
} }
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension, func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension, emitter api.Emitter,
emitter api.Emitter, options *api.TrafficFilteringOptions) error { options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) error {
ip, port, err := chunk.getAddress() ip, port, err := chunk.getAddress()
if err != nil { if err != nil {
@@ -135,24 +126,13 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
key := buildTlsKey(chunk, ip, port) key := buildTlsKey(chunk, ip, port)
reader, exists := p.readers[key] reader, exists := p.readers[key]
newReader := NewTlsReader(
key,
func(r *tlsReader) {
p.closeReader(key, r)
},
chunk.isRequest(),
p,
)
if !exists { if !exists {
reader = p.startNewTlsReader(chunk, ip, port, key, extension, newReader, options) reader = p.startNewTlsReader(chunk, ip, port, key, emitter, extension, options, streamsMap)
p.readers[key] = reader p.readers[key] = reader
} }
tlsReader := reader.(*tlsReader) reader.captureTime = time.Now()
reader.chunks <- chunk
tlsReader.setCaptureTime(time.Now())
tlsReader.sendChunk(chunk)
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" { if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
p.logTls(chunk, ip, port) p.logTls(chunk, ip, port)
@@ -161,25 +141,48 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
return nil return nil
} }
func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string, extension *api.Extension, func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, key string,
reader api.TcpReader, options *api.TrafficFilteringOptions) api.TcpReader { emitter api.Emitter, extension *api.Extension, options *api.TrafficFilteringOptions,
streamsMap api.TcpStreamMap) *tlsReader {
tcpid := p.buildTcpId(chunk, ip, port) tcpid := p.buildTcpId(chunk, ip, port)
tlsReader := reader.(*tlsReader) doneHandler := func(r *tlsReader) {
tlsReader.setTcpID(&tcpid) p.closeReader(key, r)
}
tlsReader.setEmitter(&tlsEmitter{ tlsEmitter := &tlsEmitter{
delegate: reader.GetEmitter(), delegate: emitter,
namespace: p.getNamespace(chunk.Pid), namespace: p.getNamespace(chunk.Pid),
}) }
reader := &tlsReader{
key: key,
chunks: make(chan *tlsChunk, 1),
doneHandler: doneHandler,
progress: &api.ReadProgress{},
tcpID: &tcpid,
isClient: chunk.isRequest(),
captureTime: time.Now(),
extension: extension,
emitter: tlsEmitter,
counterPair: &api.CounterPair{},
reqResMatcher: p.reqResMatcher,
}
stream := &tlsStream{
reader: reader,
protoIdentifier: &api.ProtoIdentifier{},
}
streamsMap.Store(streamsMap.NextId(), stream)
reader.parent = stream
go dissect(extension, reader, options) go dissect(extension, reader, options)
return reader return reader
} }
func dissect(extension *api.Extension, reader api.TcpReader, func dissect(extension *api.Extension, reader *tlsReader, options *api.TrafficFilteringOptions) {
options *api.TrafficFilteringOptions) {
b := bufio.NewReader(reader) b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, reader, options) err := extension.Dissector.Dissect(b, reader, options)
@@ -279,27 +282,3 @@ func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
srcIp, srcPort, dstIp, dstPort, srcIp, srcPort, dstIp, dstPort,
chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded])) chunk.Recorded, chunk.Len, chunk.Start, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
} }
func (p *tlsPoller) SetProtocol(protocol *api.Protocol) {
// TODO: Implement
}
func (p *tlsPoller) GetOrigin() api.Capture {
return p.origin
}
func (p *tlsPoller) GetProtoIdentifier() *api.ProtoIdentifier {
return p.protoIdentifier
}
func (p *tlsPoller) GetReqResMatcher() api.RequestResponseMatcher {
return p.reqResMatcher
}
func (p *tlsPoller) GetIsTapTarget() bool {
return p.isTapTarget
}
func (p *tlsPoller) GetIsClosed() bool {
return p.isClosed
}

View File

@@ -14,41 +14,15 @@ type tlsReader struct {
doneHandler func(r *tlsReader) doneHandler func(r *tlsReader)
progress *api.ReadProgress progress *api.ReadProgress
tcpID *api.TcpID tcpID *api.TcpID
isClosed bool
isClient bool isClient bool
captureTime time.Time captureTime time.Time
parent api.TcpStream
extension *api.Extension extension *api.Extension
emitter api.Emitter emitter api.Emitter
counterPair *api.CounterPair counterPair *api.CounterPair
parent *tlsStream
reqResMatcher api.RequestResponseMatcher reqResMatcher api.RequestResponseMatcher
} }
func NewTlsReader(key string, doneHandler func(r *tlsReader), isClient bool, stream api.TcpStream) api.TcpReader {
return &tlsReader{
key: key,
chunks: make(chan *tlsChunk, 1),
doneHandler: doneHandler,
parent: stream,
}
}
func (r *tlsReader) sendChunk(chunk *tlsChunk) {
r.chunks <- chunk
}
func (r *tlsReader) setTcpID(tcpID *api.TcpID) {
r.tcpID = tcpID
}
func (r *tlsReader) setCaptureTime(captureTime time.Time) {
r.captureTime = captureTime
}
func (r *tlsReader) setEmitter(emitter api.Emitter) {
r.emitter = emitter
}
func (r *tlsReader) Read(p []byte) (int, error) { func (r *tlsReader) Read(p []byte) (int, error) {
var chunk *tlsChunk var chunk *tlsChunk
@@ -111,7 +85,7 @@ func (r *tlsReader) GetEmitter() api.Emitter {
} }
func (r *tlsReader) GetIsClosed() bool { func (r *tlsReader) GetIsClosed() bool {
return r.isClosed return false
} }
func (r *tlsReader) GetExtension() *api.Extension { func (r *tlsReader) GetExtension() *api.Extension {

View File

@@ -0,0 +1,32 @@
package tlstapper
import "github.com/up9inc/mizu/tap/api"
type tlsStream struct {
reader *tlsReader
protoIdentifier *api.ProtoIdentifier
}
func (t *tlsStream) GetOrigin() api.Capture {
return api.Ebpf
}
func (t *tlsStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
t.protoIdentifier.Protocol = protocol
}
func (t *tlsStream) GetReqResMatchers() []api.RequestResponseMatcher {
return []api.RequestResponseMatcher{t.reader.reqResMatcher}
}
func (t *tlsStream) GetIsTapTarget() bool {
return true
}
func (t *tlsStream) GetIsClosed() bool {
return false
}

View File

@@ -50,8 +50,8 @@ func (t *TlsTapper) Init(chunksBufferSize int, logBufferSize int, procfs string,
return t.poller.init(&t.bpfObjects, chunksBufferSize) return t.poller.init(&t.bpfObjects, chunksBufferSize)
} }
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) { func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions, streamsMap api.TcpStreamMap) {
t.poller.poll(emitter, options) t.poller.poll(emitter, options, streamsMap)
} }
func (t *TlsTapper) PollForLogging() { func (t *TlsTapper) PollForLogging() {

View File

@@ -13,7 +13,7 @@ module.exports = {
instanceOfMiniCssExtractPlugin.options.ignoreOrder = true; instanceOfMiniCssExtractPlugin.options.ignoreOrder = true;
webpackConfig.resolve.alias['react']= path.resolve(__dirname, 'node_modules/react'); // solve 2 react instances webpackConfig.resolve.alias['react']= path.resolve(__dirname, 'node_modules/react'); // solve 2 react instances
webpackConfig.resolve.alias['@material-ui/styles']= path.resolve("node_modules", "@material-ui/styles");
return webpackConfig; return webpackConfig;
} }