diff --git a/app/multitenant/sqs_control_router.go b/app/multitenant/sqs_control_router.go index 405051eef..8beb16c96 100644 --- a/app/multitenant/sqs_control_router.go +++ b/app/multitenant/sqs_control_router.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" "github.com/weaveworks/scope/app" @@ -19,10 +20,19 @@ import ( ) var ( - longPollTime = aws.Int64(10) - rpcTimeout = time.Minute + longPollTime = aws.Int64(10) + rpcTimeout = time.Minute + sqsRequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "scope", + Name: "sqs_request_duration_nanoseconds", + Help: "Time spent doing SQS requests.", + }, []string{"method", "status_code"}) ) +func init() { + prometheus.MustRegister(sqsRequestDuration) +} + // sqsControlRouter: // Creates a queue for every probe that connects to it, and a queue for // responses back to it. When it receives a request, posts it to the @@ -80,12 +90,16 @@ func (cr *sqsControlRouter) getResponseQueueURL() *string { func (cr *sqsControlRouter) getOrCreateQueue(name string) (*string, error) { // CreateQueue creates a queue or if it already exists, returns url of said queue + start := time.Now() createQueueRes, err := cr.service.CreateQueue(&sqs.CreateQueueInput{ QueueName: aws.String(name), }) + duration := time.Now().Sub(start) if err != nil { + sqsRequestDuration.WithLabelValues("CreateQueue", "500").Observe(float64(duration.Nanoseconds())) return nil, err } + sqsRequestDuration.WithLabelValues("CreateQueue", "200").Observe(float64(duration.Nanoseconds())) return createQueueRes.QueueUrl, nil } @@ -108,14 +122,19 @@ func (cr *sqsControlRouter) loop() { } for { + start := time.Now() res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: responseQueueURL, WaitTimeSeconds: longPollTime, }) + duration := time.Now().Sub(start) if err != nil { + sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds())) log.Errorf("Error receiving message from %s: %v", *responseQueueURL, err) continue } + sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds())) + if len(res.Messages) == 0 { continue } @@ -134,10 +153,17 @@ func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Mes Id: message.MessageId, }) } + start := time.Now() _, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{ QueueUrl: queueURL, Entries: entries, }) + duration := time.Now().Sub(start) + if err != nil { + sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "500").Observe(float64(duration.Nanoseconds())) + } else { + sqsRequestDuration.WithLabelValues("DeleteMessageBatch", "200").Observe(float64(duration.Nanoseconds())) + } return err } @@ -167,10 +193,17 @@ func (cr *sqsControlRouter) sendMessage(queueURL *string, message interface{}) e return err } log.Infof("sendMessage to %s: %s", *queueURL, buf.String()) + start := time.Now() _, err := cr.service.SendMessage(&sqs.SendMessageInput{ QueueUrl: queueURL, MessageBody: aws.String(buf.String()), }) + duration := time.Now().Sub(start) + if err != nil { + sqsRequestDuration.WithLabelValues("SendMessage", "500").Observe(float64(duration.Nanoseconds())) + } else { + sqsRequestDuration.WithLabelValues("SendMessage", "200").Observe(float64(duration.Nanoseconds())) + } return err } @@ -188,12 +221,16 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer } probeQueueName := fmt.Sprintf("probe-%s-%s", userID, probeID) + start := time.Now() probeQueueURL, err := cr.service.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String(probeQueueName), }) + duration := time.Now().Sub(start) if err != nil { + sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds())) return xfer.Response{}, err } + sqsRequestDuration.WithLabelValues("GetQueueUrl", "200").Observe(float64(duration.Nanoseconds())) // Add a response channel before we send the request, to prevent races id := fmt.Sprintf("request-%s-%d", userID, rand.Int63()) @@ -213,6 +250,7 @@ func (cr *sqsControlRouter) Handle(ctx context.Context, probeID string, req xfer Request: req, ResponseQueueURL: *responseQueueURL, }); err != nil { + sqsRequestDuration.WithLabelValues("GetQueueUrl", "500").Observe(float64(duration.Nanoseconds())) return xfer.Response{}, err } @@ -287,14 +325,19 @@ func (pw *probeWorker) loop() { default: } + start := time.Now() res, err := pw.router.service.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: pw.requestQueueURL, WaitTimeSeconds: longPollTime, }) + duration := time.Now().Sub(start) if err != nil { + sqsRequestDuration.WithLabelValues("ReceiveMessage", "500").Observe(float64(duration.Nanoseconds())) log.Errorf("Error recieving message: %v", err) continue } + sqsRequestDuration.WithLabelValues("ReceiveMessage", "200").Observe(float64(duration.Nanoseconds())) + if len(res.Messages) == 0 { continue }