From 3173f6ad75d562e3a27a5196a6a069cd03da9932 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 12 Jul 2016 11:15:57 +0100 Subject: [PATCH] Use histograms over summaries --- app/multitenant/aws_collector.go | 7 ++++--- app/multitenant/memcache_client.go | 7 ++++--- app/multitenant/s3_client.go | 7 ++++--- app/multitenant/sqs_control_router.go | 17 +++++++++-------- common/instrument/instrument.go | 26 ++++++++++++++++++++++++++ common/middleware/instrument.go | 2 +- prog/app.go | 3 ++- 7 files changed, 50 insertions(+), 19 deletions(-) diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index 9bef429c8..d8e8c1721 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -31,10 +31,11 @@ const ( ) var ( - dynamoRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + dynamoRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "scope", Name: "dynamo_request_duration_seconds", Help: "Time in seconds spent doing DynamoDB requests.", + Buckets: prometheus.DefBuckets, }, []string{"method", "status_code"}) dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "scope", @@ -217,7 +218,7 @@ func (c *awsCollector) CreateTables() error { func (c *awsCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) { rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) var resp *dynamodb.QueryOutput - err := instrument.TimeRequest("Query", dynamoRequestDuration, func() error { + err := instrument.TimeRequestHistogram("Query", dynamoRequestDuration, func() error { var err error resp, err = c.db.Query(&dynamodb.QueryInput{ TableName: aws.String(c.tableName), @@ -377,7 +378,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report) error { Add(float64(len(s3Key))) var resp *dynamodb.PutItemOutput - err = instrument.TimeRequest("PutItem", dynamoRequestDuration, func() error { + err = 
instrument.TimeRequestHistogram("PutItem", dynamoRequestDuration, func() error { var err error resp, err = c.db.PutItem(&dynamodb.PutItemInput{ TableName: aws.String(c.tableName), diff --git a/app/multitenant/memcache_client.go b/app/multitenant/memcache_client.go index 2bbcf271b..fe4ec8f5d 100644 --- a/app/multitenant/memcache_client.go +++ b/app/multitenant/memcache_client.go @@ -29,10 +29,11 @@ var ( Help: "Total count of reports found in memcache that were not found in our in-memory cache.", }) - memcacheRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "scope", Name: "memcache_request_duration_seconds", Help: "Total time spent in seconds doing memcache requests.", + Buckets: prometheus.DefBuckets, }, []string{"method", "status_code"}) ) @@ -148,7 +149,7 @@ func memcacheStatusCode(err error) string { func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, []string, error) { memcacheRequests.Add(float64(len(keys))) var found map[string]*memcache.Item - err := instrument.TimeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error { + err := instrument.TimeRequestHistogramStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error { var err error found, err = c.client.GetMulti(keys) return err @@ -202,7 +203,7 @@ func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, // StoreBytes stores a report, expecting the report to be serialized already. 
func (c *MemcacheClient) StoreBytes(key string, content []byte) error { - return instrument.TimeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { + return instrument.TimeRequestHistogramStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { item := memcache.Item{Key: key, Value: content, Expiration: c.expiration} return c.client.Set(&item) }) diff --git a/app/multitenant/s3_client.go b/app/multitenant/s3_client.go index e812ed016..94deef5f3 100644 --- a/app/multitenant/s3_client.go +++ b/app/multitenant/s3_client.go @@ -13,10 +13,11 @@ import ( ) var ( - s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + s3RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "scope", Name: "s3_request_duration_seconds", Help: "Time in seconds spent doing S3 requests.", + Buckets: prometheus.DefBuckets, }, []string{"method", "status_code"}) ) @@ -69,7 +70,7 @@ func (store *S3Store) FetchReports(keys []string) (map[string]report.Report, []s func (store *S3Store) fetchReport(key string) (*report.Report, error) { var resp *s3.GetObjectOutput - err := instrument.TimeRequest("Get", s3RequestDuration, func() error { + err := instrument.TimeRequestHistogram("Get", s3RequestDuration, func() error { var err error resp, err = store.s3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(store.bucketName), @@ -86,7 +87,7 @@ func (store *S3Store) fetchReport(key string) (*report.Report, error) { // StoreBytes stores a report in S3, expecting the report to be serialized // already. 
func (store *S3Store) StoreBytes(key string, content []byte) error { - return instrument.TimeRequest("Put", s3RequestDuration, func() error { + return instrument.TimeRequestHistogram("Put", s3RequestDuration, func() error { _, err := store.s3.PutObject(&s3.PutObjectInput{ Body: bytes.NewReader(content), Bucket: aws.String(store.bucketName), diff --git a/app/multitenant/sqs_control_router.go b/app/multitenant/sqs_control_router.go index 722b2ce98..3d0e0d95d 100644 --- a/app/multitenant/sqs_control_router.go +++ b/app/multitenant/sqs_control_router.go @@ -23,10 +23,11 @@ import ( var ( longPollTime = aws.Int64(10) rpcTimeout = time.Minute - sqsRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + sqsRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "scope", Name: "sqs_request_duration_seconds", Help: "Time in seconds spent doing SQS requests.", + Buckets: prometheus.DefBuckets, }, []string{"method", "status_code"}) ) @@ -95,7 +96,7 @@ func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) { // CreateQueue creates a queue or if it already exists, returns url of said queue var createQueueRes *sqs.CreateQueueOutput var err error - err = instrument.TimeRequestStatus("CreateQueue", sqsRequestDuration, nil, func() error { + err = instrument.TimeRequestHistogram("CreateQueue", sqsRequestDuration, func() error { createQueueRes, err = cr.service.CreateQueue(&sqs.CreateQueueInput{ QueueName: aws.String(name), }) @@ -128,7 +129,7 @@ func (cr *sqsControlRouter) loop() { for { var res *sqs.ReceiveMessageOutput var err error - err = instrument.TimeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error { + err = instrument.TimeRequestHistogram("ReceiveMessage", sqsRequestDuration, func() error { res, err = cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: responseQueueURL, WaitTimeSeconds: longPollTime, @@ -158,7 +159,7 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, 
messages []*sqs.Mes Id: message.MessageId, }) } - return instrument.TimeRequestStatus("DeleteMessageBatch", sqsRequestDuration, nil, func() error { + return instrument.TimeRequestHistogram("DeleteMessageBatch", sqsRequestDuration, func() error { _, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{ QueueUrl: queueURL, Entries: entries, @@ -194,7 +195,7 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e } log.Infof("sendMessage to %s: %s", *queueURL, buf.String()) - return instrument.TimeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error { + return instrument.TimeRequestHistogram("SendMessage", sqsRequestDuration, func() error { _, err := cr.service.SendMessage(&sqs.SendMessageInput{ QueueUrl: queueURL, MessageBody: aws.String(buf.String()), @@ -217,7 +218,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer } var probeQueueURL *sqs.GetQueueUrlOutput - err = instrument.TimeRequestStatus("GetQueueUrl", sqsRequestDuration, nil, func() error { + err = instrument.TimeRequestHistogram("GetQueueUrl", sqsRequestDuration, func() error { probeQueueName := fmt.Sprintf("%sprobe-%s-%s", cr.prefix, userID, probeID) probeQueueURL, err = cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(probeQueueName), @@ -241,7 +242,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer }() // Next, send the request to that queue - if err := instrument.TimeRequestStatus("SendMessage", sqsRequestDuration, nil, func() error { + if err := instrument.TimeRequestHistogram("SendMessage", sqsRequestDuration, func() error { return cr.sendMessage(probeQueueURL.QueueUrl, sqsRequestMessage{ ID: id, Request: req, @@ -324,7 +325,7 @@ func (pw *probeWorker) loop() { var res *sqs.ReceiveMessageOutput var err error - err = instrument.TimeRequestStatus("ReceiveMessage", sqsRequestDuration, nil, func() error { + err = instrument.TimeRequestHistogram("ReceiveMessage", 
sqsRequestDuration, func() error { res, err = pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: pw.requestQueueURL, WaitTimeSeconds: longPollTime, diff --git a/common/instrument/instrument.go b/common/instrument/instrument.go index ce56f8f70..0c6151d21 100644 --- a/common/instrument/instrument.go +++ b/common/instrument/instrument.go @@ -39,3 +39,29 @@ func TimeRequestStatus(method string, metric *prometheus.SummaryVec, toStatusCod metric.WithLabelValues(method, toStatusCode(err)).Observe(duration.Seconds()) return err } + +// TimeRequestHistogram runs 'f' and records how long it took in the given Prometheus +// histogram metric. If 'f' returns successfully, record a "200". Otherwise, record +// "500". +// +// If you want more complicated logic for translating errors into statuses, +// use 'TimeRequestHistogramStatus'. +func TimeRequestHistogram(method string, metric *prometheus.HistogramVec, f func() error) error { + return TimeRequestHistogramStatus(method, metric, ErrorCode, f) +} + +// TimeRequestHistogramStatus runs 'f' and records how long it took in the given +// Prometheus histogram metric. +// +// toStatusCode is a function that translates errors returned by 'f' into +// HTTP-like status codes. 
+func TimeRequestHistogramStatus(method string, metric *prometheus.HistogramVec, toStatusCode func(error) string, f func() error) error { + if toStatusCode == nil { + toStatusCode = ErrorCode + } + startTime := time.Now() + err := f() + duration := time.Now().Sub(startTime) + metric.WithLabelValues(method, toStatusCode(err)).Observe(duration.Seconds()) + return err +} diff --git a/common/middleware/instrument.go b/common/middleware/instrument.go index a2c51ad76..0a93d1ee2 100644 --- a/common/middleware/instrument.go +++ b/common/middleware/instrument.go @@ -16,7 +16,7 @@ type Instrument struct { RouteMatcher interface { Match(*http.Request, *mux.RouteMatch) bool } - Duration *prometheus.SummaryVec + Duration *prometheus.HistogramVec } func isWSHandshakeRequest(req *http.Request) bool { diff --git a/prog/app.go b/prog/app.go index 124e0c095..0642676e8 100644 --- a/prog/app.go +++ b/prog/app.go @@ -32,10 +32,11 @@ const ( ) var ( - requestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "scope", Name: "request_duration_seconds", Help: "Time in seconds spent serving HTTP requests.", + Buckets: prometheus.DefBuckets, }, []string{"method", "route", "status_code", "ws"}) )