diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index 2365f8c55..346b7ebb4 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -1,7 +1,6 @@ package multitenant import ( - "bytes" "crypto/md5" "fmt" "io" @@ -338,45 +337,55 @@ func (c *awsCollector) Report(ctx context.Context) (report.Report, error) { return c.merger.Merge(reports), nil } +// calculateDynamoKeys generates the row & column keys for Dynamo. +func calculateDynamoKeys(userid string, now time.Time) (string, string) { + rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10)) + colKey := strconv.FormatInt(now.UnixNano(), 10) + return rowKey, colKey +} + +// calculateReportKey determines the key we should use for a report. +func calculateReportKey(rowKey, colKey string) (string, error) { + rowKeyHash := md5.New() + if _, err := io.WriteString(rowKeyHash, rowKey); err != nil { + return "", err + } + return fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey), nil +} + func (c *awsCollector) Add(ctx context.Context, rep report.Report) error { userid, err := c.userIDer(ctx) if err != nil { return err } - // first, encode the report into a buffer and record its size - var buf bytes.Buffer - rep.WriteBinary(&buf) - reportSizeHistogram.Observe(float64(buf.Len())) - - // second, put the report on s3 - now := time.Now() - rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10)) - colKey := strconv.FormatInt(now.UnixNano(), 10) - rowKeyHash := md5.New() - if _, err := io.WriteString(rowKeyHash, rowKey); err != nil { - return err - } - s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey) - err = c.s3.StoreBytes(s3Key, buf.Bytes()) + // first, put the report on s3 + rowKey, colKey := calculateDynamoKeys(userid, time.Now()) + reportKey, err := calculateReportKey(rowKey, colKey) if err != nil { return err } + reportSize, err := c.s3.StoreReport(reportKey, &rep) 
+ if err != nil { + return err + } + reportSizeHistogram.Observe(float64(reportSize)) + - // third, put it in memcache + // second, put it in memcache if c.memcache != nil { - err = c.memcache.StoreBytes(s3Key, buf.Bytes()) + _, err = c.memcache.StoreReport(reportKey, &rep) if err != nil { // NOTE: We don't abort here because failing to store in memcache // doesn't actually break anything else -- it's just an // optimization. - log.Warningf("Could not store %v in memcache: %v", s3Key, err) + log.Warningf("Could not store %v in memcache: %v", reportKey, err) } } - // fourth, put the key in dynamodb + // third, put the key in dynamodb dynamoValueSize.WithLabelValues("PutItem"). - Add(float64(len(s3Key))) + Add(float64(len(reportKey))) var resp *dynamodb.PutItemOutput err = instrument.TimeRequestHistogram("PutItem", dynamoRequestDuration, func() error { @@ -391,7 +400,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error { N: aws.String(colKey), }, reportField: { - S: aws.String(s3Key), + S: aws.String(reportKey), }, }, ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), @@ -407,7 +416,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error { } if rep.Shortcut && c.nats != nil { - err := c.nats.Publish(userid, []byte(s3Key)) + err := c.nats.Publish(userid, []byte(reportKey)) natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1) if err != nil { log.Errorf("Error sending shortcut report: %v", err) diff --git a/app/multitenant/memcache_client.go b/app/multitenant/memcache_client.go index fe4ec8f5d..3b9db267e 100644 --- a/app/multitenant/memcache_client.go +++ b/app/multitenant/memcache_client.go @@ -46,11 +46,12 @@ func init() { // MemcacheClient is a memcache client that gets its server list from SRV // records, and periodically updates that ServerList.
type MemcacheClient struct { - client *memcache.Client - serverList *memcache.ServerList - expiration int32 - hostname string - service string + client *memcache.Client + serverList *memcache.ServerList + expiration int32 + hostname string + service string + compressionLevel int quit chan struct{} wait sync.WaitGroup @@ -58,11 +59,12 @@ type MemcacheClient struct { // MemcacheConfig defines how a MemcacheClient should be constructed. type MemcacheConfig struct { - Host string - Service string - Timeout time.Duration - UpdateInterval time.Duration - Expiration time.Duration + Host string + Service string + Timeout time.Duration + UpdateInterval time.Duration + Expiration time.Duration + CompressionLevel int } // NewMemcacheClient creates a new MemcacheClient that gets its server list @@ -73,12 +75,13 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient { client.Timeout = config.Timeout newClient := &MemcacheClient{ - client: client, - serverList: &servers, - expiration: int32(config.Expiration.Seconds()), - hostname: config.Host, - service: config.Service, - quit: make(chan struct{}), + client: client, + serverList: &servers, + expiration: int32(config.Expiration.Seconds()), + hostname: config.Host, + service: config.Service, + compressionLevel: config.CompressionLevel, + quit: make(chan struct{}), } err := newClient.updateMemcacheServers() if err != nil { @@ -201,10 +204,13 @@ func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, return reports, missing, nil } -// StoreBytes stores a report, expecting the report to be serialized already. -func (c *MemcacheClient) StoreBytes(key string, content []byte) error { - return instrument.TimeRequestHistogramStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { - item := memcache.Item{Key: key, Value: content, Expiration: c.expiration} +// StoreReport serializes and stores a report. 
+func (c *MemcacheClient) StoreReport(key string, rep *report.Report) (int, error) { + var buf bytes.Buffer + rep.WriteBinary(&buf, c.compressionLevel) + err := instrument.TimeRequestHistogramStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { + item := memcache.Item{Key: key, Value: buf.Bytes(), Expiration: c.expiration} return c.client.Set(&item) }) + return buf.Len(), err } diff --git a/app/multitenant/s3_client.go b/app/multitenant/s3_client.go index 94deef5f3..9e1aaaeac 100644 --- a/app/multitenant/s3_client.go +++ b/app/multitenant/s3_client.go @@ -2,6 +2,7 @@ package multitenant import ( "bytes" + "compress/gzip" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -84,15 +85,19 @@ func (store *S3Store) fetchReport(key string) (*report.Report, error) { return report.MakeFromBinary(resp.Body) } -// StoreBytes stores a report in S3, expecting the report to be serialized -// already. -func (store *S3Store) StoreBytes(key string, content []byte) error { - return instrument.TimeRequestHistogram("Put", s3RequestDuration, func() error { +// StoreReport serializes and stores a report. +// +// Returns the size of the report. This only equals bytes written if err is nil.
+func (store *S3Store) StoreReport(key string, rep *report.Report) (int, error) { + var buf bytes.Buffer + rep.WriteBinary(&buf, gzip.BestCompression) + err := instrument.TimeRequestHistogram("Put", s3RequestDuration, func() error { _, err := store.s3.PutObject(&s3.PutObjectInput{ - Body: bytes.NewReader(content), + Body: bytes.NewReader(buf.Bytes()), Bucket: aws.String(store.bucketName), Key: aws.String(key), }) return err }) + return buf.Len(), err } diff --git a/probe/appclient/report_publisher.go b/probe/appclient/report_publisher.go index 890d22c95..8d426d853 100644 --- a/probe/appclient/report_publisher.go +++ b/probe/appclient/report_publisher.go @@ -2,6 +2,7 @@ package appclient import ( "bytes" + "compress/gzip" "github.com/weaveworks/scope/report" ) @@ -28,6 +29,6 @@ func (p *ReportPublisher) Publish(r report.Report) error { }) } buf := &bytes.Buffer{} - r.WriteBinary(buf) + r.WriteBinary(buf, gzip.BestCompression) return p.publisher.Publish(buf) } diff --git a/prog/app.go b/prog/app.go index b63ace7a3..d2de8418e 100644 --- a/prog/app.go +++ b/prog/app.go @@ -82,7 +82,7 @@ func awsConfigFromURL(url *url.URL) (*aws.Config, error) { return config, nil } -func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname, memcachedHostname string, memcachedTimeout time.Duration, memcachedService string, memcachedExpiration, window time.Duration, createTables bool) (app.Collector, error) { +func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname, memcachedHostname string, memcachedTimeout time.Duration, memcachedService string, memcachedExpiration time.Duration, memcachedCompressionLevel int, window time.Duration, createTables bool) (app.Collector, error) { if collectorURL == "local" { return app.NewCollector(window), nil } @@ -115,11 +115,12 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo if memcachedHostname != "" { memcacheClient = multitenant.NewMemcacheClient(
multitenant.MemcacheConfig{ - Host: memcachedHostname, - Timeout: memcachedTimeout, - Expiration: memcachedExpiration, - UpdateInterval: memcacheUpdateInterval, - Service: memcachedService, + Host: memcachedHostname, + Timeout: memcachedTimeout, + Expiration: memcachedExpiration, + UpdateInterval: memcacheUpdateInterval, + Service: memcachedService, + CompressionLevel: memcachedCompressionLevel, }, ) } @@ -216,7 +217,8 @@ func appMain(flags appFlags) { collector, err := collectorFactory( userIDer, flags.collectorURL, flags.s3URL, flags.natsHostname, flags.memcachedHostname, - flags.memcachedTimeout, flags.memcachedService, flags.memcachedExpiration, flags.window, flags.awsCreateTables) + flags.memcachedTimeout, flags.memcachedService, flags.memcachedExpiration, flags.memcachedCompressionLevel, + flags.window, flags.awsCreateTables) if err != nil { log.Fatalf("Error creating collector: %v", err) return diff --git a/prog/main.go b/prog/main.go index 44b99e060..0810a5b24 100644 --- a/prog/main.go +++ b/prog/main.go @@ -1,6 +1,7 @@ package main import ( + "compress/gzip" "flag" "fmt" "net" @@ -100,16 +101,17 @@ type appFlags struct { containerName string dockerEndpoint string - collectorURL string - s3URL string - controlRouterURL string - pipeRouterURL string - natsHostname string - memcachedHostname string - memcachedTimeout time.Duration - memcachedService string - memcachedExpiration time.Duration - userIDHeader string + collectorURL string + s3URL string + controlRouterURL string + pipeRouterURL string + natsHostname string + memcachedHostname string + memcachedTimeout time.Duration + memcachedService string + memcachedExpiration time.Duration + memcachedCompressionLevel int + userIDHeader string blockProfileRate int @@ -197,6 +199,7 @@ func main() { flag.DurationVar(&flags.app.memcachedTimeout, "app.memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") flag.DurationVar(&flags.app.memcachedExpiration, 
"app.memcached.expiration", 2*15*time.Second, "How long reports stay in the memcache.") flag.StringVar(&flags.app.memcachedService, "app.memcached.service", "memcached", "SRV service used to discover memcache servers.") + flag.IntVar(&flags.app.memcachedCompressionLevel, "app.memcached.compression", gzip.DefaultCompression, "How much to compress reports stored in memcached.") flag.StringVar(&flags.app.userIDHeader, "app.userid.header", "", "HTTP header to use as userid") flag.IntVar(&flags.app.blockProfileRate, "app.block.profile.rate", 0, "If more than 0, enable block profiling. The profiler aims to sample an average of one blocking event per rate nanoseconds spent blocked.") diff --git a/report/marshal.go b/report/marshal.go index 2cafd0568..e362102f4 100644 --- a/report/marshal.go +++ b/report/marshal.go @@ -9,8 +9,8 @@ import ( ) // WriteBinary writes a Report as a gzipped msgpack. -func (rep Report) WriteBinary(w io.Writer) error { - gzwriter, err := gzip.NewWriterLevel(w, gzip.BestCompression) +func (rep Report) WriteBinary(w io.Writer, compressionLevel int) error { + gzwriter, err := gzip.NewWriterLevel(w, compressionLevel) if err != nil { return err } diff --git a/report/marshal_test.go b/report/marshal_test.go new file mode 100644 index 000000000..f779c4743 --- /dev/null +++ b/report/marshal_test.go @@ -0,0 +1,49 @@ +package report_test + +import ( + "bytes" + "compress/gzip" + "reflect" + "testing" + + "github.com/weaveworks/scope/report" +) + +func TestRoundtrip(t *testing.T) { + var buf bytes.Buffer + r1 := report.MakeReport() + r1.WriteBinary(&buf, gzip.BestCompression) + r2, err := report.MakeFromBinary(&buf) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(r1, *r2) { + t.Errorf("%v != %v", r1, *r2) + } +} + +func TestRoundtripNoCompression(t *testing.T) { + // Make sure that we can use our standard routines for decompressing + // something with '0' level compression. 
+ var buf bytes.Buffer + r1 := report.MakeReport() + r1.WriteBinary(&buf, 0) + r2, err := report.MakeFromBinary(&buf) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(r1, *r2) { + t.Errorf("%v != %v", r1, *r2) + } +} + +func TestMoreCompressionMeansSmaller(t *testing.T) { + // Make sure that 0 level compression actually does compress less. + var buf1, buf2 bytes.Buffer + r := report.MakeReport() + r.WriteBinary(&buf1, gzip.BestCompression) + r.WriteBinary(&buf2, 0) + if buf1.Len() >= buf2.Len() { + t.Errorf("Compression doesn't change size: %v >= %v", buf1.Len(), buf2.Len()) + } +}