Merge pull request #1616 from weaveworks/debug-instrument-memcache

Refactor caching layers in AWS collector
This commit is contained in:
Jonathan Lange
2016-06-30 18:10:32 +01:00
committed by GitHub
4 changed files with 230 additions and 177 deletions

View File

@@ -13,9 +13,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/bluele/gcache"
"github.com/bradfitz/gomemcache/memcache"
"github.com/nats-io/nats"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
@@ -25,14 +23,12 @@ import (
)
const (
hourField = "hour"
tsField = "ts"
reportField = "report"
reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users
reportCacheExpiration = 15 * time.Second
memcacheExpiration = 15 // seconds
memcacheUpdateInterval = 1 * time.Minute
natsTimeout = 10 * time.Second
hourField = "hour"
tsField = "ts"
reportField = "report"
reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users
reportCacheExpiration = 15 * time.Second
natsTimeout = 10 * time.Second
)
var (
@@ -70,12 +66,6 @@ var (
Help: "Total compressed size of reports received in bytes.",
})
s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "s3_request_duration_seconds",
Help: "Time in seconds spent doing S3 requests.",
}, []string{"method", "status_code"})
natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "nats_requests_total",
@@ -90,30 +80,38 @@ func init() {
prometheus.MustRegister(inProcessCacheRequests)
prometheus.MustRegister(inProcessCacheHits)
prometheus.MustRegister(reportSize)
prometheus.MustRegister(s3RequestDuration)
prometheus.MustRegister(natsRequests)
}
// DynamoDBCollector is a Collector which can also CreateTables
type DynamoDBCollector interface {
// AWSCollector is a Collector which can also CreateTables
type AWSCollector interface {
app.Collector
CreateTables() error
}
// ReportStore is a thing that we can get reports from.
type ReportStore interface {
FetchReports([]string) ([]report.Report, []string, error)
FetchReports([]string) (map[string]report.Report, []string, error)
}
type dynamoDBCollector struct {
userIDer UserIDer
db *dynamodb.DynamoDB
s3 *s3.S3
tableName string
bucketName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
// AWSCollectorConfig has everything we need to make an AWS collector.
type AWSCollectorConfig struct {
UserIDer UserIDer
DynamoDBConfig *aws.Config
DynamoTable string
S3Store *S3Store
NatsHost string
MemcacheClient *MemcacheClient
}
type awsCollector struct {
userIDer UserIDer
db *dynamodb.DynamoDB
s3 *S3Store
tableName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
nats *nats.Conn
waitersLock sync.Mutex
@@ -134,55 +132,33 @@ type watchKey struct {
c chan struct{}
}
// NewDynamoDBCollector the reaper of souls
// NewAWSCollector the elastic reaper of souls
// https://github.com/aws/aws-sdk-go/wiki/common-examples
func NewDynamoDBCollector(
userIDer UserIDer,
dynamoDBConfig, s3Config *aws.Config,
tableName, bucketName, natsHost, memcachedHost string,
memcachedTimeout time.Duration, memcachedService string,
) (DynamoDBCollector, error) {
func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
var nc *nats.Conn
if natsHost != "" {
if config.NatsHost != "" {
var err error
nc, err = nats.Connect(natsHost)
nc, err = nats.Connect(config.NatsHost)
if err != nil {
return nil, err
}
}
var memcacheClient *MemcacheClient
if memcachedHost != "" {
var err error
memcacheClient, err = NewMemcacheClient(memcachedHost, memcachedTimeout, memcachedService, memcacheUpdateInterval, memcacheExpiration)
if err != nil {
// TODO(jml): Ideally, we wouldn't abort here, we would instead
// log errors when we try to use the memcache & fail to do so, as
// aborting here introduces ordering dependencies into our
// deployment.
//
// Note: this error only happens when either the memcachedHost or
// any of the SRV records that it points to fail to resolve.
return nil, err
}
}
return &dynamoDBCollector{
db: dynamodb.New(session.New(dynamoDBConfig)),
s3: s3.New(session.New(s3Config)),
userIDer: userIDer,
tableName: tableName,
bucketName: bucketName,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
memcache: memcacheClient,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
return &awsCollector{
db: dynamodb.New(session.New(config.DynamoDBConfig)),
s3: config.S3Store,
userIDer: config.UserIDer,
tableName: config.DynamoTable,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
memcache: config.MemcacheClient,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
}
// CreateDynamoDBTables creates the required tables in dynamodb
func (c *dynamoDBCollector) CreateTables() error {
// CreateTables creates the required tables in dynamodb
func (c *awsCollector) CreateTables() error {
// see if tableName exists
resp, err := c.db.ListTables(&dynamodb.ListTablesInput{
Limit: aws.Int64(10),
@@ -234,7 +210,8 @@ func (c *dynamoDBCollector) CreateTables() error {
}
// getReportKeys gets the s3 keys for reports in this range
func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ([]string, error) {
func (c *awsCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
var resp *dynamodb.QueryOutput
err := timeRequest("Query", dynamoRequestDuration, func() error {
var err error
@@ -281,64 +258,15 @@ func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) (
return result, nil
}
// Fetch multiple reports in parallel from S3.
// getNonCached downloads every key in reportKeys concurrently, warms the
// in-process cache with each fetched report, and returns the reports.
// The first fetch error aborts the whole call.
func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) {
// result pairs a fetched report (or fetch error) with the S3 key it came from.
type result struct {
key string
report *report.Report
err error
}
// Buffered to len(reportKeys) so the fetch goroutines never block on send,
// even if we return early after an error.
ch := make(chan result, len(reportKeys))
for _, reportKey := range reportKeys {
go func(reportKey string) {
r := result{key: reportKey}
r.report, r.err = c.getNonCachedReport(reportKey)
ch <- r
}(reportKey)
}
reports := []report.Report{}
for range reportKeys {
r := <-ch
if r.err != nil {
// First error wins; remaining goroutines drain into the buffered channel.
return nil, r.err
}
reports = append(reports, *r.report)
// Store in the in-process cache so subsequent queries skip S3.
c.inProcess.StoreReport(r.key, *r.report)
}
return reports, nil
}
// Fetch a single report from S3.
// getNonCachedReport GETs the object at reportKey from the configured bucket
// and decodes it from its binary serialization, timing the request into the
// s3RequestDuration summary.
func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Key: aws.String(reportKey),
})
return err
})
if err != nil {
return nil, err
}
// NOTE(review): resp.Body is not explicitly closed here — presumably
// report.MakeFromBinary consumes/closes the reader; confirm, otherwise
// this leaks the HTTP response body.
return report.MakeFromBinary(resp.Body)
}
func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
missing, err := c.getReportKeys(rowKey, start, end)
if err != nil {
return nil, err
}
func (c *awsCollector) getReports(reportKeys []string) ([]report.Report, error) {
missing := reportKeys
stores := []ReportStore{c.inProcess}
if c.memcache != nil {
stores = append(stores, c.memcache)
}
stores = append(stores, c.s3)
var reports []report.Report
for _, store := range stores {
if store == nil {
@@ -348,69 +276,62 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time
if err != nil {
log.Warningf("Error fetching from cache: %v", err)
}
reports = append(reports, found...)
for key, report := range found {
c.inProcess.StoreReport(key, report)
reports = append(reports, report)
}
if len(missing) == 0 {
return reports, nil
}
}
fetchedReports, err := c.getNonCached(missing)
if err != nil {
return nil, err
if len(missing) > 0 {
return nil, fmt.Errorf("Error fetching from s3, still have missing reports: %v", missing)
}
return append(reports, fetchedReports...), nil
return reports, nil
}
// memcacheStatusCode translates a memcache client error into an HTTP-like
// status code string, for use as a metrics label.
func memcacheStatusCode(err error) string {
// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
switch err {
case nil:
return "200" // success
case memcache.ErrCacheMiss:
return "404" // key not found in the cache
case memcache.ErrMalformedKey:
return "400" // caller supplied an invalid key
default:
return "500" // any other (server/network) error
}
}
func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
func (c *awsCollector) Report(ctx context.Context) (report.Report, error) {
var (
now = time.Now()
start = now.Add(-15 * time.Second)
rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds()
userid, err = c.userIDer(ctx)
reports []report.Report
)
if err != nil {
return report.MakeReport(), err
}
// Queries will only every span 2 rows max.
var reportKeys []string
if rowStart != rowEnd {
reports1, err := c.getReports(userid, rowStart, start, now)
reportKeys1, err := c.getReportKeys(userid, rowStart, start, now)
if err != nil {
return report.MakeReport(), err
}
reports2, err := c.getReports(userid, rowEnd, start, now)
reportKeys2, err := c.getReportKeys(userid, rowEnd, start, now)
if err != nil {
return report.MakeReport(), err
}
reports = append(reports1, reports2...)
reportKeys = append(reportKeys, reportKeys1...)
reportKeys = append(reportKeys, reportKeys2...)
} else {
if reports, err = c.getReports(userid, rowEnd, start, now); err != nil {
if reportKeys, err = c.getReportKeys(userid, rowEnd, start, now); err != nil {
return report.MakeReport(), err
}
}
reports, err := c.getReports(reportKeys)
if err != nil {
return report.MakeReport(), err
}
return c.merger.Merge(reports), nil
}
func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
func (c *awsCollector) Add(ctx context.Context, rep report.Report) error {
userid, err := c.userIDer(ctx)
if err != nil {
return err
@@ -430,24 +351,14 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
return err
}
s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey)
err = timeRequest("Put", s3RequestDuration, func() error {
var err error
_, err = c.s3.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(buf.Bytes()),
Bucket: aws.String(c.bucketName),
Key: aws.String(s3Key),
})
return err
})
err = c.s3.StoreBytes(s3Key, buf.Bytes())
if err != nil {
return err
}
// third, put it in memcache
if c.memcache != nil {
err = timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
return c.memcache.StoreBytes(s3Key, buf.Bytes())
})
err = c.memcache.StoreBytes(s3Key, buf.Bytes())
if err != nil {
// NOTE: We don't abort here because failing to store in memcache
// doesn't actually break anything else -- it's just an
@@ -499,7 +410,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
return nil
}
func (c *dynamoDBCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
userid, err := c.userIDer(ctx)
if err != nil {
log.Errorf("Error getting user id in WaitOn: %v", err)
@@ -540,7 +451,7 @@ func (c *dynamoDBCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
}()
}
func (c *dynamoDBCollector) UnWait(ctx context.Context, waiter chan struct{}) {
func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) {
userid, err := c.userIDer(ctx)
if err != nil {
log.Errorf("Error getting user id in WaitOn: %v", err)
@@ -574,13 +485,13 @@ func newInProcessStore(size int, expiration time.Duration) inProcessStore {
}
// FetchReports retrieves the given reports from the store.
func (c inProcessStore) FetchReports(keys []string) ([]report.Report, []string, error) {
found := []report.Report{}
func (c inProcessStore) FetchReports(keys []string) (map[string]report.Report, []string, error) {
found := map[string]report.Report{}
missing := []string{}
for _, key := range keys {
rpt, err := c.cache.Get(key)
if err == nil {
found = append(found, rpt.(report.Report))
found[key] = rpt.(report.Report)
} else {
missing = append(missing, key)
}

View File

@@ -119,8 +119,22 @@ func (c *MemcacheClient) updateMemcacheServers() error {
return c.serverList.SetServers(servers...)
}
// memcacheStatusCode translates a memcache client error into an HTTP-like
// status code string, for use as a metrics label.
func memcacheStatusCode(err error) string {
// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
switch err {
case nil:
return "200" // success
case memcache.ErrCacheMiss:
return "404" // key not found in the cache
case memcache.ErrMalformedKey:
return "400" // caller supplied an invalid key
default:
return "500" // any other (server/network) error
}
}
// FetchReports gets reports from memcache.
func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, error) {
func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, []string, error) {
var found map[string]*memcache.Item
err := timeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error {
var err error
@@ -151,17 +165,17 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,
ch <- result{key: key}
return
}
ch <- result{report: rep}
ch <- result{key: key, report: rep}
}(key)
}
var reports []report.Report
// NOTE(review): the declaration below leaves `reports` as a nil map; the
// write `reports[r.key] = *r.report` further down will panic at runtime
// in Go (assignment to entry in nil map). It must be initialized, e.g.
// `reports := map[string]report.Report{}`.
var reports map[string]report.Report
for i := 0; i < len(keys)-len(missing); i++ {
r := <-ch
if r.report == nil {
// A nil report means the key could not be fetched/decoded; treat as missing.
missing = append(missing, r.key)
} else {
reports = append(reports, *r.report)
reports[r.key] = *r.report
}
}
@@ -172,6 +186,8 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,
// StoreBytes stores a report, expecting the report to be serialized already.
func (c *MemcacheClient) StoreBytes(key string, content []byte) error {
item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
return c.client.Set(&item)
return timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
return c.client.Set(&item)
})
}

View File

@@ -0,0 +1,96 @@
package multitenant
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/report"
)
var (
s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "s3_request_duration_seconds",
Help: "Time in seconds spent doing S3 requests.",
}, []string{"method", "status_code"})
)
// S3Store is an S3 client that stores and retrieves Reports.
type S3Store struct {
s3 *s3.S3 // underlying AWS S3 client
bucketName string // bucket in which all reports are stored
}
// init registers the S3 request-duration metric with Prometheus at package load.
func init() {
prometheus.MustRegister(s3RequestDuration)
}
// NewS3Client creates a new S3 client.
// It returns an S3Store that reads and writes reports in bucketName, using
// an AWS session built from config.
func NewS3Client(config *aws.Config, bucketName string) S3Store {
return S3Store{
s3: s3.New(session.New(config)),
bucketName: bucketName,
}
}
// FetchReports fetches multiple reports in parallel from S3.
// It returns a map of key -> report. The second return value (missing keys)
// is always empty for S3: any failure to fetch is surfaced as an error
// instead, aborting the whole call.
func (store *S3Store) FetchReports(keys []string) (map[string]report.Report, []string, error) {
// result pairs a fetched report (or fetch error) with its key.
type result struct {
key string
report *report.Report
err error
}
// Buffered to len(keys) so fetch goroutines never block on send,
// even if we return early after an error.
ch := make(chan result, len(keys))
for _, key := range keys {
go func(key string) {
r := result{key: key}
r.report, r.err = store.fetchReport(key)
ch <- r
}(key)
}
reports := map[string]report.Report{}
for range keys {
r := <-ch
if r.err != nil {
// First error wins; remaining goroutines drain into the buffered channel.
return nil, []string{}, r.err
}
reports[r.key] = *r.report
}
return reports, []string{}, nil
}
// fetchReport GETs a single serialized report from S3 and decodes it,
// recording the request duration in the s3RequestDuration summary.
func (store *S3Store) fetchReport(key string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = store.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(store.bucketName),
Key: aws.String(key),
})
return err
})
if err != nil {
return nil, err
}
// NOTE(review): resp.Body is not explicitly closed — presumably
// report.MakeFromBinary consumes/closes the reader; confirm.
return report.MakeFromBinary(resp.Body)
}
// StoreBytes stores a report in S3, expecting the report to be serialized
// already. The PUT is timed into the s3RequestDuration summary.
func (store *S3Store) StoreBytes(key string, content []byte) error {
return timeRequest("Put", s3RequestDuration, func() error {
_, err := store.s3.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(content),
Bucket: aws.String(store.bucketName),
Key: aws.String(key),
})
return err
})
}

View File

@@ -27,6 +27,11 @@ import (
"github.com/weaveworks/scope/probe/docker"
)
const (
memcacheExpiration = 15 // seconds
memcacheUpdateInterval = 1 * time.Minute
)
var (
requestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
@@ -99,21 +104,46 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
if err != nil {
return nil, err
}
tableName := strings.TrimPrefix(parsed.Path, "/")
bucketName := strings.TrimPrefix(s3.Path, "/")
dynamoCollector, err := multitenant.NewDynamoDBCollector(
userIDer, dynamoDBConfig, s3Config, tableName, bucketName, natsHostname,
memcachedHostname, memcachedTimeout, memcachedService,
tableName := strings.TrimPrefix(parsed.Path, "/")
s3Store := multitenant.NewS3Client(s3Config, bucketName)
var memcacheClient *multitenant.MemcacheClient
if memcachedHostname != "" {
memcacheClient, err = multitenant.NewMemcacheClient(
memcachedHostname, memcachedTimeout, memcachedService,
memcacheUpdateInterval, memcacheExpiration,
)
if err != nil {
// TODO(jml): Ideally, we wouldn't abort here, we would instead
// log errors when we try to use the memcache & fail to do so, as
// aborting here introduces ordering dependencies into our
// deployment.
//
// Note: this error only happens when either the memcachedHost
// or any of the SRV records that it points to fail to
// resolve.
return nil, err
}
}
awsCollector, err := multitenant.NewAWSCollector(
multitenant.AWSCollectorConfig{
UserIDer: userIDer,
DynamoDBConfig: dynamoDBConfig,
DynamoTable: tableName,
S3Store: &s3Store,
NatsHost: natsHostname,
MemcacheClient: memcacheClient,
},
)
if err != nil {
return nil, err
}
if createTables {
if err := dynamoCollector.CreateTables(); err != nil {
if err := awsCollector.CreateTables(); err != nil {
return nil, err
}
}
return dynamoCollector, nil
return awsCollector, nil
}
return nil, fmt.Errorf("Invalid collector '%s'", collectorURL)