Instrument SQS calls

This commit is contained in:
Tom Wilkie
2016-05-23 16:48:31 +01:00
parent 5a9aebbcb4
commit 861605a5ee

View File

@@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"github.com/weaveworks/scope/app"
@@ -19,10 +20,19 @@ import (
)
var (
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
longPollTime = aws.Int64(10)
rpcTimeout = time.Minute
sqsRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "sqs_request_duration_nanoseconds",
Help: "Time spent doing SQS requests.",
}, []string{"method", "status_code"})
)
func init() {
prometheus.MustRegister(sqsRequestDuration)
}
// sqsControlRouter:
// Creates a queue for every probe that connects to it, and a queue for
// responses back to it. When it receives a request, posts it to the
@@ -80,12 +90,16 @@ func (cr *sqsControlRouter) getResponseQueueURL() *string {
func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) {
// CreateQueue creates a queue or if it already exists, returns url of said queue
start := time.Now()
createQueueRes, err := cr.service.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(name),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("CreateQueue", "500").Observe(float64(duration.Nanoseconds()))
return nil, err
}
sqsRequestDuration.WithLabelValues("CreateQueue", "200").Observe(float64(duration.Nanoseconds()))
return createQueueRes.QueueUrl, nil
}
@@ -108,14 +122,19 @@ func (cr *sqsControlRouter) loop() {
}
for {
start := time.Now()
res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: responseQueueURL,
WaitTimeSeconds: longPollTime,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error receiving message from %s: %v", *responseQueueURL, err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))
if len(res.Messages) == 0 {
continue
}
@@ -134,10 +153,17 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Mes
Id: message.MessageId,
})
}
start := time.Now()
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: queueURL,
Entries: entries,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}
@@ -167,10 +193,17 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e
return err
}
log.Infof("sendMessage to %s: %s", *queueURL, buf.String())
start := time.Now()
_, err := cr.service.SendMessage(&sqs.SendMessageInput{
QueueUrl: queueURL,
MessageBody: aws.String(buf.String()),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("SendMessage", "500").Observe(float64(duration.Nanoseconds()))
} else {
sqsRequestDuration.WithLabelValues("SendMessage", "200").Observe(float64(duration.Nanoseconds()))
}
return err
}
@@ -188,12 +221,16 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
}
probeQueueName := fmt.Sprintf("probe-%s-%s", userID, probeID)
start := time.Now()
probeQueueURL, err := cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String(probeQueueName),
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}
sqsRequestDuration.WithLabelValues("GetQueueUrl", "200").Observe(float64(duration.Nanoseconds()))
// Add a response channel before we send the request, to prevent races
id := fmt.Sprintf("request-%s-%d", userID, rand.Int63())
@@ -213,6 +250,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer
Request: req,
ResponseQueueURL: *responseQueueURL,
}); err != nil {
sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds()))
return xfer.Response{}, err
}
@@ -287,14 +325,19 @@ func (pw *probeWorker) loop() {
default:
}
start := time.Now()
res, err := pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: pw.requestQueueURL,
WaitTimeSeconds: longPollTime,
})
duration := time.Now().Sub(start)
if err != nil {
sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds()))
log.Errorf("Error recieving message: %v", err)
continue
}
sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds()))
if len(res.Messages) == 0 {
continue
}