From e2bda8f670e5c0379fdedcb6bca60104e9f1bc55 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Fri, 24 Jun 2016 14:33:56 +0100 Subject: [PATCH 1/8] Move last memcache bits out of dynamo_collector --- app/multitenant/dynamo_collector.go | 19 +------------------ app/multitenant/memcache_client.go | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index 5f5301059..c639be062 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -15,7 +15,6 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/s3" "github.com/bluele/gcache" - "github.com/bradfitz/gomemcache/memcache" "github.com/nats-io/nats" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" @@ -362,20 +361,6 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time return append(reports, fetchedReports...), nil } -func memcacheStatusCode(err error) string { - // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables - switch err { - case nil: - return "200" - case memcache.ErrCacheMiss: - return "404" - case memcache.ErrMalformedKey: - return "400" - default: - return "500" - } -} - func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { var ( now = time.Now() @@ -445,9 +430,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { // third, put it in memcache if c.memcache != nil { - err = timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { - return c.memcache.StoreBytes(s3Key, buf.Bytes()) - }) + err = c.memcache.StoreBytes(s3Key, buf.Bytes()) if err != nil { // NOTE: We don't abort here because failing to store in memcache // doesn't actually break anything else -- it's just an diff --git a/app/multitenant/memcache_client.go b/app/multitenant/memcache_client.go index 2dcd4b14c..6aed287eb 100644 --- a/app/multitenant/memcache_client.go +++ b/app/multitenant/memcache_client.go @@ -119,6 +119,20 @@ func (c *MemcacheClient) updateMemcacheServers() error { return c.serverList.SetServers(servers...) } +func memcacheStatusCode(err error) string { + // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables + switch err { + case nil: + return "200" + case memcache.ErrCacheMiss: + return "404" + case memcache.ErrMalformedKey: + return "400" + default: + return "500" + } +} + // FetchReports gets reports from memcache. func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, error) { var found map[string]*memcache.Item @@ -172,6 +186,8 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, // StoreBytes stores a report, expecting the report to be serialized already. func (c *MemcacheClient) StoreBytes(key string, content []byte) error { - item := memcache.Item{Key: key, Value: content, Expiration: c.expiration} - return c.client.Set(&item) + return timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { + item := memcache.Item{Key: key, Value: content, Expiration: c.expiration} + return c.client.Set(&item) + }) } From 87da22767e651f784b78f1b874daafab501d4f3c Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Fri, 24 Jun 2016 15:16:35 +0100 Subject: [PATCH 2/8] Move s3 logic to separate file --- app/multitenant/dynamo_collector.go | 111 ++++++---------------------- app/multitenant/s3_client.go | 96 ++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 87 deletions(-) create mode 100644 app/multitenant/s3_client.go diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index c639be062..bc827c0e8 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -13,7 +13,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/s3" "github.com/bluele/gcache" "github.com/nats-io/nats" "github.com/prometheus/client_golang/prometheus" @@ -69,12 +68,6 @@ var ( Help: "Total compressed size of reports received in bytes.", }) - s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "scope", - Name: "s3_request_duration_seconds", - Help: "Time in seconds spent doing S3 requests.", - }, []string{"method", "status_code"}) - natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "scope", Name: "nats_requests_total", @@ -89,7 +82,6 @@ func init() { prometheus.MustRegister(inProcessCacheRequests) prometheus.MustRegister(inProcessCacheHits) prometheus.MustRegister(reportSize) - prometheus.MustRegister(s3RequestDuration) prometheus.MustRegister(natsRequests) } @@ -105,14 +97,13 @@ type ReportStore interface { } type dynamoDBCollector struct { - userIDer UserIDer - db *dynamodb.DynamoDB - s3 *s3.S3 - tableName string - bucketName string - merger app.Merger - inProcess inProcessStore - memcache *MemcacheClient + userIDer UserIDer + db *dynamodb.DynamoDB + s3 *S3Store + tableName string + merger app.Merger + inProcess inProcessStore + memcache *MemcacheClient nats *nats.Conn waitersLock sync.Mutex @@ -150,6 +141,8 @@ func NewDynamoDBCollector( } } + s3Store := NewS3Client(s3Config, bucketName) + var memcacheClient *MemcacheClient if memcachedHost != "" { var err error @@ -167,16 +160,15 @@ func NewDynamoDBCollector( } return &dynamoDBCollector{ - db: dynamodb.New(session.New(dynamoDBConfig)), - s3: s3.New(session.New(s3Config)), - userIDer: userIDer, - tableName: tableName, - bucketName: bucketName, - merger: app.NewSmartMerger(), - inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration), - memcache: memcacheClient, - nats: nc, - waiters: map[watchKey]*nats.Subscription{}, + db: dynamodb.New(session.New(dynamoDBConfig)), + s3: &s3Store, + userIDer: userIDer, + tableName: tableName, + merger: app.NewSmartMerger(), + inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration), + memcache: memcacheClient, + nats: nc, + waiters: map[watchKey]*nats.Subscription{}, }, nil } @@ -280,53 +272,6 @@ func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ( return result, nil } -// Fetch multiple reports in parallel from S3. -func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) { - type result struct { - key string - report *report.Report - err error - } - - ch := make(chan result, len(reportKeys)) - - for _, reportKey := range reportKeys { - go func(reportKey string) { - r := result{key: reportKey} - r.report, r.err = c.getNonCachedReport(reportKey) - ch <- r - }(reportKey) - } - - reports := []report.Report{} - for range reportKeys { - r := <-ch - if r.err != nil { - return nil, r.err - } - reports = append(reports, *r.report) - c.inProcess.StoreReport(r.key, *r.report) - } - return reports, nil -} - -// Fetch a single report from S3. -func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) { - var resp *s3.GetObjectOutput - err := timeRequest("Get", s3RequestDuration, func() error { - var err error - resp, err = c.s3.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(c.bucketName), - Key: aws.String(reportKey), - }) - return err - }) - if err != nil { - return nil, err - } - return report.MakeFromBinary(resp.Body) -} - func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) { rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) missing, err := c.getReportKeys(rowKey, start, end) @@ -338,6 +283,8 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time if c.memcache != nil { stores = append(stores, c.memcache) } + stores = append(stores, c.s3) + var reports []report.Report for _, store := range stores { if store == nil { @@ -353,12 +300,10 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time } } - fetchedReports, err := c.getNonCached(missing) - if err != nil { - return nil, err + if len(missing) > 0 { + return nil, fmt.Errorf("Error fetching from s3, still have missing reports: %v", missing) } - - return append(reports, fetchedReports...), nil + return reports, nil } func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { @@ -415,15 +360,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { return err } s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey) - err = timeRequest("Put", s3RequestDuration, func() error { - var err error - _, err = c.s3.PutObject(&s3.PutObjectInput{ - Body: bytes.NewReader(buf.Bytes()), - Bucket: aws.String(c.bucketName), - Key: aws.String(s3Key), - }) - return err - }) + err = c.s3.StoreBytes(s3Key, buf.Bytes()) if err != nil { return err } diff --git a/app/multitenant/s3_client.go b/app/multitenant/s3_client.go new file mode 100644 index 000000000..9dca8462d --- /dev/null +++ b/app/multitenant/s3_client.go @@ -0,0 +1,96 @@ +package multitenant + +import ( + "bytes" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/prometheus/client_golang/prometheus" + + "github.com/weaveworks/scope/report" +) + +var ( + s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "scope", + Name: "s3_request_duration_seconds", + Help: "Time in seconds spent doing S3 requests.", + }, []string{"method", "status_code"}) +) + +// S3Store is an S3 client that stores and retrieves Reports. +type S3Store struct { + s3 *s3.S3 + bucketName string +} + +func init() { + prometheus.MustRegister(s3RequestDuration) +} + +// NewS3Client creates a new S3 client. +func NewS3Client(config *aws.Config, bucketName string) S3Store { + return S3Store{ + s3: s3.New(session.New(config)), + bucketName: bucketName, + } +} + +// FetchReports fetches multiple reports in parallel from S3. +func (store *S3Store) FetchReports(keys []string) ([]report.Report, []string, error) { + type result struct { + key string + report *report.Report + err error + } + + ch := make(chan result, len(keys)) + + for _, key := range keys { + go func(key string) { + r := result{key: key} + r.report, r.err = store.fetchReport(key) + ch <- r + }(key) + } + + reports := []report.Report{} + for range keys { + r := <-ch + if r.err != nil { + return nil, []string{}, r.err + } + reports = append(reports, *r.report) + } + return reports, []string{}, nil +} + +func (store *S3Store) fetchReport(key string) (*report.Report, error) { + var resp *s3.GetObjectOutput + err := timeRequest("Get", s3RequestDuration, func() error { + var err error + resp, err = store.s3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(store.bucketName), + Key: aws.String(key), + }) + return err + }) + if err != nil { + return nil, err + } + return report.MakeFromBinary(resp.Body) +} + +// StoreBytes stores a report in S3, expecting the report to be serialized +// already. +func (store *S3Store) StoreBytes(key string, content []byte) error { + return timeRequest("Put", s3RequestDuration, func() error { + _, err := store.s3.PutObject(&s3.PutObjectInput{ + Body: bytes.NewReader(content), + Bucket: aws.String(store.bucketName), + Key: aws.String(key), + }) + return err + }) +} From 5ec422c7a31637a7d7c632bacdee6dd638e999b0 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Mon, 27 Jun 2016 18:31:14 +0100 Subject: [PATCH 3/8] Fetch all reports at once Rather than have getReports be responsible for determining keys, instead call getReportKeys directly and then pass keys to getReports --- app/multitenant/dynamo_collector.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index bc827c0e8..fc281fb64 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -225,7 +225,8 @@ func (c *dynamoDBCollector) CreateTables() error { } // getReportKeys gets the s3 keys for reports in this range -func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ([]string, error) { +func (c *dynamoDBCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) { + rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) var resp *dynamodb.QueryOutput err := timeRequest("Query", dynamoRequestDuration, func() error { var err error @@ -272,12 +273,8 @@ func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ( return result, nil } -func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) { - rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) - missing, err := c.getReportKeys(rowKey, start, end) - if err != nil { - return nil, err - } +func (c *dynamoDBCollector) getReports(reportKeys []string) ([]report.Report, error) { + missing := reportKeys stores := []ReportStore{c.inProcess} if c.memcache != nil { @@ -312,31 +309,37 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { start = now.Add(-15 * time.Second) rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds() userid, err = c.userIDer(ctx) - reports []report.Report ) if err != nil { return report.MakeReport(), err } // Queries will only every span 2 rows max. + var reportKeys []string if rowStart != rowEnd { - reports1, err := c.getReports(userid, rowStart, start, now) + reportKeys1, err := c.getReportKeys(userid, rowStart, start, now) if err != nil { return report.MakeReport(), err } - reports2, err := c.getReports(userid, rowEnd, start, now) + reportKeys2, err := c.getReportKeys(userid, rowEnd, start, now) if err != nil { return report.MakeReport(), err } - reports = append(reports1, reports2...) + reportKeys = append(reportKeys, reportKeys1...) + reportKeys = append(reportKeys, reportKeys2...) } else { - if reports, err = c.getReports(userid, rowEnd, start, now); err != nil { + if reportKeys, err = c.getReportKeys(userid, rowEnd, start, now); err != nil { return report.MakeReport(), err } } + reports, err := c.getReports(reportKeys) + if err != nil { + return report.MakeReport(), err + } + return c.merger.Merge(reports), nil } From d984605de15b55c3be8a989600cb52faacd68cbc Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Tue, 28 Jun 2016 11:25:31 +0100 Subject: [PATCH 4/8] Write back to the in-process cache --- app/multitenant/dynamo_collector.go | 13 ++++++++----- app/multitenant/memcache_client.go | 8 ++++---- app/multitenant/s3_client.go | 6 +++--- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index fc281fb64..b5860910f 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -93,7 +93,7 @@ type DynamoDBCollector interface { // ReportStore is a thing that we can get reports from. type ReportStore interface { - FetchReports([]string) ([]report.Report, []string, error) + FetchReports([]string) (map[string]report.Report, []string, error) } type dynamoDBCollector struct { @@ -291,7 +291,10 @@ func (c *dynamoDBCollector) getReports(reportKeys []string) ([]report.Report, er if err != nil { log.Warningf("Error fetching from cache: %v", err) } - reports = append(reports, found...) + for key, report := range found { + c.inProcess.StoreReport(key, report) + reports = append(reports, report) + } if len(missing) == 0 { return reports, nil } @@ -497,13 +500,13 @@ func newInProcessStore(size int, expiration time.Duration) inProcessStore { } // FetchReports retrieves the given reports from the store. -func (c inProcessStore) FetchReports(keys []string) ([]report.Report, []string, error) { - found := []report.Report{} +func (c inProcessStore) FetchReports(keys []string) (map[string]report.Report, []string, error) { + found := map[string]report.Report{} missing := []string{} for _, key := range keys { rpt, err := c.cache.Get(key) if err == nil { - found = append(found, rpt.(report.Report)) + found[key] = rpt.(report.Report) } else { missing = append(missing, key) } diff --git a/app/multitenant/memcache_client.go b/app/multitenant/memcache_client.go index 6aed287eb..2b311c8b5 100644 --- a/app/multitenant/memcache_client.go +++ b/app/multitenant/memcache_client.go @@ -134,7 +134,7 @@ func memcacheStatusCode(err error) string { } // FetchReports gets reports from memcache. -func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, error) { +func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, []string, error) { var found map[string]*memcache.Item err := timeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error { var err error @@ -165,17 +165,17 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, ch <- result{key: key} return } - ch <- result{report: rep} + ch <- result{key: key, report: rep} }(key) } - var reports []report.Report + var reports map[string]report.Report for i := 0; i < len(keys)-len(missing); i++ { r := <-ch if r.report == nil { missing = append(missing, r.key) } else { - reports = append(reports, *r.report) + reports[r.key] = *r.report } } diff --git a/app/multitenant/s3_client.go b/app/multitenant/s3_client.go index 9dca8462d..4cb9acd0e 100644 --- a/app/multitenant/s3_client.go +++ b/app/multitenant/s3_client.go @@ -38,7 +38,7 @@ func NewS3Client(config *aws.Config, bucketName string) S3Store { } // FetchReports fetches multiple reports in parallel from S3. -func (store *S3Store) FetchReports(keys []string) ([]report.Report, []string, error) { +func (store *S3Store) FetchReports(keys []string) (map[string]report.Report, []string, error) { type result struct { key string report *report.Report @@ -55,13 +55,13 @@ func (store *S3Store) FetchReports(keys []string) ([]report.Report, []string, er }(key) } - reports := []report.Report{} + reports := map[string]report.Report{} for range keys { r := <-ch if r.err != nil { return nil, []string{}, r.err } - reports = append(reports, *r.report) + reports[r.key] = *r.report } return reports, []string{}, nil } From abec257c5941e5620d1bd958229b82847ad0f726 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Tue, 28 Jun 2016 11:45:14 +0100 Subject: [PATCH 5/8] Just pass in the s3 client --- app/multitenant/dynamo_collector.go | 11 +++++------ prog/app.go | 5 +++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index b5860910f..ad7cffdd8 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -128,9 +128,10 @@ type watchKey struct { // https://github.com/aws/aws-sdk-go/wiki/common-examples func NewDynamoDBCollector( userIDer UserIDer, - dynamoDBConfig, s3Config *aws.Config, - tableName, bucketName, natsHost, memcachedHost string, - memcachedTimeout time.Duration, memcachedService string, + dynamoDBConfig *aws.Config, tableName string, + s3Store *S3Store, + natsHost, + memcachedHost string, memcachedTimeout time.Duration, memcachedService string, ) (DynamoDBCollector, error) { var nc *nats.Conn if natsHost != "" { @@ -141,8 +142,6 @@ func NewDynamoDBCollector( } } - s3Store := NewS3Client(s3Config, bucketName) - var memcacheClient *MemcacheClient if memcachedHost != "" { var err error @@ -161,7 +160,7 @@ func NewDynamoDBCollector( return &dynamoDBCollector{ db: dynamodb.New(session.New(dynamoDBConfig)), - s3: &s3Store, + s3: s3Store, userIDer: userIDer, tableName: tableName, merger: app.NewSmartMerger(), diff --git a/prog/app.go b/prog/app.go index e75ce98b9..2e8cceb7d 100644 --- a/prog/app.go +++ b/prog/app.go @@ -99,10 +99,11 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo if err != nil { return nil, err } - tableName := strings.TrimPrefix(parsed.Path, "/") bucketName := strings.TrimPrefix(s3.Path, "/") + s3Store := multitenant.NewS3Client(s3Config, bucketName) + tableName := strings.TrimPrefix(parsed.Path, "/") dynamoCollector, err := multitenant.NewDynamoDBCollector( - userIDer, dynamoDBConfig, s3Config, tableName, bucketName, natsHostname, + userIDer, dynamoDBConfig, tableName, &s3Store, natsHostname, memcachedHostname, memcachedTimeout, memcachedService, ) if err != nil { From 6520f8f5f37de1329cbb42ab4d27e799cb2ccdc4 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Thu, 30 Jun 2016 09:35:21 +0100 Subject: [PATCH 6/8] Pass in memcache client --- app/multitenant/dynamo_collector.go | 34 +++++++---------------------- prog/app.go | 27 +++++++++++++++++++++-- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/dynamo_collector.go index ad7cffdd8..82db7ef56 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/dynamo_collector.go @@ -23,14 +23,12 @@ import ( ) const ( - hourField = "hour" - tsField = "ts" - reportField = "report" - reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users - reportCacheExpiration = 15 * time.Second - memcacheExpiration = 15 // seconds - memcacheUpdateInterval = 1 * time.Minute - natsTimeout = 10 * time.Second + hourField = "hour" + tsField = "ts" + reportField = "report" + reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users + reportCacheExpiration = 15 * time.Second + natsTimeout = 10 * time.Second ) var ( @@ -130,8 +128,8 @@ func NewDynamoDBCollector( userIDer UserIDer, dynamoDBConfig *aws.Config, tableName string, s3Store *S3Store, - natsHost, - memcachedHost string, memcachedTimeout time.Duration, memcachedService string, + natsHost string, + memcacheClient *MemcacheClient, ) (DynamoDBCollector, error) { var nc *nats.Conn if natsHost != "" { @@ -142,22 +140,6 @@ func NewDynamoDBCollector( } } - var memcacheClient *MemcacheClient - if memcachedHost != "" { - var err error - memcacheClient, err = NewMemcacheClient(memcachedHost, memcachedTimeout, memcachedService, memcacheUpdateInterval, memcacheExpiration) - if err != nil { - // TODO(jml): Ideally, we wouldn't abort here, we would instead - // log errors when we try to use the memcache & fail to do so, as - // aborting here introduces ordering dependencies into our - // deployment. - // - // Note: this error only happens when either the memcachedHost or - // any of the SRV records that it points to fail to resolve. - return nil, err - } - } - return &dynamoDBCollector{ db: dynamodb.New(session.New(dynamoDBConfig)), s3: s3Store, diff --git a/prog/app.go b/prog/app.go index 2e8cceb7d..e898eaccf 100644 --- a/prog/app.go +++ b/prog/app.go @@ -27,6 +27,11 @@ import ( "github.com/weaveworks/scope/probe/docker" ) +const ( + memcacheExpiration = 15 // seconds + memcacheUpdateInterval = 1 * time.Minute +) + var ( requestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "scope", @@ -100,11 +105,29 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo return nil, err } bucketName := strings.TrimPrefix(s3.Path, "/") - s3Store := multitenant.NewS3Client(s3Config, bucketName) tableName := strings.TrimPrefix(parsed.Path, "/") + s3Store := multitenant.NewS3Client(s3Config, bucketName) + var memcacheClient *multitenant.MemcacheClient + if memcachedHostname != "" { + memcacheClient, err = multitenant.NewMemcacheClient( + memcachedHostname, memcachedTimeout, memcachedService, + memcacheUpdateInterval, memcacheExpiration, + ) + if err != nil { + // TODO(jml): Ideally, we wouldn't abort here, we would instead + // log errors when we try to use the memcache & fail to do so, as + // aborting here introduces ordering dependencies into our + // deployment. + // + // Note: this error only happens when either the memcachedHost + // or any of the SRV records that it points to fail to + // resolve. + return nil, err + } + } dynamoCollector, err := multitenant.NewDynamoDBCollector( userIDer, dynamoDBConfig, tableName, &s3Store, natsHostname, - memcachedHostname, memcachedTimeout, memcachedService, + memcacheClient, ) if err != nil { return nil, err From baacaa8cc5bea6010e8581fcfdb82670fbb0ec46 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Thu, 30 Jun 2016 16:40:34 +0100 Subject: [PATCH 7/8] Rename dynamoCollector to awsCollector --- .../{dynamo_collector.go => aws_collector.go} | 30 +++++++++---------- prog/app.go | 6 ++-- 2 files changed, 18 insertions(+), 18 deletions(-) rename app/multitenant/{dynamo_collector.go => aws_collector.go} (93%) diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/aws_collector.go similarity index 93% rename from app/multitenant/dynamo_collector.go rename to app/multitenant/aws_collector.go index 82db7ef56..ad0e36e2d 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/aws_collector.go @@ -83,8 +83,8 @@ func init() { prometheus.MustRegister(natsRequests) } -// DynamoDBCollector is a Collector which can also CreateTables -type DynamoDBCollector interface { +// AWSCollector is a Collector which can also CreateTables +type AWSCollector interface { app.Collector CreateTables() error } @@ -94,7 +94,7 @@ type ReportStore interface { FetchReports([]string) (map[string]report.Report, []string, error) } -type dynamoDBCollector struct { +type awsCollector struct { userIDer UserIDer db *dynamodb.DynamoDB s3 *S3Store @@ -122,15 +122,15 @@ type watchKey struct { c chan struct{} } -// NewDynamoDBCollector the reaper of souls +// NewAWSCollector the elastic reaper of souls // https://github.com/aws/aws-sdk-go/wiki/common-examples -func NewDynamoDBCollector( +func NewAWSCollector( userIDer UserIDer, dynamoDBConfig *aws.Config, tableName string, s3Store *S3Store, natsHost string, memcacheClient *MemcacheClient, -) (DynamoDBCollector, error) { +) (AWSCollector, error) { var nc *nats.Conn if natsHost != "" { var err error @@ -140,7 +140,7 @@ func NewDynamoDBCollector( } } - return &dynamoDBCollector{ + return &awsCollector{ db: dynamodb.New(session.New(dynamoDBConfig)), s3: s3Store, userIDer: userIDer, @@ -153,8 +153,8 @@ func NewDynamoDBCollector( }, nil } -// CreateDynamoDBTables creates the required tables in dynamodb -func (c *dynamoDBCollector) CreateTables() error { +// CreateTables creates the required tables in dynamodb +func (c *awsCollector) CreateTables() error { // see if tableName exists resp, err := c.db.ListTables(&dynamodb.ListTablesInput{ Limit: aws.Int64(10), @@ -206,7 +206,7 @@ func (c *dynamoDBCollector) CreateTables() error { } // getReportKeys gets the s3 keys for reports in this range -func (c *dynamoDBCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) { +func (c *awsCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) { rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) var resp *dynamodb.QueryOutput err := timeRequest("Query", dynamoRequestDuration, func() error { @@ -254,7 +254,7 @@ func (c *dynamoDBCollector) getReportKeys(userid string, row int64, start, end t return result, nil } -func (c *dynamoDBCollector) getReports(reportKeys []string) ([]report.Report, error) { +func (c *awsCollector) getReports(reportKeys []string) ([]report.Report, error) { missing := reportKeys stores := []ReportStore{c.inProcess} @@ -287,7 +287,7 @@ func (c *dynamoDBCollector) getReports(reportKeys []string) ([]report.Report, er return reports, nil } -func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { +func (c *awsCollector) Report(ctx context.Context) (report.Report, error) { var ( now = time.Now() start = now.Add(-15 * time.Second) @@ -327,7 +327,7 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { return c.merger.Merge(reports), nil } -func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { +func (c *awsCollector) Add(ctx context.Context, rep report.Report) error { userid, err := c.userIDer(ctx) if err != nil { return err @@ -406,7 +406,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { return nil } -func (c *dynamoDBCollector) WaitOn(ctx context.Context, waiter chan struct{}) { +func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) { userid, err := c.userIDer(ctx) if err != nil { log.Errorf("Error getting user id in WaitOn: %v", err) @@ -447,7 +447,7 @@ func (c *dynamoDBCollector) WaitOn(ctx context.Context, waiter chan struct{}) { }() } -func (c *dynamoDBCollector) UnWait(ctx context.Context, waiter chan struct{}) { +func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) { userid, err := c.userIDer(ctx) if err != nil { log.Errorf("Error getting user id in WaitOn: %v", err) diff --git a/prog/app.go b/prog/app.go index e898eaccf..1ca9cfb55 100644 --- a/prog/app.go +++ b/prog/app.go @@ -125,7 +125,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo return nil, err } } - dynamoCollector, err := multitenant.NewDynamoDBCollector( + awsCollector, err := multitenant.NewAWSCollector( userIDer, dynamoDBConfig, tableName, &s3Store, natsHostname, memcacheClient, ) @@ -133,11 +133,11 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo return nil, err } if createTables { - if err := dynamoCollector.CreateTables(); err != nil { + if err := awsCollector.CreateTables(); err != nil { return nil, err } } - return dynamoCollector, nil + return awsCollector, nil } return nil, fmt.Errorf("Invalid collector '%s'", collectorURL) From 9e0f0c51b9722852dfa79f3b30d361a1e15067c8 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Thu, 30 Jun 2016 17:01:58 +0100 Subject: [PATCH 8/8] Configuration type for AWS collector --- app/multitenant/aws_collector.go | 32 ++++++++++++++++++-------------- prog/app.go | 10 ++++++++-- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index ad0e36e2d..477e2261b 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -94,6 +94,16 @@ type ReportStore interface { FetchReports([]string) (map[string]report.Report, []string, error) } +// AWSCollectorConfig has everything we need to make an AWS collector. +type AWSCollectorConfig struct { + UserIDer UserIDer + DynamoDBConfig *aws.Config + DynamoTable string + S3Store *S3Store + NatsHost string + MemcacheClient *MemcacheClient +} + type awsCollector struct { userIDer UserIDer db *dynamodb.DynamoDB @@ -124,30 +134,24 @@ type watchKey struct { // NewAWSCollector the elastic reaper of souls // https://github.com/aws/aws-sdk-go/wiki/common-examples -func NewAWSCollector( - userIDer UserIDer, - dynamoDBConfig *aws.Config, tableName string, - s3Store *S3Store, - natsHost string, - memcacheClient *MemcacheClient, -) (AWSCollector, error) { +func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) { var nc *nats.Conn - if natsHost != "" { + if config.NatsHost != "" { var err error - nc, err = nats.Connect(natsHost) + nc, err = nats.Connect(config.NatsHost) if err != nil { return nil, err } } return &awsCollector{ - db: dynamodb.New(session.New(dynamoDBConfig)), - s3: s3Store, - userIDer: userIDer, - tableName: tableName, + db: dynamodb.New(session.New(config.DynamoDBConfig)), + s3: config.S3Store, + userIDer: config.UserIDer, + tableName: config.DynamoTable, merger: app.NewSmartMerger(), inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration), - memcache: memcacheClient, + memcache: config.MemcacheClient, nats: nc, waiters: map[watchKey]*nats.Subscription{}, }, nil diff --git a/prog/app.go b/prog/app.go index 1ca9cfb55..1fd1e008b 100644 --- a/prog/app.go +++ b/prog/app.go @@ -126,8 +126,14 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo } } awsCollector, err := multitenant.NewAWSCollector( - userIDer, dynamoDBConfig, tableName, &s3Store, natsHostname, - memcacheClient, + multitenant.AWSCollectorConfig{ + UserIDer: userIDer, + DynamoDBConfig: dynamoDBConfig, + DynamoTable: tableName, + S3Store: &s3Store, + NatsHost: natsHostname, + MemcacheClient: memcacheClient, + }, ) if err != nil { return nil, err