From 46e8bd1ed37c2aa36e7feb8ba76b56deb8e8976e Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Thu, 26 Aug 2021 02:15:35 +0300 Subject: [PATCH] Make the HTTP request-response counter perfect --- tap/api/api.go | 7 ++++++- tap/extensions/amqp/main.go | 2 +- tap/extensions/http/handlers.go | 26 ++++++-------------------- tap/extensions/http/main.go | 14 +++----------- tap/extensions/kafka/main.go | 2 +- tap/tcp_reader.go | 4 ++-- tap/tcp_stream_factory.go | 10 +++++++--- 7 files changed, 26 insertions(+), 39 deletions(-) diff --git a/tap/api/api.go b/tap/api/api.go index e40fe8c98..0333a402d 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -42,6 +42,11 @@ type TcpID struct { Ident string } +type CounterPair struct { + Request uint + Response uint +} + type GenericMessage struct { IsRequest bool `json:"is_request"` CaptureTime time.Time `json:"capture_time"` @@ -63,7 +68,7 @@ type OutputChannelItem struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) error + Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry Summarize(entry *MizuEntry) *BaseEntryDetails Represent(entry *MizuEntry) (Protocol, []byte, error) diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 9f213cd84..85d942d13 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -41,7 +41,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { r := AmqpReader{b} var remaining int diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 600e2b7e4..6dbd98530 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -14,27 +14,16 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func populateCounterMap(ident string) { - if counterMap[ident] == nil { - counterMap[ident] = &counterPair{ - request: 0, - response: 0, - } - } -} - func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error { streamID, messageHTTP1, err := grpcAssembler.readMessage() if err != nil { return err } - populateCounterMap(tcpID.Ident) var item *api.OutputChannelItem switch messageHTTP1 := messageHTTP1.(type) { case http.Request: - counterMap[tcpID.Ident].request++ ident := fmt.Sprintf( "%s->%s %s->%s %d", tcpID.SrcIP, @@ -54,7 +43,6 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a } } case http.Response: - counterMap[tcpID.Ident].response++ ident := fmt.Sprintf( "%s->%s %s->%s %d", tcpID.DstIP, @@ -83,14 +71,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error { +func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { req, err := http.ReadRequest(b) if err != nil { // log.Println("Error reading stream:", err) return err } - populateCounterMap(tcpID.Ident) - counterMap[tcpID.Ident].request++ + counterPair.Request++ body, err := ioutil.ReadAll(req.Body) req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind @@ -110,7 +97,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort, - counterMap[tcpID.Ident].request, + counterPair.Request, ) item := reqResMatcher.registerRequest(ident, req, time.Now()) if item != nil { @@ -126,14 +113,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit return nil } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error { +func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { res, err := http.ReadResponse(b, nil) if err != nil { // log.Println("Error reading stream:", err) return err } - populateCounterMap(tcpID.Ident) - counterMap[tcpID.Ident].response++ + counterPair.Response++ var req string req = fmt.Sprintf("") @@ -163,7 +149,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit tcpID.SrcIP, tcpID.DstPort, tcpID.SrcPort, - counterMap[tcpID.Ident].response, + counterPair.Response, ) item := reqResMatcher.registerResponse(ident, res, time.Now()) if item != nil { diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 0dc269bca..e2cf02630 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -12,13 +12,6 @@ import ( "github.com/up9inc/mizu/tap/api" ) -type counterPair struct { - request uint - response uint -} - -var counterMap map[string]*counterPair - var protocol api.Protocol = api.Protocol{ Name: "http", LongName: "Hypertext Transfer Protocol -- HTTP/1.1", @@ -52,7 +45,6 @@ const ( func init() { log.Println("Initializing HTTP extension.") - counterMap = make(map[string]*counterPair) } type dissecting string @@ -65,7 +57,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort) isHTTP2, err := checkIsHTTP2Connection(b, isClient) if err != nil { @@ -94,7 +86,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } success = true } else if isClient { - err = handleHTTP1ClientStream(b, tcpID, emitter) + err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -103,7 +95,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } success = true } else { - err = handleHTTP1ServerStream(b, tcpID, emitter) + err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 6a953316b..f63c8cbf5 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -36,7 +36,7 @@ func (d dissecting) Ping() { log.Printf("pong %s\n", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error { +func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error { for { if isClient { _, _, err := ReadRequest(b, tcpID) diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 1cc7be82f..ac3f7bdd6 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -91,7 +91,7 @@ func (h *tcpReader) Read(p []byte) (int, error) { return l, nil } -func (h *tcpReader) run(wg *sync.WaitGroup, isClient bool) { +func (h *tcpReader) run(wg *sync.WaitGroup, counterPair *api.CounterPair) { defer wg.Done() data, err := io.ReadAll(h) @@ -103,6 +103,6 @@ func (h *tcpReader) run(wg *sync.WaitGroup, isClient bool) { for _, extension := range extensions { r.Reset(data) - extension.Dissector.Dissect(bufio.NewReader(r), isClient, h.tcpID, h.Emitter) + extension.Dissector.Dissect(bufio.NewReader(r), h.isClient, h.tcpID, counterPair, h.Emitter) } } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index aaf1c94d9..2ed647d71 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -48,6 +48,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T optchecker: reassembly.NewTCPOptionCheck(), } if stream.isTapTarget { + counterPair := &api.CounterPair{ + Request: 0, + Response: 0, + } stream.client = tcpReader{ msgQueue: make(chan tcpReaderDataMsg), ident: fmt.Sprintf("%s %s", net, transport), @@ -73,15 +77,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T DstPort: transport.Src().String(), }, parent: stream, - isClient: true, + isClient: false, isOutgoing: props.isOutgoing, outboundLinkWriter: factory.outboundLinkWriter, Emitter: factory.Emitter, } factory.wg.Add(2) // Start reading from channel stream.reader.bytes - go stream.client.run(&factory.wg, true) - go stream.server.run(&factory.wg, false) + go stream.client.run(&factory.wg, counterPair) + go stream.server.run(&factory.wg, counterPair) } return stream }