diff --git a/tap/extensions/http/lib/grpc_assembler.go b/tap/extensions/http/lib/grpc_assembler.go deleted file mode 100644 index f7c519d8f..000000000 --- a/tap/extensions/http/lib/grpc_assembler.go +++ /dev/null @@ -1,244 +0,0 @@ -package lib - -import ( - "bufio" - "bytes" - "encoding/base64" - "encoding/binary" - "errors" - "io" - "math" - "net/http" - "net/url" - "strings" - - "golang.org/x/net/http2" - "golang.org/x/net/http2/hpack" -) - -const frameHeaderLen = 9 - -var clientPreface = []byte(http2.ClientPreface) - -const initialHeaderTableSize = 4096 -const protoHTTP2 = "HTTP/2.0" -const protoMajorHTTP2 = 2 -const protoMinorHTTP2 = 0 - -const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB -var maxHTTP2DataLen int = maxHTTP2DataLenDefault // value initialized during init - -type messageFragment struct { - headers []hpack.HeaderField - data []byte -} - -type fragmentsByStream map[uint32]*messageFragment - -func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) { - switch frame := frame.(type) { - case *http2.MetaHeadersFrame: - if existingFragment, ok := (*fbs)[streamID]; ok { - existingFragment.headers = append(existingFragment.headers, frame.Fields...) - } else { - // new fragment - (*fbs)[streamID] = &messageFragment{headers: frame.Fields} - } - case *http2.DataFrame: - newDataLen := len(frame.Data()) - if existingFragment, ok := (*fbs)[streamID]; ok { - existingDataLen := len(existingFragment.data) - // Never save more than maxHTTP2DataLen bytes - numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen-existingDataLen), float64(newDataLen))) - - existingFragment.data = append(existingFragment.data, frame.Data()[:numBytesToAppend]...) - } else { - // new fragment - // In principle, should not happen with DATA frames, because they are always preceded by HEADERS - - // Never save more than maxHTTP2DataLen bytes - numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen), float64(newDataLen))) - - (*fbs)[streamID] = &messageFragment{data: frame.Data()[:numBytesToAppend]} - } - } -} - -func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte) { - headers := (*fbs)[streamID].headers - data := (*fbs)[streamID].data - delete(*fbs, streamID) - - return headers, data -} - -func createGrpcAssembler(b *bufio.Reader) GrpcAssembler { - var framerOutput bytes.Buffer - framer := http2.NewFramer(&framerOutput, b) - framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) - return GrpcAssembler{ - fragmentsByStream: make(fragmentsByStream), - framer: framer, - } -} - -type GrpcAssembler struct { - fragmentsByStream fragmentsByStream - framer *http2.Framer -} - -func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) { - // Exactly one Framer is used for each half connection. - // (Instead of creating a new Framer for each ReadFrame operation) - // This is needed in order to decompress the headers, - // because the compression context is updated with each requests/response. - frame, err := ga.framer.ReadFrame() - if err != nil { - return 0, nil, err - } - - streamID := frame.Header().StreamID - - ga.fragmentsByStream.appendFrame(streamID, frame) - - if !(ga.isStreamEnd(frame)) { - return 0, nil, nil - } - - headers, data := ga.fragmentsByStream.pop(streamID) - - // Note: header keys are converted by http.Header.Set to canonical names, e.g. content-type -> Content-Type. - // By converting the keys we violate the HTTP/2 specification, which state that all headers must be lowercase. - headersHTTP1 := make(http.Header) - for _, header := range headers { - headersHTTP1.Add(header.Name, header.Value) - } - dataString := base64.StdEncoding.EncodeToString(data) - - // Use http1 types only because they are expected in http_matcher. - // TODO: Create an interface that will be used by http_matcher:registerRequest and http_matcher:registerRequest - // to accept both HTTP/1.x and HTTP/2 requests and responses - var messageHTTP1 interface{} - if _, ok := headersHTTP1[":method"]; ok { - messageHTTP1 = http.Request{ - URL: &url.URL{}, - Method: "POST", - Header: headersHTTP1, - Proto: protoHTTP2, - ProtoMajor: protoMajorHTTP2, - ProtoMinor: protoMinorHTTP2, - Body: io.NopCloser(strings.NewReader(dataString)), - ContentLength: int64(len(dataString)), - } - } else if _, ok := headersHTTP1[":status"]; ok { - messageHTTP1 = http.Response{ - Header: headersHTTP1, - Proto: protoHTTP2, - ProtoMajor: protoMajorHTTP2, - ProtoMinor: protoMinorHTTP2, - Body: io.NopCloser(strings.NewReader(dataString)), - ContentLength: int64(len(dataString)), - } - } else { - return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message") - } - - return streamID, messageHTTP1, nil -} - -func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool { - switch frame := frame.(type) { - case *http2.MetaHeadersFrame: - if frame.StreamEnded() { - return true - } - case *http2.DataFrame: - if frame.StreamEnded() { - return true - } - } - - return false -} - -/* Check if HTTP/2. Remove HTTP/2 client preface from start of buffer if present - */ -func checkIsHTTP2Connection(b *bufio.Reader, isClient bool) (bool, error) { - if isClient { - return checkIsHTTP2ClientStream(b) - } - - return checkIsHTTP2ServerStream(b) -} - -func prepareHTTP2Connection(b *bufio.Reader, isClient bool) error { - if !isClient { - return nil - } - - return discardClientPreface(b) -} - -func checkIsHTTP2ClientStream(b *bufio.Reader) (bool, error) { - return checkClientPreface(b) -} - -func checkIsHTTP2ServerStream(b *bufio.Reader) (bool, error) { - buf, err := b.Peek(frameHeaderLen) - if err != nil { - return false, err - } - - // If response starts with this text, it is HTTP/1.x - if bytes.Compare(buf, []byte("HTTP/1.0 ")) == 0 || bytes.Compare(buf, []byte("HTTP/1.1 ")) == 0 { - return false, nil - } - - // Check server connection preface (a settings frame) - frameHeader := http2.FrameHeader{ - Length: uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]), - Type: http2.FrameType(buf[3]), - Flags: http2.Flags(buf[4]), - StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), - } - - if frameHeader.Type != http2.FrameSettings { - // If HTTP/2, but not start of stream, will also fulfill this condition. - return false, nil - } - - return true, nil -} - -func checkClientPreface(b *bufio.Reader) (bool, error) { - bytesStart, err := b.Peek(len(clientPreface)) - if err != nil { - return false, err - } else if len(bytesStart) != len(clientPreface) { - return false, errors.New("checkClientPreface: not enough bytes read") - } - - if !bytes.Equal(bytesStart, clientPreface) { - return false, nil - } - - return true, nil -} - -func discardClientPreface(b *bufio.Reader) error { - if isClientPrefacePresent, err := checkClientPreface(b); err != nil { - return err - } else if !isClientPrefacePresent { - return errors.New("discardClientPreface: does not begin with client preface") - } - - // Remove client preface string from the buffer - n, err := b.Discard(len(clientPreface)) - if err != nil { - return err - } else if n != len(clientPreface) { - return errors.New("discardClientPreface: failed to discard client preface") - } - - return nil -} diff --git a/tap/extensions/http/lib/reader.go b/tap/extensions/http/lib/reader.go index 12bb00ee7..204d90b8a 100644 --- a/tap/extensions/http/lib/reader.go +++ b/tap/extensions/http/lib/reader.go @@ -60,7 +60,6 @@ type httpReader struct { data []byte captureTime time.Time hexdump bool - grpcAssembler GrpcAssembler messageCount uint harWriter *HarWriter packetsSeen uint @@ -102,20 +101,20 @@ func (h *httpReader) run(wg *sync.WaitGroup) { defer wg.Done() b := bufio.NewReader(h) - if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil { - SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err) - // Do something? - } else { - h.isHTTP2 = isHTTP2 - } + // if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil { + // SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err) + // // Do something? + // } else { + // h.isHTTP2 = isHTTP2 + // } - if h.isHTTP2 { - err := prepareHTTP2Connection(b, h.isClient) - if err != nil { - SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) - } - h.grpcAssembler = createGrpcAssembler(b) - } + // if h.isHTTP2 { + // err := prepareHTTP2Connection(b, h.isClient) + // if err != nil { + // SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) + // } + // h.grpcAssembler = createGrpcAssembler(b) + // } for true { if h.isHTTP2 { @@ -147,51 +146,51 @@ func (h *httpReader) run(wg *sync.WaitGroup) { } func (h *httpReader) handleHTTP2Stream() error { - streamID, messageHTTP1, err := h.grpcAssembler.readMessage() - h.messageCount++ - if err != nil { - return err - } + // streamID, messageHTTP1, err := h.grpcAssembler.readMessage() + // h.messageCount++ + // if err != nil { + // return err + // } - var reqResPair *requestResponsePair - var connectionInfo *ConnectionInfo + // var reqResPair *requestResponsePair + // var connectionInfo *ConnectionInfo - switch messageHTTP1 := messageHTTP1.(type) { - case http.Request: - ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) - connectionInfo = &ConnectionInfo{ - ClientIP: h.tcpID.srcIP, - ClientPort: h.tcpID.srcPort, - ServerIP: h.tcpID.dstIP, - ServerPort: h.tcpID.dstPort, - IsOutgoing: h.isOutgoing, - } - reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) - case http.Response: - ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) - connectionInfo = &ConnectionInfo{ - ClientIP: h.tcpID.dstIP, - ClientPort: h.tcpID.dstPort, - ServerIP: h.tcpID.srcIP, - ServerPort: h.tcpID.srcPort, - IsOutgoing: h.isOutgoing, - } - reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) - } + // switch messageHTTP1 := messageHTTP1.(type) { + // case http.Request: + // ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) + // connectionInfo = &ConnectionInfo{ + // ClientIP: h.tcpID.srcIP, + // ClientPort: h.tcpID.srcPort, + // ServerIP: h.tcpID.dstIP, + // ServerPort: h.tcpID.dstPort, + // IsOutgoing: h.isOutgoing, + // } + // reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) + // case http.Response: + // ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) + // connectionInfo = &ConnectionInfo{ + // ClientIP: h.tcpID.dstIP, + // ClientPort: h.tcpID.dstPort, + // ServerIP: h.tcpID.srcIP, + // ServerPort: h.tcpID.srcPort, + // IsOutgoing: h.isOutgoing, + // } + // reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) + // } - if reqResPair != nil { - // statsTracker.incMatchedMessages() + // if reqResPair != nil { + // // statsTracker.incMatchedMessages() - if h.harWriter != nil { - h.harWriter.WritePair( - reqResPair.Request.orig.(*http.Request), - reqResPair.Request.captureTime, - reqResPair.Response.orig.(*http.Response), - reqResPair.Response.captureTime, - connectionInfo, - ) - } - } + // if h.harWriter != nil { + // h.harWriter.WritePair( + // reqResPair.Request.orig.(*http.Request), + // reqResPair.Request.captureTime, + // reqResPair.Response.orig.(*http.Response), + // reqResPair.Response.captureTime, + // connectionInfo, + // ) + // } + // } return nil }