Move s3 logic to separate file

This commit is contained in:
Jonathan Lange
2016-06-24 15:16:35 +01:00
parent e2bda8f670
commit 87da22767e
2 changed files with 120 additions and 87 deletions

View File

@@ -13,7 +13,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/bluele/gcache"
"github.com/nats-io/nats"
"github.com/prometheus/client_golang/prometheus"
@@ -69,12 +68,6 @@ var (
Help: "Total compressed size of reports received in bytes.",
})
s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "s3_request_duration_seconds",
Help: "Time in seconds spent doing S3 requests.",
}, []string{"method", "status_code"})
natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "nats_requests_total",
@@ -89,7 +82,6 @@ func init() {
prometheus.MustRegister(inProcessCacheRequests)
prometheus.MustRegister(inProcessCacheHits)
prometheus.MustRegister(reportSize)
prometheus.MustRegister(s3RequestDuration)
prometheus.MustRegister(natsRequests)
}
@@ -105,14 +97,13 @@ type ReportStore interface {
}
type dynamoDBCollector struct {
userIDer UserIDer
db *dynamodb.DynamoDB
s3 *s3.S3
tableName string
bucketName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
userIDer UserIDer
db *dynamodb.DynamoDB
s3 *S3Store
tableName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
nats *nats.Conn
waitersLock sync.Mutex
@@ -150,6 +141,8 @@ func NewDynamoDBCollector(
}
}
s3Store := NewS3Client(s3Config, bucketName)
var memcacheClient *MemcacheClient
if memcachedHost != "" {
var err error
@@ -167,16 +160,15 @@ func NewDynamoDBCollector(
}
return &dynamoDBCollector{
db: dynamodb.New(session.New(dynamoDBConfig)),
s3: s3.New(session.New(s3Config)),
userIDer: userIDer,
tableName: tableName,
bucketName: bucketName,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
memcache: memcacheClient,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
db: dynamodb.New(session.New(dynamoDBConfig)),
s3: &s3Store,
userIDer: userIDer,
tableName: tableName,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
memcache: memcacheClient,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
}
@@ -280,53 +272,6 @@ func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) (
return result, nil
}
// Fetch multiple reports in parallel from S3.
func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) {
type result struct {
key string
report *report.Report
err error
}
ch := make(chan result, len(reportKeys))
for _, reportKey := range reportKeys {
go func(reportKey string) {
r := result{key: reportKey}
r.report, r.err = c.getNonCachedReport(reportKey)
ch <- r
}(reportKey)
}
reports := []report.Report{}
for range reportKeys {
r := <-ch
if r.err != nil {
return nil, r.err
}
reports = append(reports, *r.report)
c.inProcess.StoreReport(r.key, *r.report)
}
return reports, nil
}
// Fetch a single report from S3.
func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Key: aws.String(reportKey),
})
return err
})
if err != nil {
return nil, err
}
return report.MakeFromBinary(resp.Body)
}
func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
missing, err := c.getReportKeys(rowKey, start, end)
@@ -338,6 +283,8 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time
if c.memcache != nil {
stores = append(stores, c.memcache)
}
stores = append(stores, c.s3)
var reports []report.Report
for _, store := range stores {
if store == nil {
@@ -353,12 +300,10 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time
}
}
fetchedReports, err := c.getNonCached(missing)
if err != nil {
return nil, err
if len(missing) > 0 {
return nil, fmt.Errorf("Error fetching from s3, still have missing reports: %v", missing)
}
return append(reports, fetchedReports...), nil
return reports, nil
}
func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
@@ -415,15 +360,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
return err
}
s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey)
err = timeRequest("Put", s3RequestDuration, func() error {
var err error
_, err = c.s3.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(buf.Bytes()),
Bucket: aws.String(c.bucketName),
Key: aws.String(s3Key),
})
return err
})
err = c.s3.StoreBytes(s3Key, buf.Bytes())
if err != nil {
return err
}

View File

@@ -0,0 +1,96 @@
package multitenant
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/report"
)
var (
s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "s3_request_duration_seconds",
Help: "Time in seconds spent doing S3 requests.",
}, []string{"method", "status_code"})
)
// S3Store is an S3 client that stores and retrieves Reports.
type S3Store struct {
s3 *s3.S3
bucketName string
}
func init() {
prometheus.MustRegister(s3RequestDuration)
}
// NewS3Client creates a new S3 client.
func NewS3Client(config *aws.Config, bucketName string) S3Store {
return S3Store{
s3: s3.New(session.New(config)),
bucketName: bucketName,
}
}
// FetchReports fetches multiple reports in parallel from S3.
func (store *S3Store) FetchReports(keys []string) ([]report.Report, []string, error) {
type result struct {
key string
report *report.Report
err error
}
ch := make(chan result, len(keys))
for _, key := range keys {
go func(key string) {
r := result{key: key}
r.report, r.err = store.fetchReport(key)
ch <- r
}(key)
}
reports := []report.Report{}
for range keys {
r := <-ch
if r.err != nil {
return nil, []string{}, r.err
}
reports = append(reports, *r.report)
}
return reports, []string{}, nil
}
func (store *S3Store) fetchReport(key string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = store.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(store.bucketName),
Key: aws.String(key),
})
return err
})
if err != nil {
return nil, err
}
return report.MakeFromBinary(resp.Body)
}
// StoreBytes stores a report in S3, expecting the report to be serialized
// already.
func (store *S3Store) StoreBytes(key string, content []byte) error {
return timeRequest("Put", s3RequestDuration, func() error {
_, err := store.s3.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(content),
Bucket: aws.String(store.bucketName),
Key: aws.String(key),
})
return err
})
}