From 9142325d5493bd7b3e4f8fc242ec0a6eada4b8d1 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 12 Nov 2015 16:24:31 +0000 Subject: [PATCH 1/2] Instruments probe runtime to find slow reporter. --- probe/docker/reporter.go | 3 + probe/docker/tagger.go | 3 + probe/endpoint/reporter.go | 3 + probe/host/reporter.go | 3 + probe/host/tagger.go | 3 + probe/kubernetes/reporter.go | 3 + probe/probe.go | 35 +- probe/probe_internal_test.go | 2 + probe/process/reporter.go | 3 + probe/process/walker.go | 3 + probe/topology_tagger.go | 2 + prog/probe/main.go | 8 + .../DataDog/datadog-go/statsd/README.md | 45 +++ .../DataDog/datadog-go/statsd/statsd.go | 353 ++++++++++++++++++ .../DataDog/datadog-go/statsd/statsd_test.go | 312 ++++++++++++++++ vendor/github.com/armon/go-metrics/LICENSE | 20 + vendor/github.com/armon/go-metrics/README.md | 71 ++++ .../github.com/armon/go-metrics/const_unix.go | 12 + .../armon/go-metrics/const_windows.go | 13 + .../armon/go-metrics/datadog/dogstatsd.go | 109 ++++++ .../go-metrics/datadog/dogstatsd_test.go | 121 ++++++ vendor/github.com/armon/go-metrics/inmem.go | 241 ++++++++++++ .../armon/go-metrics/inmem_signal.go | 100 +++++ .../armon/go-metrics/inmem_signal_test.go | 46 +++ .../github.com/armon/go-metrics/inmem_test.go | 104 ++++++ vendor/github.com/armon/go-metrics/metrics.go | 115 ++++++ .../armon/go-metrics/metrics_test.go | 262 +++++++++++++ .../armon/go-metrics/prometheus/prometheus.go | 88 +++++ vendor/github.com/armon/go-metrics/sink.go | 52 +++ .../github.com/armon/go-metrics/sink_test.go | 120 ++++++ vendor/github.com/armon/go-metrics/start.go | 95 +++++ .../github.com/armon/go-metrics/start_test.go | 110 ++++++ vendor/github.com/armon/go-metrics/statsd.go | 154 ++++++++ .../armon/go-metrics/statsd_test.go | 105 ++++++ .../github.com/armon/go-metrics/statsite.go | 142 +++++++ .../armon/go-metrics/statsite_test.go | 101 +++++ vendor/manifest | 13 + 37 files changed, 2963 insertions(+), 12 deletions(-) create mode 100644 
vendor/github.com/DataDog/datadog-go/statsd/README.md create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/statsd.go create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/statsd_test.go create mode 100644 vendor/github.com/armon/go-metrics/LICENSE create mode 100644 vendor/github.com/armon/go-metrics/README.md create mode 100644 vendor/github.com/armon/go-metrics/const_unix.go create mode 100644 vendor/github.com/armon/go-metrics/const_windows.go create mode 100644 vendor/github.com/armon/go-metrics/datadog/dogstatsd.go create mode 100644 vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go create mode 100644 vendor/github.com/armon/go-metrics/inmem.go create mode 100644 vendor/github.com/armon/go-metrics/inmem_signal.go create mode 100644 vendor/github.com/armon/go-metrics/inmem_signal_test.go create mode 100644 vendor/github.com/armon/go-metrics/inmem_test.go create mode 100644 vendor/github.com/armon/go-metrics/metrics.go create mode 100644 vendor/github.com/armon/go-metrics/metrics_test.go create mode 100644 vendor/github.com/armon/go-metrics/prometheus/prometheus.go create mode 100644 vendor/github.com/armon/go-metrics/sink.go create mode 100644 vendor/github.com/armon/go-metrics/sink_test.go create mode 100644 vendor/github.com/armon/go-metrics/start.go create mode 100644 vendor/github.com/armon/go-metrics/start_test.go create mode 100644 vendor/github.com/armon/go-metrics/statsd.go create mode 100644 vendor/github.com/armon/go-metrics/statsd_test.go create mode 100644 vendor/github.com/armon/go-metrics/statsite.go create mode 100644 vendor/github.com/armon/go-metrics/statsite_test.go diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index 6d1e088da..bd1ed0dc0 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -34,6 +34,9 @@ func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter return reporter } +// Name of this reporter, for metrics gathering +func (Reporter) Name() 
string { return "Docker" } + // ContainerUpdated should be called whenever a container is updated. func (r *Reporter) ContainerUpdated(c Container) { localAddrs, err := report.LocalAddresses() diff --git a/probe/docker/tagger.go b/probe/docker/tagger.go index 0b1a731ea..09b34d0ae 100644 --- a/probe/docker/tagger.go +++ b/probe/docker/tagger.go @@ -34,6 +34,9 @@ func NewTagger(registry Registry, procWalker process.Walker) *Tagger { } } +// Name of this tagger, for metrics gathering +func (Tagger) Name() string { return "Docker" } + // Tag implements Tagger. func (t *Tagger) Tag(r report.Report) (report.Report, error) { tree, err := NewProcessTreeStub(t.procWalker) diff --git a/probe/endpoint/reporter.go b/probe/endpoint/reporter.go index 88ef46bf9..9f8f66ff2 100644 --- a/probe/endpoint/reporter.go +++ b/probe/endpoint/reporter.go @@ -58,6 +58,9 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo } } +// Name of this reporter, for metrics gathering +func (Reporter) Name() string { return "Endpoint" } + // Stop stop stop func (r *Reporter) Stop() { r.flowWalker.stop() diff --git a/probe/host/reporter.go b/probe/host/reporter.go index 9ab8dfc1b..1d1425f94 100644 --- a/probe/host/reporter.go +++ b/probe/host/reporter.go @@ -48,6 +48,9 @@ func NewReporter(hostID, hostName string, localNets report.Networks) *Reporter { } } +// Name of this reporter, for metrics gathering +func (Reporter) Name() string { return "Host" } + // Report implements Reporter. func (r *Reporter) Report() (report.Report, error) { var ( diff --git a/probe/host/tagger.go b/probe/host/tagger.go index 4bba3ac0d..6b5237896 100644 --- a/probe/host/tagger.go +++ b/probe/host/tagger.go @@ -21,6 +21,9 @@ func NewTagger(hostID, probeID string) Tagger { } } +// Name of this tagger, for metrics gathering +func (Tagger) Name() string { return "Host" } + // Tag implements Tagger. 
func (t Tagger) Tag(r report.Report) (report.Report, error) { metadata := map[string]string{ diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index 766083c9c..816c0f005 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -16,6 +16,9 @@ func NewReporter(client Client) *Reporter { } } +// Name of this reporter, for metrics gathering +func (Reporter) Name() string { return "K8s" } + // Report generates a Report containing Container and ContainerImage topologies func (r *Reporter) Report() (report.Report, error) { result := report.MakeReport() diff --git a/probe/probe.go b/probe/probe.go index a0700ba2f..c28ac688b 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "github.com/armon/go-metrics" + "github.com/weaveworks/scope/report" "github.com/weaveworks/scope/xfer" ) @@ -31,11 +33,13 @@ type Probe struct { // Tagger tags nodes with value-add node metadata. type Tagger interface { + Name() string Tag(r report.Report) (report.Report, error) } // Reporter generates Reports. type Reporter interface { + Name() string Report() (report.Report, error) } @@ -43,6 +47,7 @@ type Reporter interface { // It's useful for things that should be updated on that interval. // For example, cached shared state between Taggers and Reporters. 
type Ticker interface { + Name() string Tick() error } @@ -100,32 +105,36 @@ func (p *Probe) spyLoop() { for { select { case <-spyTick: - start := time.Now() - for _, ticker := range p.tickers { - if err := ticker.Tick(); err != nil { - log.Printf("error doing ticker: %v", err) - } - } - + t := time.Now() + p.tick() rpt := p.report() rpt = p.tag(rpt) p.spiedReports <- rpt - - if took := time.Since(start); took > p.spyInterval { - log.Printf("report generation took too long (%s)", took) - } - + metrics.MeasureSince([]string{"Report Generation"}, t) case <-p.quit: return } } } +func (p *Probe) tick() { + for _, ticker := range p.tickers { + t := time.Now() + err := ticker.Tick() + metrics.MeasureSince([]string{ticker.Name(), "ticker"}, t) + if err != nil { + log.Printf("error doing ticker: %v", err) + } + } +} + func (p *Probe) report() report.Report { reports := make(chan report.Report, len(p.reporters)) for _, rep := range p.reporters { go func(rep Reporter) { + t := time.Now() newReport, err := rep.Report() + metrics.MeasureSince([]string{rep.Name(), "reporter"}, t) if err != nil { log.Printf("error generating report: %v", err) newReport = report.MakeReport() // empty is OK to merge @@ -144,7 +153,9 @@ func (p *Probe) tag(r report.Report) report.Report { var err error for _, tagger := range p.taggers { + t := time.Now() r, err = tagger.Tag(r) + metrics.MeasureSince([]string{tagger.Name(), "tagger"}, t) if err != nil { log.Printf("error applying tagger: %v", err) } diff --git a/probe/probe_internal_test.go b/probe/probe_internal_test.go index 6aa1f4869..3370447e5 100644 --- a/probe/probe_internal_test.go +++ b/probe/probe_internal_test.go @@ -50,6 +50,8 @@ func (m mockReporter) Report() (report.Report, error) { return m.r.Copy(), nil } +func (mockReporter) Name() string { return "Mock" } + type mockPublisher struct { have chan report.Report } diff --git a/probe/process/reporter.go b/probe/process/reporter.go index 
f7b22ffef..fa03d754d 100644 --- a/probe/process/reporter.go +++ b/probe/process/reporter.go @@ -29,6 +29,9 @@ func NewReporter(walker Walker, scope string) *Reporter { } } +// Name of this reporter, for metrics gathering +func (Reporter) Name() string { return "Process" } + // Report implements Reporter. func (r *Reporter) Report() (report.Report, error) { result := report.MakeReport() diff --git a/probe/process/walker.go b/probe/process/walker.go index 4ba20027e..da8d92fc2 100644 --- a/probe/process/walker.go +++ b/probe/process/walker.go @@ -28,6 +28,9 @@ func NewCachingWalker(source Walker) *CachingWalker { return &CachingWalker{source: source} } +// Name of this ticker, for metrics gathering +func (*CachingWalker) Name() string { return "Process" } + // Walk walks a cached copy of process list func (c *CachingWalker) Walk(f func(Process)) error { c.cacheLock.RLock() diff --git a/probe/topology_tagger.go b/probe/topology_tagger.go index 49e825b7e..1c8f975f9 100644 --- a/probe/topology_tagger.go +++ b/probe/topology_tagger.go @@ -15,6 +15,8 @@ func NewTopologyTagger() Tagger { return &topologyTagger{} } +func (topologyTagger) Name() string { return "Topology" } + // Tag implements Tagger func (topologyTagger) Tag(r report.Report) (report.Report, error) { for val, topology := range map[string]*report.Topology{ diff --git a/prog/probe/main.go b/prog/probe/main.go index 6e2d24222..5eae454a2 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -15,6 +15,8 @@ import ( "syscall" "time" + "github.com/armon/go-metrics" + "github.com/weaveworks/scope/probe" "github.com/weaveworks/scope/probe/controls" "github.com/weaveworks/scope/probe/docker" @@ -57,6 +59,12 @@ func main() { return } + // Setup in memory metrics sink + inm := metrics.NewInmemSink(time.Minute, 2*time.Minute) + sig := metrics.DefaultInmemSignal(inm) + defer sig.Stop() + metrics.NewGlobal(metrics.DefaultConfig("scope-probe"), inm) + if !strings.HasSuffix(*logPrefix, " ") { *logPrefix += " " } diff 
--git a/vendor/github.com/DataDog/datadog-go/statsd/README.md b/vendor/github.com/DataDog/datadog-go/statsd/README.md new file mode 100644 index 000000000..c3b462f85 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/README.md @@ -0,0 +1,45 @@ +## Overview + +Package `statsd` provides a Go [dogstatsd](http://docs.datadoghq.com/guides/dogstatsd/) client. Dogstatsd extends Statsd, adding tags +and histograms. + +## Get the code + + $ go get github.com/DataDog/datadog-go/statsd + +## Usage + +```go +// Create the client +c, err := statsd.New("127.0.0.1:8125") +if err != nil { + log.Fatal(err) +} +// Prefix every metric with the app name +c.Namespace = "flubber." +// Send the EC2 availability zone as a tag with every metric +c.Tags = append(c.Tags, "us-east-1a") +err = c.Gauge("request.duration", 1.2, nil, 1) +``` + +## Buffering Client + +Dogstatsd accepts packets with multiple statsd payloads in them. Using the BufferingClient via `NewBufferingClient` will buffer up commands and send them when the buffer is reached or after 100msec. + +## Development + +Run the tests with: + + $ go test + +## Documentation + +Please see: http://godoc.org/github.com/DataDog/datadog-go/statsd + +## License + +go-dogstatsd is released under the [MIT license](http://www.opensource.org/licenses/mit-license.php). + +## Credits + +Original code by [ooyala](https://github.com/ooyala/go-dogstatsd). diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go new file mode 100644 index 000000000..4038b890a --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go @@ -0,0 +1,353 @@ +// Copyright 2013 Ooyala, Inc. + +/* +Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd, +adding tags and histograms and pushing upstream to Datadog. + +Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. 
+ +Example Usage: + + // Create the client + c, err := statsd.New("127.0.0.1:8125") + if err != nil { + log.Fatal(err) + } + // Prefix every metric with the app name + c.Namespace = "flubber." + // Send the EC2 availability zone as a tag with every metric + c.Tags = append(c.Tags, "us-east-1a") + err = c.Gauge("request.duration", 1.2, nil, 1) + +statsd is based on go-statsd-client. +*/ +package statsd + +import ( + "bytes" + "fmt" + "math/rand" + "net" + "strconv" + "strings" + "sync" + "time" +) + +// A Client is a handle for sending udp messages to dogstatsd. It is safe to +// use one Client from multiple goroutines simultaneously. +type Client struct { + conn net.Conn + // Namespace to prepend to all statsd calls + Namespace string + // Tags are global tags to be added to every statsd call + Tags []string + // BufferLength is the length of the buffer in commands. + bufferLength int + flushTime time.Duration + commands []string + stop bool + sync.Mutex +} + +// New returns a pointer to a new Client given an addr in the format "hostname:port". +func New(addr string) (*Client, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + client := &Client{conn: conn} + return client, nil +} + +// NewBuffered returns a Client that buffers its output and sends it in chunks. +// Buflen is the length of the buffer in number of commands. +func NewBuffered(addr string, buflen int) (*Client, error) { + client, err := New(addr) + if err != nil { + return nil, err + } + client.bufferLength = buflen + client.commands = make([]string, 0, buflen) + client.flushTime = time.Millisecond * 100 + go client.watch() + return client, nil +} + +// format a message from its name, value, tags and rate. Also adds global +// namespace and tags. 
+func (c *Client) format(name, value string, tags []string, rate float64) string { + var buf bytes.Buffer + if c.Namespace != "" { + buf.WriteString(c.Namespace) + } + buf.WriteString(name) + buf.WriteString(":") + buf.WriteString(value) + if rate < 1 { + buf.WriteString(`|@`) + buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64)) + } + + tags = append(c.Tags, tags...) + if len(tags) > 0 { + buf.WriteString("|#") + buf.WriteString(tags[0]) + for _, tag := range tags[1:] { + buf.WriteString(",") + buf.WriteString(tag) + } + } + return buf.String() +} + +func (c *Client) watch() { + for _ = range time.Tick(c.flushTime) { + if c.stop { + return + } + c.Lock() + if len(c.commands) > 0 { + // FIXME: eating error here + c.flush() + } + c.Unlock() + } +} + +func (c *Client) append(cmd string) error { + c.Lock() + c.commands = append(c.commands, cmd) + // if we should flush, lets do it + if len(c.commands) == c.bufferLength { + if err := c.flush(); err != nil { + c.Unlock() + return err + } + } + c.Unlock() + return nil +} + +// flush the commands in the buffer. Lock must be held by caller. +func (c *Client) flush() error { + data := strings.Join(c.commands, "\n") + _, err := c.conn.Write([]byte(data)) + // clear the slice with a slice op, doesn't realloc + c.commands = c.commands[:0] + return err +} + +func (c *Client) sendMsg(msg string) error { + // if this client is buffered, then we'll just append this + if c.bufferLength > 0 { + return c.append(msg) + } + c.Lock() + _, err := c.conn.Write([]byte(msg)) + c.Unlock() + return err +} + +// send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. +func (c *Client) send(name, value string, tags []string, rate float64) error { + if c == nil { + return nil + } + if rate < 1 && rand.Float64() > rate { + return nil + } + data := c.format(name, value, tags, rate) + return c.sendMsg(data) +} + +// Gauge measures the value of a metric at a particular time. 
+func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { + stat := fmt.Sprintf("%f|g", value) + return c.send(name, stat, tags, rate) +} + +// Count tracks how many times something happened per second. +func (c *Client) Count(name string, value int64, tags []string, rate float64) error { + stat := fmt.Sprintf("%d|c", value) + return c.send(name, stat, tags, rate) +} + +// Histogram tracks the statistical distribution of a set of values. +func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { + stat := fmt.Sprintf("%f|h", value) + return c.send(name, stat, tags, rate) +} + +// Set counts the number of unique elements in a group. +func (c *Client) Set(name string, value string, tags []string, rate float64) error { + stat := fmt.Sprintf("%s|s", value) + return c.send(name, stat, tags, rate) +} + +// TimeInMilliseconds sends timing information in milliseconds. +// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) +func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { + stat := fmt.Sprintf("%f|ms", value) + return c.send(name, stat, tags, rate) +} + +// Event sends the provided Event. +func (c *Client) Event(e *Event) error { + stat, err := e.Encode(c.Tags...) + if err != nil { + return err + } + return c.sendMsg(stat) +} + +// SimpleEvent sends an event with the provided title and text. +func (c *Client) SimpleEvent(title, text string) error { + e := NewEvent(title, text) + return c.Event(e) +} + +// Close the client connection. 
+func (c *Client) Close() error { + if c == nil { + return nil + } + c.stop = true + return c.conn.Close() +} + +// Events support + +type eventAlertType string + +const ( + // Info is the "info" AlertType for events + Info eventAlertType = "info" + // Error is the "error" AlertType for events + Error eventAlertType = "error" + // Warning is the "warning" AlertType for events + Warning eventAlertType = "warning" + // Success is the "success" AlertType for events + Success eventAlertType = "success" +) + +type eventPriority string + +const ( + // Normal is the "normal" Priority for events + Normal eventPriority = "normal" + // Low is the "low" Priority for events + Low eventPriority = "low" +) + +// An Event is an object that can be posted to your DataDog event stream. +type Event struct { + // Title of the event. Required. + Title string + // Text is the description of the event. Required. + Text string + // Timestamp is a timestamp for the event. If not provided, the dogstatsd + // server will set this to the current time. + Timestamp time.Time + // Hostname for the event. + Hostname string + // AggregationKey groups this event with others of the same key. + AggregationKey string + // Priority of the event. Can be statsd.Low or statsd.Normal. + Priority eventPriority + // SourceTypeName is a source type for the event. + SourceTypeName string + // AlertType can be statsd.Info, statsd.Error, statsd.Warning, or statsd.Success. + // If absent, the default value applied by the dogstatsd server is Info. + AlertType eventAlertType + // Tags for the event. + Tags []string +} + +// NewEvent creates a new event with the given title and text. Error checking +// against these values is done at send-time, or upon running e.Check. +func NewEvent(title, text string) *Event { + return &Event{ + Title: title, + Text: text, + } +} + +// Check verifies that an event is valid. 
+func (e Event) Check() error { + if len(e.Title) == 0 { + return fmt.Errorf("statsd.Event title is required") + } + if len(e.Text) == 0 { + return fmt.Errorf("statsd.Event text is required") + } + return nil +} + +// Encode returns the dogstatsd wire protocol representation for an event. +// Tags may be passed which will be added to the encoded output but not to +// the Event's list of tags, eg. for default tags. +func (e Event) Encode(tags ...string) (string, error) { + err := e.Check() + if err != nil { + return "", err + } + var buffer bytes.Buffer + buffer.WriteString("_e{") + buffer.WriteString(strconv.FormatInt(int64(len(e.Title)), 10)) + buffer.WriteRune(',') + buffer.WriteString(strconv.FormatInt(int64(len(e.Text)), 10)) + buffer.WriteString("}:") + buffer.WriteString(e.Title) + buffer.WriteRune('|') + buffer.WriteString(e.Text) + + if !e.Timestamp.IsZero() { + buffer.WriteString("|d:") + buffer.WriteString(strconv.FormatInt(int64(e.Timestamp.Unix()), 10)) + } + + if len(e.Hostname) != 0 { + buffer.WriteString("|h:") + buffer.WriteString(e.Hostname) + } + + if len(e.AggregationKey) != 0 { + buffer.WriteString("|k:") + buffer.WriteString(e.AggregationKey) + + } + + if len(e.Priority) != 0 { + buffer.WriteString("|p:") + buffer.WriteString(string(e.Priority)) + } + + if len(e.SourceTypeName) != 0 { + buffer.WriteString("|s:") + buffer.WriteString(e.SourceTypeName) + } + + if len(e.AlertType) != 0 { + buffer.WriteString("|t:") + buffer.WriteString(string(e.AlertType)) + } + + if len(tags)+len(e.Tags) > 0 { + all := make([]string, 0, len(tags)+len(e.Tags)) + all = append(all, tags...) + all = append(all, e.Tags...) 
+ buffer.WriteString("|#") + buffer.WriteString(all[0]) + for _, tag := range all[1:] { + buffer.WriteString(",") + buffer.WriteString(tag) + } + } + + return buffer.String(), nil +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd_test.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd_test.go new file mode 100644 index 000000000..1f12fc04c --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd_test.go @@ -0,0 +1,312 @@ +// Copyright 2013 Ooyala, Inc. + +package statsd + +import ( + "fmt" + "io" + "net" + "reflect" + "strings" + "testing" +) + +var dogstatsdTests = []struct { + GlobalNamespace string + GlobalTags []string + Method string + Metric string + Value interface{} + Tags []string + Rate float64 + Expected string +}{ + {"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1.000000|g"}, + {"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1.000000|g|@0.999999"}, + {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1.000000|g|#tagA"}, + {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1.000000|g|#tagA,tagB"}, + {"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1.000000|g|@0.999999|#tagA"}, + {"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"}, + {"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"}, + {"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.300000|h|#tagA"}, + {"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"}, + {"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"}, + {"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"}, +} + +func assertNotPanics(t *testing.T, f func()) { + defer func() { + if r := recover(); r != nil { + t.Fatal(r) + } + }() + f() +} + +func TestClient(t 
*testing.T) { + addr := "localhost:1201" + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + t.Fatal(err) + } + + server, err := net.ListenUDP("udp", udpAddr) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + client, err := New(addr) + if err != nil { + t.Fatal(err) + } + + for _, tt := range dogstatsdTests { + client.Namespace = tt.GlobalNamespace + client.Tags = tt.GlobalTags + method := reflect.ValueOf(client).MethodByName(tt.Method) + e := method.Call([]reflect.Value{ + reflect.ValueOf(tt.Metric), + reflect.ValueOf(tt.Value), + reflect.ValueOf(tt.Tags), + reflect.ValueOf(tt.Rate)})[0] + errInter := e.Interface() + if errInter != nil { + t.Fatal(errInter.(error)) + } + + bytes := make([]byte, 1024) + n, err := server.Read(bytes) + if err != nil { + t.Fatal(err) + } + message := bytes[:n] + if string(message) != tt.Expected { + t.Errorf("Expected: %s. Actual: %s", tt.Expected, string(message)) + } + } +} + +func TestBufferedClient(t *testing.T) { + addr := "localhost:1201" + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + t.Fatal(err) + } + + server, err := net.ListenUDP("udp", udpAddr) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + t.Fatal(err) + } + + bufferLength := 5 + client := &Client{ + conn: conn, + commands: make([]string, 0, bufferLength), + bufferLength: bufferLength, + } + + client.Namespace = "foo." 
+ client.Tags = []string{"dd:2"} + + client.Count("cc", 1, nil, 1) + client.Gauge("gg", 10, nil, 1) + client.Histogram("hh", 1, nil, 1) + client.Set("ss", "ss", nil, 1) + + if len(client.commands) != 4 { + t.Errorf("Expected client to have buffered 4 commands, but found %d\n", len(client.commands)) + } + + client.Set("ss", "xx", nil, 1) + err = client.flush() + if err != nil { + t.Errorf("Error sending: %s", err) + } + + if len(client.commands) != 0 { + t.Errorf("Expecting send to flush commands, but found %d\n", len(client.commands)) + } + + buffer := make([]byte, 4096) + n, err := io.ReadAtLeast(server, buffer, 1) + result := string(buffer[:n]) + + if err != nil { + t.Error(err) + } + + expected := []string{ + `foo.cc:1|c|#dd:2`, + `foo.gg:10.000000|g|#dd:2`, + `foo.hh:1.000000|h|#dd:2`, + `foo.ss:ss|s|#dd:2`, + `foo.ss:xx|s|#dd:2`, + } + + for i, res := range strings.Split(result, "\n") { + if res != expected[i] { + t.Errorf("Got `%s`, expected `%s`", res, expected[i]) + } + } + + client.Event(&Event{Title: "title1", Text: "text1", Priority: Normal, AlertType: Success, Tags: []string{"tagg"}}) + client.SimpleEvent("event1", "text1") + + if len(client.commands) != 2 { + t.Errorf("Expected to find %d commands, but found %d\n", 2, len(client.commands)) + } + + err = client.flush() + + if err != nil { + t.Errorf("Error sending: %s", err) + } + + if len(client.commands) != 0 { + t.Errorf("Expecting send to flush commands, but found %d\n", len(client.commands)) + } + + buffer = make([]byte, 1024) + n, err = io.ReadAtLeast(server, buffer, 1) + result = string(buffer[:n]) + + if err != nil { + t.Error(err) + } + + if n == 0 { + t.Errorf("Read 0 bytes but expected more.") + } + + expected = []string{ + `_e{6,5}:title1|text1|p:normal|t:success|#dd:2,tagg`, + `_e{6,5}:event1|text1|#dd:2`, + } + + for i, res := range strings.Split(result, "\n") { + if res != expected[i] { + t.Errorf("Got `%s`, expected `%s`", res, expected[i]) + } + } + +} + +func TestNilSafe(t *testing.T) 
{ + var c *Client + assertNotPanics(t, func() { c.Close() }) + assertNotPanics(t, func() { c.Count("", 0, nil, 1) }) + assertNotPanics(t, func() { c.Histogram("", 0, nil, 1) }) + assertNotPanics(t, func() { c.Gauge("", 0, nil, 1) }) + assertNotPanics(t, func() { c.Set("", "", nil, 1) }) + assertNotPanics(t, func() { c.send("", "", nil, 1) }) +} + +func TestEvents(t *testing.T) { + matrix := []struct { + event *Event + encoded string + }{ + { + NewEvent("Hello", "Something happened to my event"), + `_e{5,30}:Hello|Something happened to my event`, + }, { + &Event{Title: "hi", Text: "okay", AggregationKey: "foo"}, + `_e{2,4}:hi|okay|k:foo`, + }, { + &Event{Title: "hi", Text: "okay", AggregationKey: "foo", AlertType: Info}, + `_e{2,4}:hi|okay|k:foo|t:info`, + }, { + &Event{Title: "hi", Text: "w/e", AlertType: Error, Priority: Normal}, + `_e{2,3}:hi|w/e|p:normal|t:error`, + }, { + &Event{Title: "hi", Text: "uh", Tags: []string{"host:foo", "app:bar"}}, + `_e{2,2}:hi|uh|#host:foo,app:bar`, + }, + } + + for _, m := range matrix { + r, err := m.event.Encode() + if err != nil { + t.Errorf("Error encoding: %s\n", err) + continue + } + if r != m.encoded { + t.Errorf("Expected `%s`, got `%s`\n", m.encoded, r) + } + } + + e := NewEvent("", "hi") + if _, err := e.Encode(); err == nil { + t.Errorf("Expected error on empty Title.") + } + + e = NewEvent("hi", "") + if _, err := e.Encode(); err == nil { + t.Errorf("Expected error on empty Text.") + } + + e = NewEvent("hello", "world") + s, err := e.Encode("tag1", "tag2") + if err != nil { + t.Error(err) + } + expected := "_e{5,5}:hello|world|#tag1,tag2" + if s != expected { + t.Errorf("Expected %s, got %s", expected, s) + } + if len(e.Tags) != 0 { + t.Errorf("Modified event in place illegally.") + } +} + +// These benchmarks show that using a buffer instead of sprintf-ing together +// a bunch of intermediate strings is 4-5x faster + +func BenchmarkFormatNew(b *testing.B) { + b.StopTimer() + c := &Client{} + c.Namespace = "foo.bar." 
+ c.Tags = []string{"app:foo", "host:bar"} + b.StartTimer() + for i := 0; i < b.N; i++ { + c.format("system.cpu.idle", "10", []string{"foo"}, 1) + c.format("system.cpu.load", "0.1", nil, 0.9) + } +} + +// Old formatting function, added to client for tests +func (c *Client) formatOld(name, value string, tags []string, rate float64) string { + if rate < 1 { + value = fmt.Sprintf("%s|@%f", value, rate) + } + if c.Namespace != "" { + name = fmt.Sprintf("%s%s", c.Namespace, name) + } + + tags = append(c.Tags, tags...) + if len(tags) > 0 { + value = fmt.Sprintf("%s|#%s", value, strings.Join(tags, ",")) + } + + return fmt.Sprintf("%s:%s", name, value) + +} + +func BenchmarkFormatOld(b *testing.B) { + b.StopTimer() + c := &Client{} + c.Namespace = "foo.bar." + c.Tags = []string{"app:foo", "host:bar"} + b.StartTimer() + for i := 0; i < b.N; i++ { + c.formatOld("system.cpu.idle", "10", []string{"foo"}, 1) + c.formatOld("system.cpu.load", "0.1", nil, 0.9) + } +} diff --git a/vendor/github.com/armon/go-metrics/LICENSE b/vendor/github.com/armon/go-metrics/LICENSE new file mode 100644 index 000000000..106569e54 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2013 Armon Dadgar + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/armon/go-metrics/README.md b/vendor/github.com/armon/go-metrics/README.md new file mode 100644 index 000000000..7b6f23e29 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/README.md @@ -0,0 +1,71 @@ +go-metrics +========== + +This library provides a `metrics` package which can be used to instrument code, +expose application metrics, and profile runtime performance in a flexible manner. + +Current API: [![GoDoc](https://godoc.org/github.com/armon/go-metrics?status.svg)](https://godoc.org/github.com/armon/go-metrics) + +Sinks +===== + +The `metrics` package makes use of a `MetricSink` interface to support delivery +to any type of backend. Currently the following sinks are provided: + +* StatsiteSink : Sinks to a [statsite](https://github.com/armon/statsite/) instance (TCP) +* StatsdSink: Sinks to a [StatsD](https://github.com/etsy/statsd/) / statsite instance (UDP) +* PrometheusSink: Sinks to a [Prometheus](http://prometheus.io/) metrics endpoint (exposed via HTTP for scrapes) +* InmemSink : Provides in-memory aggregation, can be used to export stats +* FanoutSink : Sinks to multiple sinks. Enables writing to multiple statsite instances for example. +* BlackholeSink : Sinks to nowhere + +In addition to the sinks, the `InmemSignal` can be used to catch a signal, +and dump a formatted output of recent metrics. For example, when a process gets +a SIGUSR1, it can dump to stderr recent performance metrics for debugging. 
+ +Examples +======== + +Here is an example of using the package: + + func SlowMethod() { + // Profiling the runtime of a method + defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now()) + } + + // Configure a statsite sink as the global metrics sink + sink, _ := metrics.NewStatsiteSink("statsite:8125") + metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink) + + // Emit a Key/Value pair + metrics.EmitKey([]string{"questions", "meaning of life"}, 42) + + +Here is an example of setting up an signal handler: + + // Setup the inmem sink and signal handler + inm := metrics.NewInmemSink(10*time.Second, time.Minute) + sig := metrics.DefaultInmemSignal(inm) + metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm) + + // Run some code + inm.SetGauge([]string{"foo"}, 42) + inm.EmitKey([]string{"bar"}, 30) + + inm.IncrCounter([]string{"baz"}, 42) + inm.IncrCounter([]string{"baz"}, 1) + inm.IncrCounter([]string{"baz"}, 80) + + inm.AddSample([]string{"method", "wow"}, 42) + inm.AddSample([]string{"method", "wow"}, 100) + inm.AddSample([]string{"method", "wow"}, 22) + + .... 
+ +When a signal comes in, output like the following will be dumped to stderr: + + [2014-01-28 14:57:33.04 -0800 PST][G] 'foo': 42.000 + [2014-01-28 14:57:33.04 -0800 PST][P] 'bar': 30.000 + [2014-01-28 14:57:33.04 -0800 PST][C] 'baz': Count: 3 Min: 1.000 Mean: 41.000 Max: 80.000 Stddev: 39.509 + [2014-01-28 14:57:33.04 -0800 PST][S] 'method.wow': Count: 3 Min: 22.000 Mean: 54.667 Max: 100.000 Stddev: 40.513 + diff --git a/vendor/github.com/armon/go-metrics/const_unix.go b/vendor/github.com/armon/go-metrics/const_unix.go new file mode 100644 index 000000000..31098dd57 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/const_unix.go @@ -0,0 +1,12 @@ +// +build !windows + +package metrics + +import ( + "syscall" +) + +const ( + // DefaultSignal is used with DefaultInmemSignal + DefaultSignal = syscall.SIGUSR1 +) diff --git a/vendor/github.com/armon/go-metrics/const_windows.go b/vendor/github.com/armon/go-metrics/const_windows.go new file mode 100644 index 000000000..38136af3e --- /dev/null +++ b/vendor/github.com/armon/go-metrics/const_windows.go @@ -0,0 +1,13 @@ +// +build windows + +package metrics + +import ( + "syscall" +) + +const ( + // DefaultSignal is used with DefaultInmemSignal + // Windows has no SIGUSR1, use SIGBREAK + DefaultSignal = syscall.Signal(21) +) diff --git a/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go b/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go new file mode 100644 index 000000000..d217cb83b --- /dev/null +++ b/vendor/github.com/armon/go-metrics/datadog/dogstatsd.go @@ -0,0 +1,109 @@ +package datadog + +import ( + "fmt" + "strings" + + "github.com/DataDog/datadog-go/statsd" +) + +// DogStatsdSink provides a MetricSink that can be used +// with a dogstatsd server. 
It utilizes the Dogstatsd client at github.com/DataDog/datadog-go/statsd +type DogStatsdSink struct { + client *statsd.Client + hostName string + propagateHostname bool +} + +// NewDogStatsdSink is used to create a new DogStatsdSink with sane defaults +func NewDogStatsdSink(addr string, hostName string) (*DogStatsdSink, error) { + client, err := statsd.New(addr) + if err != nil { + return nil, err + } + sink := &DogStatsdSink{ + client: client, + hostName: hostName, + propagateHostname: false, + } + return sink, nil +} + +// SetTags sets common tags on the Dogstatsd Client that will be sent +// along with all dogstatsd packets. +// Ref: http://docs.datadoghq.com/guides/dogstatsd/#tags +func (s *DogStatsdSink) SetTags(tags []string) { + s.client.Tags = tags +} + +// EnableHostnamePropagation forces a Dogstatsd `host` tag with the value specified by `s.HostName` +// Since the go-metrics package has its own mechanism for attaching a hostname to metrics, +// setting the `propagateHostname` flag ensures that `s.HostName` overrides the host tag naively set by the DogStatsd server +func (s *DogStatsdSink) EnableHostNamePropagation() { + s.propagateHostname = true +} + +func (s *DogStatsdSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Map(func(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } + }, joined) +} + +func (s *DogStatsdSink) parseKey(key []string) ([]string, []string) { + // Since DogStatsd supports dimensionality via tags on metric keys, this sink's approach is to splice the hostname out of the key in favor of a `host` tag + // The `host` tag is either forced here, or set downstream by the DogStatsd server + + var tags []string + hostName := s.hostName + + //Splice the hostname out of the key + for i, el := range key { + if el == hostName { + key = append(key[:i], key[i+1:]...) 
+ } + } + + if s.propagateHostname { + tags = append(tags, fmt.Sprintf("host:%s", hostName)) + } + return key, tags +} + +// Implementation of methods in the MetricSink interface + +func (s *DogStatsdSink) SetGauge(key []string, val float32) { + key, tags := s.parseKey(key) + flatKey := s.flattenKey(key) + + rate := 1.0 + s.client.Gauge(flatKey, float64(val), tags, rate) +} + +func (s *DogStatsdSink) IncrCounter(key []string, val float32) { + key, tags := s.parseKey(key) + flatKey := s.flattenKey(key) + + rate := 1.0 + s.client.Count(flatKey, int64(val), tags, rate) +} + +// EmitKey is not implemented since DogStatsd does not provide a metric type that holds an +// arbitrary number of values +func (s *DogStatsdSink) EmitKey(key []string, val float32) { +} + +func (s *DogStatsdSink) AddSample(key []string, val float32) { + key, tags := s.parseKey(key) + flatKey := s.flattenKey(key) + + rate := 1.0 + s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate) +} diff --git a/vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go b/vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go new file mode 100644 index 000000000..e7dc51152 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go @@ -0,0 +1,121 @@ +package datadog + +import ( + "fmt" + "net" + "reflect" + "testing" +) + +var EmptyTags []string + +const ( + DogStatsdAddr = "127.0.0.1:7254" + HostnameEnabled = true + HostnameDisabled = false + TestHostname = "test_hostname" +) + +func MockGetHostname() string { + return TestHostname +} + +var ParseKeyTests = []struct { + KeyToParse []string + Tags []string + PropagateHostname bool + ExpectedKey []string + ExpectedTags []string +}{ + {[]string{"a", MockGetHostname(), "b", "c"}, EmptyTags, HostnameDisabled, []string{"a", "b", "c"}, EmptyTags}, + {[]string{"a", "b", "c"}, EmptyTags, HostnameDisabled, []string{"a", "b", "c"}, EmptyTags}, + {[]string{"a", "b", "c"}, EmptyTags, HostnameEnabled, []string{"a", "b", "c"}, 
[]string{fmt.Sprintf("host:%s", MockGetHostname())}}, +} + +var FlattenKeyTests = []struct { + KeyToFlatten []string + Expected string +}{ + {[]string{"a", "b", "c"}, "a.b.c"}, + {[]string{"spaces must", "flatten", "to", "underscores"}, "spaces_must.flatten.to.underscores"}, +} + +var MetricSinkTests = []struct { + Method string + Metric []string + Value interface{} + Tags []string + PropagateHostname bool + Expected string +}{ + {"SetGauge", []string{"foo", "bar"}, float32(42), EmptyTags, HostnameDisabled, "foo.bar:42.000000|g"}, + {"SetGauge", []string{"foo", "bar", "baz"}, float32(42), EmptyTags, HostnameDisabled, "foo.bar.baz:42.000000|g"}, + {"AddSample", []string{"sample", "thing"}, float32(4), EmptyTags, HostnameDisabled, "sample.thing:4.000000|ms"}, + {"IncrCounter", []string{"count", "me"}, float32(3), EmptyTags, HostnameDisabled, "count.me:3|c"}, + + {"SetGauge", []string{"foo", "baz"}, float32(42), []string{"my_tag:my_value"}, HostnameDisabled, "foo.baz:42.000000|g|#my_tag:my_value"}, + {"SetGauge", []string{"foo", "bar"}, float32(42), []string{"my_tag:my_value", "other_tag:other_value"}, HostnameDisabled, "foo.bar:42.000000|g|#my_tag:my_value,other_tag:other_value"}, + {"SetGauge", []string{"foo", "bar"}, float32(42), []string{"my_tag:my_value", "other_tag:other_value"}, HostnameEnabled, "foo.bar:42.000000|g|#my_tag:my_value,other_tag:other_value,host:test_hostname"}, +} + +func MockNewDogStatsdSink(addr string, tags []string, tagWithHostname bool) *DogStatsdSink { + dog, _ := NewDogStatsdSink(addr, MockGetHostname()) + dog.SetTags(tags) + if tagWithHostname { + dog.EnableHostNamePropagation() + } + + return dog +} + +func TestParseKey(t *testing.T) { + for _, tt := range ParseKeyTests { + dog := MockNewDogStatsdSink(DogStatsdAddr, tt.Tags, tt.PropagateHostname) + key, tags := dog.parseKey(tt.KeyToParse) + + if !reflect.DeepEqual(key, tt.ExpectedKey) { + t.Fatalf("Key Parsing failed for %v", tt.KeyToParse) + } + + if !reflect.DeepEqual(tags, 
tt.ExpectedTags) { + t.Fatalf("Tag Parsing Failed for %v", tt.KeyToParse) + } + } +} + +func TestFlattenKey(t *testing.T) { + dog := MockNewDogStatsdSink(DogStatsdAddr, EmptyTags, HostnameDisabled) + for _, tt := range FlattenKeyTests { + if !reflect.DeepEqual(dog.flattenKey(tt.KeyToFlatten), tt.Expected) { + t.Fatalf("Flattening %v failed", tt.KeyToFlatten) + } + } +} + +func TestMetricSink(t *testing.T) { + udpAddr, err := net.ResolveUDPAddr("udp", DogStatsdAddr) + if err != nil { + t.Fatal(err) + } + server, err := net.ListenUDP("udp", udpAddr) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + buf := make([]byte, 1024) + + for _, tt := range MetricSinkTests { + dog := MockNewDogStatsdSink(DogStatsdAddr, tt.Tags, tt.PropagateHostname) + method := reflect.ValueOf(dog).MethodByName(tt.Method) + method.Call([]reflect.Value{ + reflect.ValueOf(tt.Metric), + reflect.ValueOf(tt.Value)}) + + n, _ := server.Read(buf) + msg := buf[:n] + if string(msg) != tt.Expected { + t.Fatalf("Line %s does not match expected: %s", string(msg), tt.Expected) + } + } +} diff --git a/vendor/github.com/armon/go-metrics/inmem.go b/vendor/github.com/armon/go-metrics/inmem.go new file mode 100644 index 000000000..da5032960 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem.go @@ -0,0 +1,241 @@ +package metrics + +import ( + "fmt" + "math" + "strings" + "sync" + "time" +) + +// InmemSink provides a MetricSink that does in-memory aggregation +// without sending metrics over a network. It can be embedded within +// an application to provide profiling information. +type InmemSink struct { + // How long is each aggregation interval + interval time.Duration + + // Retain controls how many metrics interval we keep + retain time.Duration + + // maxIntervals is the maximum length of intervals. + // It is retain / interval. 
+ maxIntervals int + + // intervals is a slice of the retained intervals + intervals []*IntervalMetrics + intervalLock sync.RWMutex +} + +// IntervalMetrics stores the aggregated metrics +// for a specific interval +type IntervalMetrics struct { + sync.RWMutex + + // The start time of the interval + Interval time.Time + + // Gauges maps the key to the last set value + Gauges map[string]float32 + + // Points maps the string to the list of emitted values + // from EmitKey + Points map[string][]float32 + + // Counters maps the string key to a sum of the counter + // values + Counters map[string]*AggregateSample + + // Samples maps the key to an AggregateSample, + // which has the rolled up view of a sample + Samples map[string]*AggregateSample +} + +// NewIntervalMetrics creates a new IntervalMetrics for a given interval +func NewIntervalMetrics(intv time.Time) *IntervalMetrics { + return &IntervalMetrics{ + Interval: intv, + Gauges: make(map[string]float32), + Points: make(map[string][]float32), + Counters: make(map[string]*AggregateSample), + Samples: make(map[string]*AggregateSample), + } +} + +// AggregateSample is used to hold aggregate metrics +// about a sample +type AggregateSample struct { + Count int // The count of emitted pairs + Sum float64 // The sum of values + SumSq float64 // The sum of squared values + Min float64 // Minimum value + Max float64 // Maximum value + LastUpdated time.Time // When value was last updated +} + +// Computes a Stddev of the values +func (a *AggregateSample) Stddev() float64 { + num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2) + div := float64(a.Count * (a.Count - 1)) + if div == 0 { + return 0 + } + return math.Sqrt(num / div) +} + +// Computes a mean of the values +func (a *AggregateSample) Mean() float64 { + if a.Count == 0 { + return 0 + } + return a.Sum / float64(a.Count) +} + +// Ingest is used to update a sample +func (a *AggregateSample) Ingest(v float64) { + a.Count++ + a.Sum += v + a.SumSq += (v * v) + if v < 
a.Min || a.Count == 1 { + a.Min = v + } + if v > a.Max || a.Count == 1 { + a.Max = v + } + a.LastUpdated = time.Now() +} + +func (a *AggregateSample) String() string { + if a.Count == 0 { + return "Count: 0" + } else if a.Stddev() == 0 { + return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated) + } else { + return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s", + a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated) + } +} + +// NewInmemSink is used to construct a new in-memory sink. +// Uses an aggregation interval and maximum retention period. +func NewInmemSink(interval, retain time.Duration) *InmemSink { + i := &InmemSink{ + interval: interval, + retain: retain, + maxIntervals: int(retain / interval), + } + i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) + return i +} + +func (i *InmemSink) SetGauge(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + intv.Gauges[k] = val +} + +func (i *InmemSink) EmitKey(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + vals := intv.Points[k] + intv.Points[k] = append(vals, val) +} + +func (i *InmemSink) IncrCounter(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + + agg := intv.Counters[k] + if agg == nil { + agg = &AggregateSample{} + intv.Counters[k] = agg + } + agg.Ingest(float64(val)) +} + +func (i *InmemSink) AddSample(key []string, val float32) { + k := i.flattenKey(key) + intv := i.getInterval() + + intv.Lock() + defer intv.Unlock() + + agg := intv.Samples[k] + if agg == nil { + agg = &AggregateSample{} + intv.Samples[k] = agg + } + agg.Ingest(float64(val)) +} + +// Data is used to retrieve all the aggregated metrics +// Intervals may be in use, and a read lock should be acquired +func (i *InmemSink) Data() 
[]*IntervalMetrics { + // Get the current interval, forces creation + i.getInterval() + + i.intervalLock.RLock() + defer i.intervalLock.RUnlock() + + intervals := make([]*IntervalMetrics, len(i.intervals)) + copy(intervals, i.intervals) + return intervals +} + +func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics { + i.intervalLock.RLock() + defer i.intervalLock.RUnlock() + + n := len(i.intervals) + if n > 0 && i.intervals[n-1].Interval == intv { + return i.intervals[n-1] + } + return nil +} + +func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics { + i.intervalLock.Lock() + defer i.intervalLock.Unlock() + + // Check for an existing interval + n := len(i.intervals) + if n > 0 && i.intervals[n-1].Interval == intv { + return i.intervals[n-1] + } + + // Add the current interval + current := NewIntervalMetrics(intv) + i.intervals = append(i.intervals, current) + n++ + + // Truncate the intervals if they are too long + if n >= i.maxIntervals { + copy(i.intervals[0:], i.intervals[n-i.maxIntervals:]) + i.intervals = i.intervals[:i.maxIntervals] + } + return current +} + +// getInterval returns the current interval to write to +func (i *InmemSink) getInterval() *IntervalMetrics { + intv := time.Now().Truncate(i.interval) + if m := i.getExistingInterval(intv); m != nil { + return m + } + return i.createInterval(intv) +} + +// Flattens the key for formatting, removes spaces +func (i *InmemSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Replace(joined, " ", "_", -1) +} diff --git a/vendor/github.com/armon/go-metrics/inmem_signal.go b/vendor/github.com/armon/go-metrics/inmem_signal.go new file mode 100644 index 000000000..95d08ee10 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_signal.go @@ -0,0 +1,100 @@ +package metrics + +import ( + "bytes" + "fmt" + "io" + "os" + "os/signal" + "sync" + "syscall" +) + +// InmemSignal is used to listen for a given signal, and when received, +// to 
dump the current metrics from the InmemSink to an io.Writer +type InmemSignal struct { + signal syscall.Signal + inm *InmemSink + w io.Writer + sigCh chan os.Signal + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// NewInmemSignal creates a new InmemSignal which listens for a given signal, +// and dumps the current metrics out to a writer +func NewInmemSignal(inmem *InmemSink, sig syscall.Signal, w io.Writer) *InmemSignal { + i := &InmemSignal{ + signal: sig, + inm: inmem, + w: w, + sigCh: make(chan os.Signal, 1), + stopCh: make(chan struct{}), + } + signal.Notify(i.sigCh, sig) + go i.run() + return i +} + +// DefaultInmemSignal returns a new InmemSignal that responds to SIGUSR1 +// and writes output to stderr. Windows uses SIGBREAK +func DefaultInmemSignal(inmem *InmemSink) *InmemSignal { + return NewInmemSignal(inmem, DefaultSignal, os.Stderr) +} + +// Stop is used to stop the InmemSignal from listening +func (i *InmemSignal) Stop() { + i.stopLock.Lock() + defer i.stopLock.Unlock() + + if i.stop { + return + } + i.stop = true + close(i.stopCh) + signal.Stop(i.sigCh) +} + +// run is a long running routine that handles signals +func (i *InmemSignal) run() { + for { + select { + case <-i.sigCh: + i.dumpStats() + case <-i.stopCh: + return + } + } +} + +// dumpStats is used to dump the data to output writer +func (i *InmemSignal) dumpStats() { + buf := bytes.NewBuffer(nil) + + data := i.inm.Data() + // Skip the last period which is still being aggregated + for i := 0; i < len(data)-1; i++ { + intv := data[i] + intv.RLock() + for name, val := range intv.Gauges { + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val) + } + for name, vals := range intv.Points { + for _, val := range vals { + fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val) + } + } + for name, agg := range intv.Counters { + fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg) + } + for name, agg := range intv.Samples { + fmt.Fprintf(buf, 
"[%v][S] '%s': %s\n", intv.Interval, name, agg) + } + intv.RUnlock() + } + + // Write out the bytes + i.w.Write(buf.Bytes()) +} diff --git a/vendor/github.com/armon/go-metrics/inmem_signal_test.go b/vendor/github.com/armon/go-metrics/inmem_signal_test.go new file mode 100644 index 000000000..9bbca5f25 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_signal_test.go @@ -0,0 +1,46 @@ +package metrics + +import ( + "bytes" + "os" + "strings" + "syscall" + "testing" + "time" +) + +func TestInmemSignal(t *testing.T) { + buf := bytes.NewBuffer(nil) + inm := NewInmemSink(10*time.Millisecond, 50*time.Millisecond) + sig := NewInmemSignal(inm, syscall.SIGUSR1, buf) + defer sig.Stop() + + inm.SetGauge([]string{"foo"}, 42) + inm.EmitKey([]string{"bar"}, 42) + inm.IncrCounter([]string{"baz"}, 42) + inm.AddSample([]string{"wow"}, 42) + + // Wait for period to end + time.Sleep(15 * time.Millisecond) + + // Send signal! + syscall.Kill(os.Getpid(), syscall.SIGUSR1) + + // Wait for flush + time.Sleep(10 * time.Millisecond) + + // Check the output + out := string(buf.Bytes()) + if !strings.Contains(out, "[G] 'foo': 42") { + t.Fatalf("bad: %v", out) + } + if !strings.Contains(out, "[P] 'bar': 42") { + t.Fatalf("bad: %v", out) + } + if !strings.Contains(out, "[C] 'baz': Count: 1 Sum: 42") { + t.Fatalf("bad: %v", out) + } + if !strings.Contains(out, "[S] 'wow': Count: 1 Sum: 42") { + t.Fatalf("bad: %v", out) + } +} diff --git a/vendor/github.com/armon/go-metrics/inmem_test.go b/vendor/github.com/armon/go-metrics/inmem_test.go new file mode 100644 index 000000000..228a2fc1a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/inmem_test.go @@ -0,0 +1,104 @@ +package metrics + +import ( + "math" + "testing" + "time" +) + +func TestInmemSink(t *testing.T) { + inm := NewInmemSink(10*time.Millisecond, 50*time.Millisecond) + + data := inm.Data() + if len(data) != 1 { + t.Fatalf("bad: %v", data) + } + + // Add data points + inm.SetGauge([]string{"foo", "bar"}, 42) + 
inm.EmitKey([]string{"foo", "bar"}, 42) + inm.IncrCounter([]string{"foo", "bar"}, 20) + inm.IncrCounter([]string{"foo", "bar"}, 22) + inm.AddSample([]string{"foo", "bar"}, 20) + inm.AddSample([]string{"foo", "bar"}, 22) + + data = inm.Data() + if len(data) != 1 { + t.Fatalf("bad: %v", data) + } + + intvM := data[0] + intvM.RLock() + + if time.Now().Sub(intvM.Interval) > 10*time.Millisecond { + t.Fatalf("interval too old") + } + if intvM.Gauges["foo.bar"] != 42 { + t.Fatalf("bad val: %v", intvM.Gauges) + } + if intvM.Points["foo.bar"][0] != 42 { + t.Fatalf("bad val: %v", intvM.Points) + } + + agg := intvM.Counters["foo.bar"] + if agg.Count != 2 { + t.Fatalf("bad val: %v", agg) + } + if agg.Sum != 42 { + t.Fatalf("bad val: %v", agg) + } + if agg.SumSq != 884 { + t.Fatalf("bad val: %v", agg) + } + if agg.Min != 20 { + t.Fatalf("bad val: %v", agg) + } + if agg.Max != 22 { + t.Fatalf("bad val: %v", agg) + } + if agg.Mean() != 21 { + t.Fatalf("bad val: %v", agg) + } + if agg.Stddev() != math.Sqrt(2) { + t.Fatalf("bad val: %v", agg) + } + + if agg.LastUpdated.IsZero() { + t.Fatalf("agg.LastUpdated is not set: %v", agg) + } + + diff := time.Now().Sub(agg.LastUpdated).Seconds() + if diff > 1 { + t.Fatalf("time diff too great: %f", diff) + } + + if agg = intvM.Samples["foo.bar"]; agg == nil { + t.Fatalf("missing sample") + } + + intvM.RUnlock() + + for i := 1; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + inm.SetGauge([]string{"foo", "bar"}, 42) + data = inm.Data() + if len(data) != min(i+1, 5) { + t.Fatalf("bad: %v", data) + } + } + + // Should not exceed 5 intervals! 
+ time.Sleep(10 * time.Millisecond) + inm.SetGauge([]string{"foo", "bar"}, 42) + data = inm.Data() + if len(data) != 5 { + t.Fatalf("bad: %v", data) + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/vendor/github.com/armon/go-metrics/metrics.go b/vendor/github.com/armon/go-metrics/metrics.go new file mode 100644 index 000000000..b818e4182 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/metrics.go @@ -0,0 +1,115 @@ +package metrics + +import ( + "runtime" + "time" +) + +func (m *Metrics) SetGauge(key []string, val float32) { + if m.HostName != "" && m.EnableHostname { + key = insert(0, m.HostName, key) + } + if m.EnableTypePrefix { + key = insert(0, "gauge", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.SetGauge(key, val) +} + +func (m *Metrics) EmitKey(key []string, val float32) { + if m.EnableTypePrefix { + key = insert(0, "kv", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.EmitKey(key, val) +} + +func (m *Metrics) IncrCounter(key []string, val float32) { + if m.EnableTypePrefix { + key = insert(0, "counter", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.IncrCounter(key, val) +} + +func (m *Metrics) AddSample(key []string, val float32) { + if m.EnableTypePrefix { + key = insert(0, "sample", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + m.sink.AddSample(key, val) +} + +func (m *Metrics) MeasureSince(key []string, start time.Time) { + if m.EnableTypePrefix { + key = insert(0, "timer", key) + } + if m.ServiceName != "" { + key = insert(0, m.ServiceName, key) + } + now := time.Now() + elapsed := now.Sub(start) + msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity) + m.sink.AddSample(key, msec) +} + +// Periodically collects runtime stats to publish +func (m *Metrics) collectStats() { + for { + time.Sleep(m.ProfileInterval) + m.emitRuntimeStats() + } +} 
+ +// Emits various runtime statsitics +func (m *Metrics) emitRuntimeStats() { + // Export number of Goroutines + numRoutines := runtime.NumGoroutine() + m.SetGauge([]string{"runtime", "num_goroutines"}, float32(numRoutines)) + + // Export memory stats + var stats runtime.MemStats + runtime.ReadMemStats(&stats) + m.SetGauge([]string{"runtime", "alloc_bytes"}, float32(stats.Alloc)) + m.SetGauge([]string{"runtime", "sys_bytes"}, float32(stats.Sys)) + m.SetGauge([]string{"runtime", "malloc_count"}, float32(stats.Mallocs)) + m.SetGauge([]string{"runtime", "free_count"}, float32(stats.Frees)) + m.SetGauge([]string{"runtime", "heap_objects"}, float32(stats.HeapObjects)) + m.SetGauge([]string{"runtime", "total_gc_pause_ns"}, float32(stats.PauseTotalNs)) + m.SetGauge([]string{"runtime", "total_gc_runs"}, float32(stats.NumGC)) + + // Export info about the last few GC runs + num := stats.NumGC + + // Handle wrap around + if num < m.lastNumGC { + m.lastNumGC = 0 + } + + // Ensure we don't scan more than 256 + if num-m.lastNumGC >= 256 { + m.lastNumGC = num - 255 + } + + for i := m.lastNumGC; i < num; i++ { + pause := stats.PauseNs[i%256] + m.AddSample([]string{"runtime", "gc_pause_ns"}, float32(pause)) + } + m.lastNumGC = num +} + +// Inserts a string value at an index into the slice +func insert(i int, v string, s []string) []string { + s = append(s, "") + copy(s[i+1:], s[i:]) + s[i] = v + return s +} diff --git a/vendor/github.com/armon/go-metrics/metrics_test.go b/vendor/github.com/armon/go-metrics/metrics_test.go new file mode 100644 index 000000000..c7baf22bf --- /dev/null +++ b/vendor/github.com/armon/go-metrics/metrics_test.go @@ -0,0 +1,262 @@ +package metrics + +import ( + "reflect" + "runtime" + "testing" + "time" +) + +func mockMetric() (*MockSink, *Metrics) { + m := &MockSink{} + met := &Metrics{sink: m} + return m, met +} + +func TestMetrics_SetGauge(t *testing.T) { + m, met := mockMetric() + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { 
+ t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.HostName = "test" + met.EnableHostname = true + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "test" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "gauge" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.SetGauge([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_EmitKey(t *testing.T) { + m, met := mockMetric() + met.EmitKey([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.EmitKey([]string{"key"}, float32(1)) + if m.keys[0][0] != "kv" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.EmitKey([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_IncrCounter(t *testing.T) { + m, met := mockMetric() + met.IncrCounter([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.IncrCounter([]string{"key"}, float32(1)) + if m.keys[0][0] != "counter" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.IncrCounter([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { 
+ t.Fatalf("") + } +} + +func TestMetrics_AddSample(t *testing.T) { + m, met := mockMetric() + met.AddSample([]string{"key"}, float32(1)) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.EnableTypePrefix = true + met.AddSample([]string{"key"}, float32(1)) + if m.keys[0][0] != "sample" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.ServiceName = "service" + met.AddSample([]string{"key"}, float32(1)) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] != 1 { + t.Fatalf("") + } +} + +func TestMetrics_MeasureSince(t *testing.T) { + m, met := mockMetric() + met.TimerGranularity = time.Millisecond + n := time.Now() + met.MeasureSince([]string{"key"}, n) + if m.keys[0][0] != "key" { + t.Fatalf("") + } + if m.vals[0] > 0.1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.TimerGranularity = time.Millisecond + met.EnableTypePrefix = true + met.MeasureSince([]string{"key"}, n) + if m.keys[0][0] != "timer" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] > 0.1 { + t.Fatalf("") + } + + m, met = mockMetric() + met.TimerGranularity = time.Millisecond + met.ServiceName = "service" + met.MeasureSince([]string{"key"}, n) + if m.keys[0][0] != "service" || m.keys[0][1] != "key" { + t.Fatalf("") + } + if m.vals[0] > 0.1 { + t.Fatalf("") + } +} + +func TestMetrics_EmitRuntimeStats(t *testing.T) { + runtime.GC() + m, met := mockMetric() + met.emitRuntimeStats() + + if m.keys[0][0] != "runtime" || m.keys[0][1] != "num_goroutines" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[0] <= 1 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[1][0] != "runtime" || m.keys[1][1] != "alloc_bytes" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[1] <= 40000 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[2][0] != "runtime" || m.keys[2][1] != "sys_bytes" { + t.Fatalf("bad key %v", m.keys) + 
} + if m.vals[2] <= 100000 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[3][0] != "runtime" || m.keys[3][1] != "malloc_count" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[3] <= 100 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[4][0] != "runtime" || m.keys[4][1] != "free_count" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[4] <= 100 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[5][0] != "runtime" || m.keys[5][1] != "heap_objects" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[5] <= 100 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[6][0] != "runtime" || m.keys[6][1] != "total_gc_pause_ns" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[6] <= 100000 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[7][0] != "runtime" || m.keys[7][1] != "total_gc_runs" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[7] <= 1 { + t.Fatalf("bad val: %v", m.vals) + } + + if m.keys[8][0] != "runtime" || m.keys[8][1] != "gc_pause_ns" { + t.Fatalf("bad key %v", m.keys) + } + if m.vals[8] <= 1000 { + t.Fatalf("bad val: %v", m.vals) + } +} + +func TestInsert(t *testing.T) { + k := []string{"hi", "bob"} + exp := []string{"hi", "there", "bob"} + out := insert(1, "there", k) + if !reflect.DeepEqual(exp, out) { + t.Fatalf("bad insert %v %v", exp, out) + } +} diff --git a/vendor/github.com/armon/go-metrics/prometheus/prometheus.go b/vendor/github.com/armon/go-metrics/prometheus/prometheus.go new file mode 100644 index 000000000..362dbfb62 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/prometheus/prometheus.go @@ -0,0 +1,88 @@ +// +build go1.3 +package prometheus + +import ( + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +type PrometheusSink struct { + mu sync.Mutex + gauges map[string]prometheus.Gauge + summaries map[string]prometheus.Summary + counters map[string]prometheus.Counter +} + +func NewPrometheusSink() (*PrometheusSink, error) { + return &PrometheusSink{ + gauges: 
make(map[string]prometheus.Gauge), + summaries: make(map[string]prometheus.Summary), + counters: make(map[string]prometheus.Counter), + }, nil +} + +func (p *PrometheusSink) flattenKey(parts []string) string { + joined := strings.Join(parts, "_") + joined = strings.Replace(joined, " ", "_", -1) + joined = strings.Replace(joined, ".", "_", -1) + joined = strings.Replace(joined, "-", "_", -1) + return joined +} + +func (p *PrometheusSink) SetGauge(parts []string, val float32) { + p.mu.Lock() + defer p.mu.Unlock() + key := p.flattenKey(parts) + g, ok := p.gauges[key] + if !ok { + g = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: key, + Help: key, + }) + prometheus.MustRegister(g) + p.gauges[key] = g + } + g.Set(float64(val)) +} + +func (p *PrometheusSink) AddSample(parts []string, val float32) { + p.mu.Lock() + defer p.mu.Unlock() + key := p.flattenKey(parts) + g, ok := p.summaries[key] + if !ok { + g = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: key, + Help: key, + MaxAge: 10 * time.Second, + }) + prometheus.MustRegister(g) + p.summaries[key] = g + } + g.Observe(float64(val)) +} + +// EmitKey is not implemented. Prometheus doesn’t offer a type for which an +// arbitrary number of values is retained, as Prometheus works with a pull +// model, rather than a push model. 
+func (p *PrometheusSink) EmitKey(key []string, val float32) { +} + +func (p *PrometheusSink) IncrCounter(parts []string, val float32) { + p.mu.Lock() + defer p.mu.Unlock() + key := p.flattenKey(parts) + g, ok := p.counters[key] + if !ok { + g = prometheus.NewCounter(prometheus.CounterOpts{ + Name: key, + Help: key, + }) + prometheus.MustRegister(g) + p.counters[key] = g + } + g.Add(float64(val)) +} diff --git a/vendor/github.com/armon/go-metrics/sink.go b/vendor/github.com/armon/go-metrics/sink.go new file mode 100644 index 000000000..0c240c2c4 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/sink.go @@ -0,0 +1,52 @@ +package metrics + +// The MetricSink interface is used to transmit metrics information +// to an external system +type MetricSink interface { + // A Gauge should retain the last value it is set to + SetGauge(key []string, val float32) + + // Should emit a Key/Value pair for each call + EmitKey(key []string, val float32) + + // Counters should accumulate values + IncrCounter(key []string, val float32) + + // Samples are for timing information, where quantiles are used + AddSample(key []string, val float32) +} + +// BlackholeSink is used to just blackhole messages +type BlackholeSink struct{} + +func (*BlackholeSink) SetGauge(key []string, val float32) {} +func (*BlackholeSink) EmitKey(key []string, val float32) {} +func (*BlackholeSink) IncrCounter(key []string, val float32) {} +func (*BlackholeSink) AddSample(key []string, val float32) {} + +// FanoutSink is used to sink to fanout values to multiple sinks +type FanoutSink []MetricSink + +func (fh FanoutSink) SetGauge(key []string, val float32) { + for _, s := range fh { + s.SetGauge(key, val) + } +} + +func (fh FanoutSink) EmitKey(key []string, val float32) { + for _, s := range fh { + s.EmitKey(key, val) + } +} + +func (fh FanoutSink) IncrCounter(key []string, val float32) { + for _, s := range fh { + s.IncrCounter(key, val) + } +} + +func (fh FanoutSink) AddSample(key []string, val float32) { 
+ for _, s := range fh { + s.AddSample(key, val) + } +} diff --git a/vendor/github.com/armon/go-metrics/sink_test.go b/vendor/github.com/armon/go-metrics/sink_test.go new file mode 100644 index 000000000..15c5d771a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/sink_test.go @@ -0,0 +1,120 @@ +package metrics + +import ( + "reflect" + "testing" +) + +type MockSink struct { + keys [][]string + vals []float32 +} + +func (m *MockSink) SetGauge(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} +func (m *MockSink) EmitKey(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} +func (m *MockSink) IncrCounter(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} +func (m *MockSink) AddSample(key []string, val float32) { + m.keys = append(m.keys, key) + m.vals = append(m.vals, val) +} + +func TestFanoutSink_Gauge(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.SetGauge(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func TestFanoutSink_Key(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.EmitKey(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func TestFanoutSink_Counter(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := 
[]string{"test"} + v := float32(42.0) + fh.IncrCounter(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func TestFanoutSink_Sample(t *testing.T) { + m1 := &MockSink{} + m2 := &MockSink{} + fh := &FanoutSink{m1, m2} + + k := []string{"test"} + v := float32(42.0) + fh.AddSample(k, v) + + if !reflect.DeepEqual(m1.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m2.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m1.vals[0], v) { + t.Fatalf("val not equal") + } + if !reflect.DeepEqual(m2.vals[0], v) { + t.Fatalf("val not equal") + } +} diff --git a/vendor/github.com/armon/go-metrics/start.go b/vendor/github.com/armon/go-metrics/start.go new file mode 100644 index 000000000..44113f100 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/start.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "os" + "time" +) + +// Config is used to configure metrics settings +type Config struct { + ServiceName string // Prefixed with keys to seperate services + HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname + EnableHostname bool // Enable prefixing gauge values with hostname + EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory) + EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer") + TimerGranularity time.Duration // Granularity of timers. 
+ ProfileInterval time.Duration // Interval to profile runtime metrics +} + +// Metrics represents an instance of a metrics sink that can +// be used to emit +type Metrics struct { + Config + lastNumGC uint32 + sink MetricSink +} + +// Shared global metrics instance +var globalMetrics *Metrics + +func init() { + // Initialize to a blackhole sink to avoid errors + globalMetrics = &Metrics{sink: &BlackholeSink{}} +} + +// DefaultConfig provides a sane default configuration +func DefaultConfig(serviceName string) *Config { + c := &Config{ + ServiceName: serviceName, // Use client provided service + HostName: "", + EnableHostname: true, // Enable hostname prefix + EnableRuntimeMetrics: true, // Enable runtime profiling + EnableTypePrefix: false, // Disable type prefix + TimerGranularity: time.Millisecond, // Timers are in milliseconds + ProfileInterval: time.Second, // Poll runtime every second + } + + // Try to get the hostname + name, _ := os.Hostname() + c.HostName = name + return c +} + +// New is used to create a new instance of Metrics +func New(conf *Config, sink MetricSink) (*Metrics, error) { + met := &Metrics{} + met.Config = *conf + met.sink = sink + + // Start the runtime collector + if conf.EnableRuntimeMetrics { + go met.collectStats() + } + return met, nil +} + +// NewGlobal is the same as New, but it assigns the metrics object to be +// used globally as well as returning it. 
+func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) { + metrics, err := New(conf, sink) + if err == nil { + globalMetrics = metrics + } + return metrics, err +} + +// Proxy all the methods to the globalMetrics instance +func SetGauge(key []string, val float32) { + globalMetrics.SetGauge(key, val) +} + +func EmitKey(key []string, val float32) { + globalMetrics.EmitKey(key, val) +} + +func IncrCounter(key []string, val float32) { + globalMetrics.IncrCounter(key, val) +} + +func AddSample(key []string, val float32) { + globalMetrics.AddSample(key, val) +} + +func MeasureSince(key []string, start time.Time) { + globalMetrics.MeasureSince(key, start) +} diff --git a/vendor/github.com/armon/go-metrics/start_test.go b/vendor/github.com/armon/go-metrics/start_test.go new file mode 100644 index 000000000..8b3210c15 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/start_test.go @@ -0,0 +1,110 @@ +package metrics + +import ( + "reflect" + "testing" + "time" +) + +func TestDefaultConfig(t *testing.T) { + conf := DefaultConfig("service") + if conf.ServiceName != "service" { + t.Fatalf("Bad name") + } + if conf.HostName == "" { + t.Fatalf("missing hostname") + } + if !conf.EnableHostname || !conf.EnableRuntimeMetrics { + t.Fatalf("expect true") + } + if conf.EnableTypePrefix { + t.Fatalf("expect false") + } + if conf.TimerGranularity != time.Millisecond { + t.Fatalf("bad granularity") + } + if conf.ProfileInterval != time.Second { + t.Fatalf("bad interval") + } +} + +func Test_GlobalMetrics_SetGauge(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + SetGauge(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_EmitKey(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + EmitKey(k, v) + + if 
!reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_IncrCounter(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + IncrCounter(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_AddSample(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + + k := []string{"test"} + v := float32(42.0) + AddSample(k, v) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if !reflect.DeepEqual(m.vals[0], v) { + t.Fatalf("val not equal") + } +} + +func Test_GlobalMetrics_MeasureSince(t *testing.T) { + m := &MockSink{} + globalMetrics = &Metrics{sink: m} + globalMetrics.TimerGranularity = time.Millisecond + + k := []string{"test"} + now := time.Now() + MeasureSince(k, now) + + if !reflect.DeepEqual(m.keys[0], k) { + t.Fatalf("key not equal") + } + if m.vals[0] > 0.1 { + t.Fatalf("val too large %v", m.vals[0]) + } +} diff --git a/vendor/github.com/armon/go-metrics/statsd.go b/vendor/github.com/armon/go-metrics/statsd.go new file mode 100644 index 000000000..65a5021a0 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsd.go @@ -0,0 +1,154 @@ +package metrics + +import ( + "bytes" + "fmt" + "log" + "net" + "strings" + "time" +) + +const ( + // statsdMaxLen is the maximum size of a packet + // to send to statsd + statsdMaxLen = 1400 +) + +// StatsdSink provides a MetricSink that can be used +// with a statsite or statsd metrics server. It uses +// only UDP packets, while StatsiteSink uses TCP. 
+type StatsdSink struct { + addr string + metricQueue chan string +} + +// NewStatsdSink is used to create a new StatsdSink +func NewStatsdSink(addr string) (*StatsdSink, error) { + s := &StatsdSink{ + addr: addr, + metricQueue: make(chan string, 4096), + } + go s.flushMetrics() + return s, nil +} + +// Close is used to stop flushing to statsd +func (s *StatsdSink) Shutdown() { + close(s.metricQueue) +} + +func (s *StatsdSink) SetGauge(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) +} + +func (s *StatsdSink) EmitKey(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) +} + +func (s *StatsdSink) IncrCounter(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) +} + +func (s *StatsdSink) AddSample(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) +} + +// Flattens the key for formatting, removes spaces +func (s *StatsdSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Map(func(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } + }, joined) +} + +// Does a non-blocking push to the metrics queue +func (s *StatsdSink) pushMetric(m string) { + select { + case s.metricQueue <- m: + default: + } +} + +// Flushes metrics +func (s *StatsdSink) flushMetrics() { + var sock net.Conn + var err error + var wait <-chan time.Time + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + +CONNECT: + // Create a buffer + buf := bytes.NewBuffer(nil) + + // Attempt to connect + sock, err = net.Dial("udp", s.addr) + if err != nil { + log.Printf("[ERR] Error connecting to statsd! 
Err: %s", err) + goto WAIT + } + + for { + select { + case metric, ok := <-s.metricQueue: + // Get a metric from the queue + if !ok { + goto QUIT + } + + // Check if this would overflow the packet size + if len(metric)+buf.Len() > statsdMaxLen { + _, err := sock.Write(buf.Bytes()) + buf.Reset() + if err != nil { + log.Printf("[ERR] Error writing to statsd! Err: %s", err) + goto WAIT + } + } + + // Append to the buffer + buf.WriteString(metric) + + case <-ticker.C: + if buf.Len() == 0 { + continue + } + + _, err := sock.Write(buf.Bytes()) + buf.Reset() + if err != nil { + log.Printf("[ERR] Error flushing to statsd! Err: %s", err) + goto WAIT + } + } + } + +WAIT: + // Wait for a while + wait = time.After(time.Duration(5) * time.Second) + for { + select { + // Dequeue the messages to avoid backlog + case _, ok := <-s.metricQueue: + if !ok { + goto QUIT + } + case <-wait: + goto CONNECT + } + } +QUIT: + s.metricQueue = nil +} diff --git a/vendor/github.com/armon/go-metrics/statsd_test.go b/vendor/github.com/armon/go-metrics/statsd_test.go new file mode 100644 index 000000000..622eb5d3a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsd_test.go @@ -0,0 +1,105 @@ +package metrics + +import ( + "bufio" + "bytes" + "net" + "testing" + "time" +) + +func TestStatsd_Flatten(t *testing.T) { + s := &StatsdSink{} + flat := s.flattenKey([]string{"a", "b", "c", "d"}) + if flat != "a.b.c.d" { + t.Fatalf("Bad flat") + } +} + +func TestStatsd_PushFullQueue(t *testing.T) { + q := make(chan string, 1) + q <- "full" + + s := &StatsdSink{metricQueue: q} + s.pushMetric("omit") + + out := <-q + if out != "full" { + t.Fatalf("bad val %v", out) + } + + select { + case v := <-q: + t.Fatalf("bad val %v", v) + default: + } +} + +func TestStatsd_Conn(t *testing.T) { + addr := "127.0.0.1:7524" + done := make(chan bool) + go func() { + list, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 7524}) + if err != nil { + panic(err) + } + defer list.Close() + buf 
:= make([]byte, 1500) + n, err := list.Read(buf) + if err != nil { + panic(err) + } + buf = buf[:n] + reader := bufio.NewReader(bytes.NewReader(buf)) + + line, err := reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "gauge.val:1.000000|g\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "key.other:2.000000|kv\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "counter.me:3.000000|c\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "sample.slow_thingy:4.000000|ms\n" { + t.Fatalf("bad line %s", line) + } + + done <- true + }() + s, err := NewStatsdSink(addr) + if err != nil { + t.Fatalf("bad error") + } + + s.SetGauge([]string{"gauge", "val"}, float32(1)) + s.EmitKey([]string{"key", "other"}, float32(2)) + s.IncrCounter([]string{"counter", "me"}, float32(3)) + s.AddSample([]string{"sample", "slow thingy"}, float32(4)) + + select { + case <-done: + s.Shutdown() + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } +} diff --git a/vendor/github.com/armon/go-metrics/statsite.go b/vendor/github.com/armon/go-metrics/statsite.go new file mode 100644 index 000000000..68730139a --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsite.go @@ -0,0 +1,142 @@ +package metrics + +import ( + "bufio" + "fmt" + "log" + "net" + "strings" + "time" +) + +const ( + // We force flush the statsite metrics after this period of + // inactivity. Prevents stats from getting stuck in a buffer + // forever. 
+ flushInterval = 100 * time.Millisecond +) + +// StatsiteSink provides a MetricSink that can be used with a +// statsite metrics server +type StatsiteSink struct { + addr string + metricQueue chan string +} + +// NewStatsiteSink is used to create a new StatsiteSink +func NewStatsiteSink(addr string) (*StatsiteSink, error) { + s := &StatsiteSink{ + addr: addr, + metricQueue: make(chan string, 4096), + } + go s.flushMetrics() + return s, nil +} + +// Close is used to stop flushing to statsite +func (s *StatsiteSink) Shutdown() { + close(s.metricQueue) +} + +func (s *StatsiteSink) SetGauge(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val)) +} + +func (s *StatsiteSink) EmitKey(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val)) +} + +func (s *StatsiteSink) IncrCounter(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val)) +} + +func (s *StatsiteSink) AddSample(key []string, val float32) { + flatKey := s.flattenKey(key) + s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val)) +} + +// Flattens the key for formatting, removes spaces +func (s *StatsiteSink) flattenKey(parts []string) string { + joined := strings.Join(parts, ".") + return strings.Map(func(r rune) rune { + switch r { + case ':': + fallthrough + case ' ': + return '_' + default: + return r + } + }, joined) +} + +// Does a non-blocking push to the metrics queue +func (s *StatsiteSink) pushMetric(m string) { + select { + case s.metricQueue <- m: + default: + } +} + +// Flushes metrics +func (s *StatsiteSink) flushMetrics() { + var sock net.Conn + var err error + var wait <-chan time.Time + var buffered *bufio.Writer + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + +CONNECT: + // Attempt to connect + sock, err = net.Dial("tcp", s.addr) + if err != nil { + log.Printf("[ERR] Error connecting to statsite! 
Err: %s", err) + goto WAIT + } + + // Create a buffered writer + buffered = bufio.NewWriter(sock) + + for { + select { + case metric, ok := <-s.metricQueue: + // Get a metric from the queue + if !ok { + goto QUIT + } + + // Try to send to statsite + _, err := buffered.Write([]byte(metric)) + if err != nil { + log.Printf("[ERR] Error writing to statsite! Err: %s", err) + goto WAIT + } + case <-ticker.C: + if err := buffered.Flush(); err != nil { + log.Printf("[ERR] Error flushing to statsite! Err: %s", err) + goto WAIT + } + } + } + +WAIT: + // Wait for a while + wait = time.After(time.Duration(5) * time.Second) + for { + select { + // Dequeue the messages to avoid backlog + case _, ok := <-s.metricQueue: + if !ok { + goto QUIT + } + case <-wait: + goto CONNECT + } + } +QUIT: + s.metricQueue = nil +} diff --git a/vendor/github.com/armon/go-metrics/statsite_test.go b/vendor/github.com/armon/go-metrics/statsite_test.go new file mode 100644 index 000000000..d9c744f41 --- /dev/null +++ b/vendor/github.com/armon/go-metrics/statsite_test.go @@ -0,0 +1,101 @@ +package metrics + +import ( + "bufio" + "net" + "testing" + "time" +) + +func acceptConn(addr string) net.Conn { + ln, _ := net.Listen("tcp", addr) + conn, _ := ln.Accept() + return conn +} + +func TestStatsite_Flatten(t *testing.T) { + s := &StatsiteSink{} + flat := s.flattenKey([]string{"a", "b", "c", "d"}) + if flat != "a.b.c.d" { + t.Fatalf("Bad flat") + } +} + +func TestStatsite_PushFullQueue(t *testing.T) { + q := make(chan string, 1) + q <- "full" + + s := &StatsiteSink{metricQueue: q} + s.pushMetric("omit") + + out := <-q + if out != "full" { + t.Fatalf("bad val %v", out) + } + + select { + case v := <-q: + t.Fatalf("bad val %v", v) + default: + } +} + +func TestStatsite_Conn(t *testing.T) { + addr := "localhost:7523" + done := make(chan bool) + go func() { + conn := acceptConn(addr) + reader := bufio.NewReader(conn) + + line, err := reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", 
err) + } + if line != "gauge.val:1.000000|g\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "key.other:2.000000|kv\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "counter.me:3.000000|c\n" { + t.Fatalf("bad line %s", line) + } + + line, err = reader.ReadString('\n') + if err != nil { + t.Fatalf("unexpected err %s", err) + } + if line != "sample.slow_thingy:4.000000|ms\n" { + t.Fatalf("bad line %s", line) + } + + conn.Close() + done <- true + }() + s, err := NewStatsiteSink(addr) + if err != nil { + t.Fatalf("bad error") + } + + s.SetGauge([]string{"gauge", "val"}, float32(1)) + s.EmitKey([]string{"key", "other"}, float32(2)) + s.IncrCounter([]string{"counter", "me"}, float32(3)) + s.AddSample([]string{"sample", "slow thingy"}, float32(4)) + + select { + case <-done: + s.Shutdown() + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } +} diff --git a/vendor/manifest b/vendor/manifest index a267f9e4c..229349ce4 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -7,6 +7,13 @@ "revision": "75cd24fc2f2c", "branch": "default" }, + { + "importpath": "github.com/DataDog/datadog-go/statsd", + "repository": "https://github.com/DataDog/datadog-go", + "revision": "b050cd8f4d7c394545fd7d966c8e2909ce89d552", + "branch": "master", + "path": "/statsd" + }, { "importpath": "github.com/PuerkitoBio/ghost", "repository": "https://github.com/PuerkitoBio/ghost", @@ -20,6 +27,12 @@ "branch": "master", "path": "/handlers" }, + { + "importpath": "github.com/armon/go-metrics", + "repository": "https://github.com/armon/go-metrics", + "revision": "6c5fa0d8f48f4661c9ba8709799c88d425ad20f0", + "branch": "master" + }, { "importpath": "github.com/beorn7/perks/quantile", "repository": "https://github.com/beorn7/perks", From 3e1be5cbfe6035aa3f28589cdbbaa21eb3c9c903 Mon Sep 17 
00:00:00 2001 From: Tom Wilkie Date: Thu, 12 Nov 2015 17:02:28 +0000 Subject: [PATCH 2/2] Move calls to weave ps to a background goroutine. --- probe/overlay/weave.go | 62 ++++++++++++++++++++++++++++++------- probe/overlay/weave_test.go | 1 + prog/probe/main.go | 1 + 3 files changed, 53 insertions(+), 11 deletions(-) diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 05ff51b92..67ccdb505 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -9,6 +9,7 @@ import ( "regexp" "strings" "sync" + "time" "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/common/sanitize" @@ -44,8 +45,11 @@ type Weave struct { url string hostID string + quit chan struct{} + done sync.WaitGroup mtx sync.RWMutex status weaveStatus + ps map[string]psEntry } type weaveStatus struct { @@ -68,9 +72,51 @@ type weaveStatus struct { // NewWeave returns a new Weave tagger based on the Weave router at // address. The address should be an IP or FQDN, no port. func NewWeave(hostID, weaveRouterAddress string) *Weave { - return &Weave{ + w := &Weave{ url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), hostID: hostID, + quit: make(chan struct{}), + ps: map[string]psEntry{}, + } + w.done.Add(1) + go w.loop() + return w +} + +// Name of this reporter/tagger/ticker, for metrics gathering +func (*Weave) Name() string { return "Weave" } + +// Stop gathering weave ps output. 
+func (w *Weave) Stop() { + close(w.quit) + w.done.Wait() +} + +func (w *Weave) loop() { + defer w.done.Done() + tick := time.Tick(5 * time.Second) + + for { + psEntries, err := w.getPSEntries() + if err != nil { + log.Printf("Error running weave ps: %v", err) + break + } + + psEntriesByPrefix := map[string]psEntry{} + for _, entry := range psEntries { + psEntriesByPrefix[entry.containerIDPrefix] = entry + } + + w.mtx.Lock() + w.ps = psEntriesByPrefix + w.mtx.Unlock() + + select { + case <-w.quit: + return + case <-tick: + } } } @@ -108,7 +154,7 @@ type psEntry struct { ips []string } -func (w *Weave) ps() ([]psEntry, error) { +func (w *Weave) getPSEntries() ([]psEntry, error) { var result []psEntry cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() @@ -160,17 +206,11 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) { } // Put information from weave ps on the container nodes - psEntries, err := w.ps() - if err != nil { - return r, nil - } - psEntriesByPrefix := map[string]psEntry{} - for _, entry := range psEntries { - psEntriesByPrefix[entry.containerIDPrefix] = entry - } + w.mtx.RLock() + defer w.mtx.RUnlock() for id, node := range r.Container.Nodes { prefix := node.Metadata[docker.ContainerID][:12] - entry, ok := psEntriesByPrefix[prefix] + entry, ok := w.ps[prefix] if !ok { continue } diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index cd73f4252..9d551f793 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -26,6 +26,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { defer s.Close() w := overlay.NewWeave(mockHostID, s.URL) + defer w.Stop() w.Tick() { diff --git a/prog/probe/main.go b/prog/probe/main.go index 5eae454a2..03f28cdb3 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -159,6 +159,7 @@ func main() { if *weaveRouterAddr != "" { weave := overlay.NewWeave(hostID, *weaveRouterAddr) + defer weave.Stop() p.AddTicker(weave) p.AddTagger(weave) 
p.AddReporter(weave)