mirror of
https://github.com/weaveworks/scope.git
synced 2026-05-04 08:19:17 +00:00
refactor some timing helpers into a common lib
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/common/instrument"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
@@ -213,7 +214,7 @@ func (c *awsCollector) CreateTables() error {
|
||||
func (c *awsCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) {
|
||||
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
|
||||
var resp *dynamodb.QueryOutput
|
||||
err := timeRequest("Query", dynamoRequestDuration, func() error {
|
||||
err := instrument.TimeRequest("Query", dynamoRequestDuration, func() error {
|
||||
var err error
|
||||
resp, err = c.db.Query(&dynamodb.QueryInput{
|
||||
TableName: aws.String(c.tableName),
|
||||
@@ -372,7 +373,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error {
|
||||
Add(float64(len(s3Key)))
|
||||
|
||||
var resp *dynamodb.PutItemOutput
|
||||
err = timeRequest("PutItem", dynamoRequestDuration, func() error {
|
||||
err = instrument.TimeRequest("PutItem", dynamoRequestDuration, func() error {
|
||||
var err error
|
||||
resp, err = c.db.PutItem(&dynamodb.PutItemInput{
|
||||
TableName: aws.String(c.tableName),
|
||||
@@ -401,7 +402,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error {
|
||||
|
||||
if rep.Shortcut && c.nats != nil {
|
||||
err := c.nats.Publish(userid, []byte(s3Key))
|
||||
natsRequests.WithLabelValues("Publish", errorCode(err)).Add(1)
|
||||
natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1)
|
||||
if err != nil {
|
||||
log.Errorf("Error sending shortcut report: %v", err)
|
||||
}
|
||||
@@ -422,7 +423,7 @@ func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
|
||||
}
|
||||
|
||||
sub, err := c.nats.SubscribeSync(userid)
|
||||
natsRequests.WithLabelValues("SubscribeSync", errorCode(err)).Add(1)
|
||||
natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1)
|
||||
if err != nil {
|
||||
log.Errorf("Error subscribing for shortcuts: %v", err)
|
||||
return
|
||||
@@ -438,7 +439,7 @@ func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
|
||||
if err == nats.ErrTimeout {
|
||||
continue
|
||||
}
|
||||
natsRequests.WithLabelValues("NextMsg", errorCode(err)).Add(1)
|
||||
natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1)
|
||||
if err != nil {
|
||||
log.Debugf("NextMsg error: %v", err)
|
||||
return
|
||||
@@ -469,7 +470,7 @@ func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) {
|
||||
c.waitersLock.Unlock()
|
||||
|
||||
err = sub.Unsubscribe()
|
||||
natsRequests.WithLabelValues("Unsubscribe", errorCode(err)).Add(1)
|
||||
natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1)
|
||||
if err != nil {
|
||||
log.Errorf("Error on unsubscribe: %v", err)
|
||||
}
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
package multitenant
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func errorCode(err error) string {
|
||||
if err == nil {
|
||||
return "200"
|
||||
}
|
||||
return "500"
|
||||
}
|
||||
|
||||
// timeRequest runs 'f' and records how long it took in the given Prometheus
|
||||
// metric. If 'f' returns successfully, record a "200". Otherwise, record
|
||||
// "500".
|
||||
//
|
||||
// If you want more complicated logic for translating errors into statuses,
|
||||
// use 'timeRequestStatus'.
|
||||
func timeRequest(method string, metric *prometheus.SummaryVec, f func() error) error {
|
||||
return timeRequestStatus(method, metric, errorCode, f)
|
||||
}
|
||||
|
||||
// timeRequestStatus runs 'f' and records how long it took in the given
|
||||
// Prometheus metric.
|
||||
//
|
||||
// toStatusCode is a function that translates errors returned by 'f' into
|
||||
// HTTP-like status codes.
|
||||
func timeRequestStatus(method string, metric *prometheus.SummaryVec, toStatusCode func(error) string, f func() error) error {
|
||||
if toStatusCode == nil {
|
||||
toStatusCode = errorCode
|
||||
}
|
||||
startTime := time.Now()
|
||||
err := f()
|
||||
duration := time.Now().Sub(startTime)
|
||||
metric.WithLabelValues(method, toStatusCode(err)).Observe(duration.Seconds())
|
||||
return err
|
||||
}
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/bradfitz/gomemcache/memcache"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/weaveworks/scope/common/instrument"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
@@ -136,7 +138,7 @@ func memcacheStatusCode(err error) string {
|
||||
// FetchReports gets reports from memcache.
|
||||
func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, []string, error) {
|
||||
var found map[string]*memcache.Item
|
||||
err := timeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error {
|
||||
err := instrument.TimeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error {
|
||||
var err error
|
||||
found, err = c.client.GetMulti(keys)
|
||||
return err
|
||||
@@ -186,7 +188,7 @@ func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report,
|
||||
|
||||
// StoreBytes stores a report, expecting the report to be serialized already.
|
||||
func (c *MemcacheClient) StoreBytes(key string, content []byte) error {
|
||||
return timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
|
||||
return instrument.TimeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
|
||||
item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
|
||||
return c.client.Set(&item)
|
||||
})
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/weaveworks/scope/common/instrument"
|
||||
"github.com/weaveworks/scope/report"
|
||||
)
|
||||
|
||||
@@ -68,7 +69,7 @@ func (store *S3Store) FetchReports(keys []string) (map[string]report.Report, []s
|
||||
|
||||
func (store *S3Store) fetchReport(key string) (*report.Report, error) {
|
||||
var resp *s3.GetObjectOutput
|
||||
err := timeRequest("Get", s3RequestDuration, func() error {
|
||||
err := instrument.TimeRequest("Get", s3RequestDuration, func() error {
|
||||
var err error
|
||||
resp, err = store.s3.GetObject(&s3.GetObjectInput{
|
||||
Bucket: aws.String(store.bucketName),
|
||||
@@ -85,7 +86,7 @@ func (store *S3Store) fetchReport(key string) (*report.Report, error) {
|
||||
// StoreBytes stores a report in S3, expecting the report to be serialized
|
||||
// already.
|
||||
func (store *S3Store) StoreBytes(key string, content []byte) error {
|
||||
return timeRequest("Put", s3RequestDuration, func() error {
|
||||
return instrument.TimeRequest("Put", s3RequestDuration, func() error {
|
||||
_, err := store.s3.PutObject(&s3.PutObjectInput{
|
||||
Body: bytes.NewReader(content),
|
||||
Bucket: aws.String(store.bucketName),
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/weaveworks/scope/app"
|
||||
"github.com/weaveworks/scope/common/instrument"
|
||||
"github.com/weaveworks/scope/common/xfer"
|
||||
)
|
||||
|
||||
@@ -94,7 +95,7 @@ func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) {
|
||||
// CreateQueue creates a queue or if it already exists, returns url of said queue
|
||||
var createQueueRes *sqs.CreateQueueOutput
|
||||
var err error
|
||||
err = timeRequestStatus("CreateQueue", sqsRequestDuration, nil, func() error {
|
||||
err = instrument.TimeRequestStatus("CreateQueue", sqsRequestDuration, nil, func() error {
|
||||
createQueueRes, err = cr.service.CreateQueue(&sqs.CreateQueueInput{
|
||||
QueueName: aws.String(name),
|
||||
})
|
||||
@@ -127,7 +128,7 @@ func (cr *sqsControlRouter) loop() {
|
||||
for {
|
||||
var res *sqs.ReceiveMessageOutput
|
||||
var err error
|
||||
err = timeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error {
|
||||
err = instrument.TimeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error {
|
||||
res, err = cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
|
||||
QueueUrl: responseQueueURL,
|
||||
WaitTimeSeconds: longPollTime,
|
||||
@@ -157,7 +158,7 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Mes
|
||||
Id: message.MessageId,
|
||||
})
|
||||
}
|
||||
return timeRequestStatus("DeleteMessageBatch", sqsRequestDuration, nil, func() error {
|
||||
return instrument.TimeRequestStatus("DeleteMessageBatch", sqsRequestDuration, nil, func() error {
|
||||
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
|
||||
QueueUrl: queueURL,
|
||||
Entries: entries,
|
||||
@@ -193,7 +194,7 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e
|
||||
}
|
||||
log.Infof("sendMessage to %s: %s", *queueURL, buf.String())
|
||||
|
||||
return timeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error {
|
||||
return instrument.TimeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error {
|
||||
_, err := cr.service.SendMessage(&sqs.SendMessageInput{
|
||||
QueueUrl: queueURL,
|
||||
MessageBody: aws.String(buf.String()),
|
||||
@@ -216,7 +217,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
|
||||
}
|
||||
|
||||
var probeQueueURL *sqs.GetQueueUrlOutput
|
||||
err = timeRequestStatus("GetQueueUrl", sqsRequestDuration, nil, func() error {
|
||||
err = instrument.TimeRequestStatus("GetQueueUrl", sqsRequestDuration, nil, func() error {
|
||||
probeQueueName := fmt.Sprintf("%sprobe-%s-%s", cr.prefix, userID, probeID)
|
||||
probeQueueURL, err = cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{
|
||||
QueueName: aws.String(probeQueueName),
|
||||
@@ -240,7 +241,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
|
||||
}()
|
||||
|
||||
// Next, send the request to that queue
|
||||
if err := timeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error {
|
||||
if err := instrument.TimeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error {
|
||||
return cr.sendMessage(probeQueueURL.QueueUrl, sqsRequestMessage{
|
||||
ID: id,
|
||||
Request: req,
|
||||
@@ -323,7 +324,7 @@ func (pw *probeWorker) loop() {
|
||||
|
||||
var res *sqs.ReceiveMessageOutput
|
||||
var err error
|
||||
err = timeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error {
|
||||
err = instrument.TimeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error {
|
||||
res, err = pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{
|
||||
QueueUrl: pw.requestQueueURL,
|
||||
WaitTimeSeconds: longPollTime,
|
||||
|
||||
Reference in New Issue
Block a user