Merge pull request #1684 from weaveworks/decompress-memcache-1671

Add options for storing memcached reports with different compression levels
This commit is contained in:
Jonathan Lange
2016-07-15 14:03:38 +01:00
committed by GitHub
8 changed files with 141 additions and 66 deletions

View File

@@ -1,7 +1,6 @@
package multitenant
import (
"bytes"
"crypto/md5"
"fmt"
"io"
@@ -338,45 +337,55 @@ func (c *awsCollector) Report(ctx context.Context) (report.Report, error) {
return c.merger.Merge(reports), nil
}
// calculateDynamoKeys generates the row & column keys for Dynamo.
func calculateDynamoKeys(userid string, now time.Time) (string, string) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10))
colKey := strconv.FormatInt(now.UnixNano(), 10)
return rowKey, colKey
}
// calculateReportKey determines the key we should use for a report.
//
// The row key is md5-hashed (used here for key distribution, not for
// security) and combined with the column key. The error return is always
// nil: hash.Hash.Write is documented to never return an error, so the
// previous error plumbing was dead code; the signature is kept for
// compatibility with existing callers.
func calculateReportKey(rowKey, colKey string) (string, error) {
	return fmt.Sprintf("%x/%s", md5.Sum([]byte(rowKey)), colKey), nil
}
func (c *awsCollector) Add(ctx context.Context, rep report.Report) error {
userid, err := c.userIDer(ctx)
if err != nil {
return err
}
// first, encode the report into a buffer and record its size
var buf bytes.Buffer
rep.WriteBinary(&buf)
reportSizeHistogram.Observe(float64(buf.Len()))
// second, put the report on s3
now := time.Now()
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(now.UnixNano()/time.Hour.Nanoseconds(), 10))
colKey := strconv.FormatInt(now.UnixNano(), 10)
rowKeyHash := md5.New()
if _, err := io.WriteString(rowKeyHash, rowKey); err != nil {
return err
}
s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey)
err = c.s3.StoreBytes(s3Key, buf.Bytes())
// first, put the report on s3
rowKey, colKey := calculateDynamoKeys(userid, time.Now())
reportKey, err := calculateReportKey(rowKey, colKey)
if err != nil {
return err
}
reportSize, err := c.s3.StoreReport(reportKey, &rep)
if err != nil {
return err
}
reportSizeHistogram.Observe(float64(reportSize))
// third, put it in memcache
if c.memcache != nil {
err = c.memcache.StoreBytes(s3Key, buf.Bytes())
_, err = c.memcache.StoreReport(reportKey, &rep)
if err != nil {
// NOTE: We don't abort here because failing to store in memcache
// doesn't actually break anything else -- it's just an
// optimization.
log.Warningf("Could not store %v in memcache: %v", s3Key, err)
log.Warningf("Could not store %v in memcache: %v", reportKey, err)
}
}
// fourth, put the key in dynamodb
dynamoValueSize.WithLabelValues("PutItem").
Add(float64(len(s3Key)))
Add(float64(len(reportKey)))
var resp *dynamodb.PutItemOutput
err = instrument.TimeRequestHistogram("PutItem", dynamoRequestDuration, func() error {
@@ -391,7 +400,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error {
N: aws.String(colKey),
},
reportField: {
S: aws.String(s3Key),
S: aws.String(reportKey),
},
},
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
@@ -407,7 +416,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error {
}
if rep.Shortcut && c.nats != nil {
err := c.nats.Publish(userid, []byte(s3Key))
err := c.nats.Publish(userid, []byte(reportKey))
natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1)
if err != nil {
log.Errorf("Error sending shortcut report: %v", err)

View File

@@ -46,11 +46,12 @@ func init() {
// MemcacheClient is a memcache client that gets its server list from SRV
// records, and periodically updates that ServerList.
type MemcacheClient struct {
client *memcache.Client
serverList *memcache.ServerList
expiration int32
hostname string
service string
client *memcache.Client
serverList *memcache.ServerList
expiration int32
hostname string
service string
compressionLevel int
quit chan struct{}
wait sync.WaitGroup
@@ -58,11 +59,12 @@ type MemcacheClient struct {
// MemcacheConfig defines how a MemcacheClient should be constructed.
type MemcacheConfig struct {
Host string
Service string
Timeout time.Duration
UpdateInterval time.Duration
Expiration time.Duration
Host string
Service string
Timeout time.Duration
UpdateInterval time.Duration
Expiration time.Duration
CompressionLevel int
}
// NewMemcacheClient creates a new MemcacheClient that gets its server list
@@ -73,12 +75,13 @@ func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
client.Timeout = config.Timeout
newClient := &MemcacheClient{
client: client,
serverList: &servers,
expiration: int32(config.Expiration.Seconds()),
hostname: config.Host,
service: config.Service,
quit: make(chan struct{}),
client: client,
serverList: &servers,
expiration: int32(config.Expiration.Seconds()),
hostname: config.Host,
service: config.Service,
compressionLevel: config.CompressionLevel,
quit: make(chan struct{}),
}
err := newClient.updateMemcacheServers()
if err != nil {
@@ -201,10 +204,13 @@ func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report,
return reports, missing, nil
}
// StoreBytes stores a report, expecting the report to be serialized already.
func (c *MemcacheClient) StoreBytes(key string, content []byte) error {
return instrument.TimeRequestHistogramStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
// StoreReport serializes and stores a report.
//
// Returns the size in bytes of the serialized report. This only equals
// bytes written to memcache if err is nil.
func (c *MemcacheClient) StoreReport(key string, rep *report.Report) (int, error) {
	// NOTE: param renamed from `report` to `rep` to stop shadowing the
	// report package inside the body.
	var buf bytes.Buffer
	// Propagate serialization failures instead of silently caching a
	// truncated report (the error was previously ignored).
	if err := rep.WriteBinary(&buf, c.compressionLevel); err != nil {
		return buf.Len(), err
	}
	err := instrument.TimeRequestHistogramStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
		item := memcache.Item{Key: key, Value: buf.Bytes(), Expiration: c.expiration}
		return c.client.Set(&item)
	})
	return buf.Len(), err
}

View File

@@ -2,6 +2,7 @@ package multitenant
import (
"bytes"
"compress/gzip"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@@ -84,15 +85,19 @@ func (store *S3Store) fetchReport(key string) (*report.Report, error) {
return report.MakeFromBinary(resp.Body)
}
// StoreBytes stores a report in S3, expecting the report to be serialized
// already.
func (store *S3Store) StoreBytes(key string, content []byte) error {
return instrument.TimeRequestHistogram("Put", s3RequestDuration, func() error {
// StoreReport serializes and stores a report.
//
// Returns the size in bytes of the serialized report. This only equals
// bytes written to S3 if err is nil.
func (store *S3Store) StoreReport(key string, rep *report.Report) (int, error) {
	// NOTE: param renamed from `report` to `rep` to stop shadowing the
	// report package inside the body.
	var buf bytes.Buffer
	// S3 is the long-term store, so always use the best compression.
	// Propagate serialization failures rather than uploading a truncated
	// report (the error was previously ignored).
	if err := rep.WriteBinary(&buf, gzip.BestCompression); err != nil {
		return buf.Len(), err
	}
	err := instrument.TimeRequestHistogram("Put", s3RequestDuration, func() error {
		_, err := store.s3.PutObject(&s3.PutObjectInput{
			Body:   bytes.NewReader(buf.Bytes()),
			Bucket: aws.String(store.bucketName),
			Key:    aws.String(key),
		})
		return err
	})
	return buf.Len(), err
}

View File

@@ -2,6 +2,7 @@ package appclient
import (
"bytes"
"compress/gzip"
"github.com/weaveworks/scope/report"
)
@@ -28,6 +29,6 @@ func (p *ReportPublisher) Publish(r report.Report) error {
})
}
buf := &bytes.Buffer{}
r.WriteBinary(buf)
r.WriteBinary(buf, gzip.BestCompression)
return p.publisher.Publish(buf)
}

View File

@@ -82,7 +82,7 @@ func awsConfigFromURL(url *url.URL) (*aws.Config, error) {
return config, nil
}
func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname, memcachedHostname string, memcachedTimeout time.Duration, memcachedService string, memcachedExpiration, window time.Duration, createTables bool) (app.Collector, error) {
func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname, memcachedHostname string, memcachedTimeout time.Duration, memcachedService string, memcachedExpiration time.Duration, memcachedCompressionLevel int, window time.Duration, createTables bool) (app.Collector, error) {
if collectorURL == "local" {
return app.NewCollector(window), nil
}
@@ -115,11 +115,12 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
if memcachedHostname != "" {
memcacheClient = multitenant.NewMemcacheClient(
multitenant.MemcacheConfig{
Host: memcachedHostname,
Timeout: memcachedTimeout,
Expiration: memcachedExpiration,
UpdateInterval: memcacheUpdateInterval,
Service: memcachedService,
Host: memcachedHostname,
Timeout: memcachedTimeout,
Expiration: memcachedExpiration,
UpdateInterval: memcacheUpdateInterval,
Service: memcachedService,
CompressionLevel: memcachedCompressionLevel,
},
)
}
@@ -216,7 +217,8 @@ func appMain(flags appFlags) {
collector, err := collectorFactory(
userIDer, flags.collectorURL, flags.s3URL, flags.natsHostname, flags.memcachedHostname,
flags.memcachedTimeout, flags.memcachedService, flags.memcachedExpiration, flags.window, flags.awsCreateTables)
flags.memcachedTimeout, flags.memcachedService, flags.memcachedExpiration, flags.memcachedCompressionLevel,
flags.window, flags.awsCreateTables)
if err != nil {
log.Fatalf("Error creating collector: %v", err)
return

View File

@@ -1,6 +1,7 @@
package main
import (
"compress/gzip"
"flag"
"fmt"
"net"
@@ -100,16 +101,17 @@ type appFlags struct {
containerName string
dockerEndpoint string
collectorURL string
s3URL string
controlRouterURL string
pipeRouterURL string
natsHostname string
memcachedHostname string
memcachedTimeout time.Duration
memcachedService string
memcachedExpiration time.Duration
userIDHeader string
collectorURL string
s3URL string
controlRouterURL string
pipeRouterURL string
natsHostname string
memcachedHostname string
memcachedTimeout time.Duration
memcachedService string
memcachedExpiration time.Duration
memcachedCompressionLevel int
userIDHeader string
blockProfileRate int
@@ -197,6 +199,7 @@ func main() {
flag.DurationVar(&flags.app.memcachedTimeout, "app.memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
flag.DurationVar(&flags.app.memcachedExpiration, "app.memcached.expiration", 2*15*time.Second, "How long reports stay in the memcache.")
flag.StringVar(&flags.app.memcachedService, "app.memcached.service", "memcached", "SRV service used to discover memcache servers.")
flag.IntVar(&flags.app.memcachedCompressionLevel, "app.memcached.compression", gzip.DefaultCompression, "How much to compress reports stored in memcached.")
flag.StringVar(&flags.app.userIDHeader, "app.userid.header", "", "HTTP header to use as userid")
flag.IntVar(&flags.app.blockProfileRate, "app.block.profile.rate", 0, "If more than 0, enable block profiling. The profiler aims to sample an average of one blocking event per rate nanoseconds spent blocked.")

View File

@@ -9,8 +9,8 @@ import (
)
// WriteBinary writes a Report as a gzipped msgpack.
func (rep Report) WriteBinary(w io.Writer) error {
gzwriter, err := gzip.NewWriterLevel(w, gzip.BestCompression)
func (rep Report) WriteBinary(w io.Writer, compressionLevel int) error {
gzwriter, err := gzip.NewWriterLevel(w, compressionLevel)
if err != nil {
return err
}

49
report/marshal_test.go Normal file
View File

@@ -0,0 +1,49 @@
package report_test
import (
"bytes"
"compress/gzip"
"reflect"
"testing"
"github.com/weaveworks/scope/report"
)
// TestRoundtrip checks that a report survives a serialize/deserialize
// round trip at the default (best) compression level.
func TestRoundtrip(t *testing.T) {
	var buf bytes.Buffer
	r1 := report.MakeReport()
	// WriteBinary returns an error; failing to check it could mask a
	// broken serialization and make the failure below misleading.
	if err := r1.WriteBinary(&buf, gzip.BestCompression); err != nil {
		t.Fatal(err)
	}
	r2, err := report.MakeFromBinary(&buf)
	if err != nil {
		// Fatal rather than Error: r2 is presumably nil on error, and
		// dereferencing it below would panic.
		t.Fatal(err)
	}
	if !reflect.DeepEqual(r1, *r2) {
		t.Errorf("%v != %v", r1, *r2)
	}
}
// TestRoundtripNoCompression makes sure that we can use our standard
// routines for decompressing something with '0' level compression.
func TestRoundtripNoCompression(t *testing.T) {
	var buf bytes.Buffer
	r1 := report.MakeReport()
	// Check the serialization error that was previously ignored.
	if err := r1.WriteBinary(&buf, 0); err != nil {
		t.Fatal(err)
	}
	r2, err := report.MakeFromBinary(&buf)
	if err != nil {
		// Fatal rather than Error: r2 is presumably nil on error, and
		// dereferencing it below would panic.
		t.Fatal(err)
	}
	if !reflect.DeepEqual(r1, *r2) {
		t.Errorf("%v != %v", r1, *r2)
	}
}
// TestMoreCompressionMeansSmaller makes sure that 0 level compression
// actually does compress less than best compression.
func TestMoreCompressionMeansSmaller(t *testing.T) {
	var buf1, buf2 bytes.Buffer
	r := report.MakeReport()
	// Check the serialization errors that were previously ignored, so a
	// write failure doesn't masquerade as a size-comparison failure.
	if err := r.WriteBinary(&buf1, gzip.BestCompression); err != nil {
		t.Fatal(err)
	}
	if err := r.WriteBinary(&buf2, 0); err != nil {
		t.Fatal(err)
	}
	if buf1.Len() >= buf2.Len() {
		t.Errorf("Compression doesn't change size: %v >= %v", buf1.Len(), buf2.Len())
	}
}