mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 10:11:03 +00:00
Merge pull request #1600 from weaveworks/extract-report-serialzn
Helper for reading & writing from binary Removes gob support from router.
This commit is contained in:
@@ -2,7 +2,6 @@ package multitenant
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -18,7 +17,6 @@ import (
|
||||
"github.com/bluele/gcache"
|
||||
"github.com/nats-io/nats"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/ugorji/go/codec"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
@@ -312,15 +310,7 @@ func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader, err := gzip.NewReader(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rep := report.MakeReport()
|
||||
if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rep, nil
|
||||
return report.MakeFromBinary(resp.Body)
|
||||
}
|
||||
|
||||
func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {
|
||||
@@ -387,14 +377,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
|
||||
|
||||
// first, encode the report into a buffer and record its size
|
||||
var buf bytes.Buffer
|
||||
writer, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := codec.NewEncoder(writer, &codec.MsgpackHandle{}).Encode(&rep); err != nil {
|
||||
return err
|
||||
}
|
||||
writer.Close()
|
||||
rep.WriteBinary(&buf)
|
||||
reportSize.Add(float64(buf.Len()))
|
||||
|
||||
// second, put the report on s3
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/gob"
|
||||
"io"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -102,63 +100,32 @@ func RegisterTopologyRoutes(router *mux.Router, r Reporter) {
|
||||
gzipHandler(requestContextDecorator(makeProbeHandler(r))))
|
||||
}
|
||||
|
||||
type byteCounter struct {
|
||||
next io.ReadCloser
|
||||
count *uint64
|
||||
}
|
||||
|
||||
func (c byteCounter) Read(p []byte) (n int, err error) {
|
||||
n, err = c.next.Read(p)
|
||||
*c.count += uint64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c byteCounter) Close() error {
|
||||
return c.next.Close()
|
||||
}
|
||||
|
||||
// RegisterReportPostHandler registers the handler for report submission
|
||||
func RegisterReportPostHandler(a Adder, router *mux.Router) {
|
||||
post := router.Methods("POST").Subrouter()
|
||||
post.HandleFunc("/api/report", requestContextDecorator(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
rpt report.Report
|
||||
reader = r.Body
|
||||
err error
|
||||
compressedSize, uncompressedSize uint64
|
||||
rpt report.Report
|
||||
reader = r.Body
|
||||
)
|
||||
|
||||
if log.GetLevel() == log.DebugLevel {
|
||||
reader = byteCounter{next: reader, count: &compressedSize}
|
||||
}
|
||||
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
|
||||
reader, err = gzip.NewReader(reader)
|
||||
if err != nil {
|
||||
respondWith(w, http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
gzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip")
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
var handle codec.Handle
|
||||
switch {
|
||||
case strings.HasPrefix(contentType, "application/json"):
|
||||
handle = &codec.JsonHandle{}
|
||||
case strings.HasPrefix(contentType, "application/msgpack"):
|
||||
handle = &codec.MsgpackHandle{}
|
||||
default:
|
||||
respondWith(w, http.StatusBadRequest, fmt.Errorf("Unsupported Content-Type: %v", contentType))
|
||||
return
|
||||
}
|
||||
|
||||
if log.GetLevel() == log.DebugLevel {
|
||||
reader = byteCounter{next: reader, count: &uncompressedSize}
|
||||
}
|
||||
decoder := gob.NewDecoder(reader).Decode
|
||||
if strings.HasPrefix(r.Header.Get("Content-Type"), "application/json") {
|
||||
decoder = codec.NewDecoder(reader, &codec.JsonHandle{}).Decode
|
||||
} else if strings.HasPrefix(r.Header.Get("Content-Type"), "application/msgpack") {
|
||||
decoder = codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode
|
||||
}
|
||||
|
||||
if err := decoder(&rpt); err != nil {
|
||||
if err := rpt.ReadBinary(reader, gzipped, handle); err != nil {
|
||||
respondWith(w, http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
log.Debugf(
|
||||
"Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)",
|
||||
compressedSize,
|
||||
uncompressedSize,
|
||||
float32(compressedSize)/float32(uncompressedSize)*100,
|
||||
)
|
||||
|
||||
if err := a.Add(ctx, rpt); err != nil {
|
||||
log.Errorf("Error Adding report: %v", err)
|
||||
|
||||
@@ -2,7 +2,6 @@ package app_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -84,11 +83,6 @@ func TestReportPostHandler(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
test("", func(v interface{}) ([]byte, error) {
|
||||
buf := &bytes.Buffer{}
|
||||
err := gob.NewEncoder(buf).Encode(v)
|
||||
return buf.Bytes(), err
|
||||
})
|
||||
test("application/json", func(v interface{}) ([]byte, error) {
|
||||
buf := &bytes.Buffer{}
|
||||
err := codec.NewEncoder(buf, &codec.JsonHandle{}).Encode(v)
|
||||
|
||||
@@ -2,9 +2,6 @@ package appclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"github.com/ugorji/go/codec"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
@@ -24,11 +21,6 @@ func NewReportPublisher(publisher Publisher) *ReportPublisher {
|
||||
// Publish serialises and compresses a report, then passes it to a publisher
|
||||
func (p *ReportPublisher) Publish(r report.Report) error {
|
||||
buf := &bytes.Buffer{}
|
||||
gzwriter := gzip.NewWriter(buf)
|
||||
if err := codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(r); err != nil {
|
||||
return err
|
||||
}
|
||||
gzwriter.Close() // otherwise the content won't get flushed to the output stream
|
||||
|
||||
r.WriteBinary(buf)
|
||||
return p.publisher.Publish(buf)
|
||||
}
|
||||
|
||||
77
report/marshal.go
Normal file
77
report/marshal.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package report
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"io"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
// WriteBinary writes a Report as a gzipped msgpack.
|
||||
func (rep Report) WriteBinary(w io.Writer) error {
|
||||
gzwriter, err := gzip.NewWriterLevel(w, gzip.BestCompression)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = codec.NewEncoder(gzwriter, &codec.MsgpackHandle{}).Encode(&rep); err != nil {
|
||||
return err
|
||||
}
|
||||
gzwriter.Close() // otherwise the content won't get flushed to the output stream
|
||||
return nil
|
||||
}
|
||||
|
||||
type byteCounter struct {
|
||||
next io.Reader
|
||||
count *uint64
|
||||
}
|
||||
|
||||
func (c byteCounter) Read(p []byte) (n int, err error) {
|
||||
n, err = c.next.Read(p)
|
||||
*c.count += uint64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// ReadBinary reads bytes into a Report.
|
||||
//
|
||||
// Will decompress the binary if gzipped is true, and will use the given
|
||||
// codecHandle to decode it.
|
||||
func (rep *Report) ReadBinary(r io.Reader, gzipped bool, codecHandle codec.Handle) error {
|
||||
var err error
|
||||
var compressedSize, uncompressedSize uint64
|
||||
|
||||
// We have historically had trouble with reports being too large. We are
|
||||
// keeping this instrumentation around to help us implement
|
||||
// weaveworks/scope#985.
|
||||
if log.GetLevel() == log.DebugLevel {
|
||||
r = byteCounter{next: r, count: &compressedSize}
|
||||
}
|
||||
if gzipped {
|
||||
r, err = gzip.NewReader(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if log.GetLevel() == log.DebugLevel {
|
||||
r = byteCounter{next: r, count: &uncompressedSize}
|
||||
}
|
||||
if err := codec.NewDecoder(r, codecHandle).Decode(&rep); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf(
|
||||
"Received report sizes: compressed %d bytes, uncompressed %d bytes (%.2f%%)",
|
||||
compressedSize,
|
||||
uncompressedSize,
|
||||
float32(compressedSize)/float32(uncompressedSize)*100,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// MakeFromBinary constructs a Report from a gzipped msgpack.
|
||||
func MakeFromBinary(r io.Reader) (*Report, error) {
|
||||
rep := MakeReport()
|
||||
if err := rep.ReadBinary(r, true, &codec.MsgpackHandle{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rep, nil
|
||||
}
|
||||
Reference in New Issue
Block a user