mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Merge pull request #658 from weaveworks/report-generation
Move weave ps calls to a background goroutine, instruments probe runtime.
This commit is contained in:
@@ -34,6 +34,9 @@ func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter
|
||||
return reporter
|
||||
}
|
||||
|
||||
// Name of this reporter, for metrics gathering
|
||||
func (Reporter) Name() string { return "Docker" }
|
||||
|
||||
// ContainerUpdated should be called whenever a container is updated.
|
||||
func (r *Reporter) ContainerUpdated(c Container) {
|
||||
localAddrs, err := report.LocalAddresses()
|
||||
|
||||
@@ -34,6 +34,9 @@ func NewTagger(registry Registry, procWalker process.Walker) *Tagger {
|
||||
}
|
||||
}
|
||||
|
||||
// Name of this tagger, for metrics gathering
|
||||
func (Tagger) Name() string { return "Docker" }
|
||||
|
||||
// Tag implements Tagger.
|
||||
func (t *Tagger) Tag(r report.Report) (report.Report, error) {
|
||||
tree, err := NewProcessTreeStub(t.procWalker)
|
||||
|
||||
@@ -58,6 +58,9 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
|
||||
}
|
||||
}
|
||||
|
||||
// Name of this reporter, for metrics gathering
|
||||
func (Reporter) Name() string { return "Endpoint" }
|
||||
|
||||
// Stop stop stop
|
||||
func (r *Reporter) Stop() {
|
||||
r.flowWalker.stop()
|
||||
|
||||
@@ -48,6 +48,9 @@ func NewReporter(hostID, hostName string, localNets report.Networks) *Reporter {
|
||||
}
|
||||
}
|
||||
|
||||
// Name of this reporter, for metrics gathering
|
||||
func (Reporter) Name() string { return "Host" }
|
||||
|
||||
// Report implements Reporter.
|
||||
func (r *Reporter) Report() (report.Report, error) {
|
||||
var (
|
||||
|
||||
@@ -21,6 +21,9 @@ func NewTagger(hostID, probeID string) Tagger {
|
||||
}
|
||||
}
|
||||
|
||||
// Name of this tagger, for metrics gathering
|
||||
func (Tagger) Name() string { return "Host" }
|
||||
|
||||
// Tag implements Tagger.
|
||||
func (t Tagger) Tag(r report.Report) (report.Report, error) {
|
||||
metadata := map[string]string{
|
||||
|
||||
@@ -16,6 +16,9 @@ func NewReporter(client Client) *Reporter {
|
||||
}
|
||||
}
|
||||
|
||||
// Name of this reporter, for metrics gathering
|
||||
func (Reporter) Name() string { return "K8s" }
|
||||
|
||||
// Report generates a Report containing Container and ContainerImage topologies
|
||||
func (r *Reporter) Report() (report.Report, error) {
|
||||
result := report.MakeReport()
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/common/exec"
|
||||
"github.com/weaveworks/scope/common/sanitize"
|
||||
@@ -44,8 +45,11 @@ type Weave struct {
|
||||
url string
|
||||
hostID string
|
||||
|
||||
quit chan struct{}
|
||||
done sync.WaitGroup
|
||||
mtx sync.RWMutex
|
||||
status weaveStatus
|
||||
ps map[string]psEntry
|
||||
}
|
||||
|
||||
type weaveStatus struct {
|
||||
@@ -68,9 +72,51 @@ type weaveStatus struct {
|
||||
// NewWeave returns a new Weave tagger based on the Weave router at
|
||||
// address. The address should be an IP or FQDN, no port.
|
||||
func NewWeave(hostID, weaveRouterAddress string) *Weave {
|
||||
return &Weave{
|
||||
w := &Weave{
|
||||
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
|
||||
hostID: hostID,
|
||||
quit: make(chan struct{}),
|
||||
ps: map[string]psEntry{},
|
||||
}
|
||||
w.done.Add(1)
|
||||
go w.loop()
|
||||
return w
|
||||
}
|
||||
|
||||
// Name of this reporter/tagger/ticker, for metrics gathering
|
||||
func (*Weave) Name() string { return "Weave" }
|
||||
|
||||
// Stop gathering weave ps output.
|
||||
func (w *Weave) Stop() {
|
||||
close(w.quit)
|
||||
w.done.Wait()
|
||||
}
|
||||
|
||||
func (w *Weave) loop() {
|
||||
defer w.done.Done()
|
||||
tick := time.Tick(5 * time.Second)
|
||||
|
||||
for {
|
||||
psEntries, err := w.getPSEntries()
|
||||
if err != nil {
|
||||
log.Printf("Error running weave ps: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
psEntriesByPrefix := map[string]psEntry{}
|
||||
for _, entry := range psEntries {
|
||||
psEntriesByPrefix[entry.containerIDPrefix] = entry
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
w.ps = psEntriesByPrefix
|
||||
w.mtx.Unlock()
|
||||
|
||||
select {
|
||||
case <-w.quit:
|
||||
return
|
||||
case <-tick:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,7 +154,7 @@ type psEntry struct {
|
||||
ips []string
|
||||
}
|
||||
|
||||
func (w *Weave) ps() ([]psEntry, error) {
|
||||
func (w *Weave) getPSEntries() ([]psEntry, error) {
|
||||
var result []psEntry
|
||||
cmd := exec.Command("weave", "--local", "ps")
|
||||
out, err := cmd.StdoutPipe()
|
||||
@@ -160,17 +206,11 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) {
|
||||
}
|
||||
|
||||
// Put information from weave ps on the container nodes
|
||||
psEntries, err := w.ps()
|
||||
if err != nil {
|
||||
return r, nil
|
||||
}
|
||||
psEntriesByPrefix := map[string]psEntry{}
|
||||
for _, entry := range psEntries {
|
||||
psEntriesByPrefix[entry.containerIDPrefix] = entry
|
||||
}
|
||||
w.mtx.RLock()
|
||||
defer w.mtx.RUnlock()
|
||||
for id, node := range r.Container.Nodes {
|
||||
prefix := node.Metadata[docker.ContainerID][:12]
|
||||
entry, ok := psEntriesByPrefix[prefix]
|
||||
entry, ok := w.ps[prefix]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
|
||||
defer s.Close()
|
||||
|
||||
w := overlay.NewWeave(mockHostID, s.URL)
|
||||
defer w.Stop()
|
||||
w.Tick()
|
||||
|
||||
{
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/xfer"
|
||||
)
|
||||
@@ -31,11 +33,13 @@ type Probe struct {
|
||||
|
||||
// Tagger tags nodes with value-add node metadata.
|
||||
type Tagger interface {
|
||||
Name() string
|
||||
Tag(r report.Report) (report.Report, error)
|
||||
}
|
||||
|
||||
// Reporter generates Reports.
|
||||
type Reporter interface {
|
||||
Name() string
|
||||
Report() (report.Report, error)
|
||||
}
|
||||
|
||||
@@ -43,6 +47,7 @@ type Reporter interface {
|
||||
// It's useful for things that should be updated on that interval.
|
||||
// For example, cached shared state between Taggers and Reporters.
|
||||
type Ticker interface {
|
||||
Name() string
|
||||
Tick() error
|
||||
}
|
||||
|
||||
@@ -100,32 +105,36 @@ func (p *Probe) spyLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-spyTick:
|
||||
start := time.Now()
|
||||
for _, ticker := range p.tickers {
|
||||
if err := ticker.Tick(); err != nil {
|
||||
log.Printf("error doing ticker: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
t := time.Now()
|
||||
p.tick()
|
||||
rpt := p.report()
|
||||
rpt = p.tag(rpt)
|
||||
p.spiedReports <- rpt
|
||||
|
||||
if took := time.Since(start); took > p.spyInterval {
|
||||
log.Printf("report generation took too long (%s)", took)
|
||||
}
|
||||
|
||||
metrics.MeasureSince([]string{"Report Generaton"}, t)
|
||||
case <-p.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Probe) tick() {
|
||||
for _, ticker := range p.tickers {
|
||||
t := time.Now()
|
||||
err := ticker.Tick()
|
||||
metrics.MeasureSince([]string{ticker.Name(), "ticker"}, t)
|
||||
if err != nil {
|
||||
log.Printf("error doing ticker: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Probe) report() report.Report {
|
||||
reports := make(chan report.Report, len(p.reporters))
|
||||
for _, rep := range p.reporters {
|
||||
go func(rep Reporter) {
|
||||
t := time.Now()
|
||||
newReport, err := rep.Report()
|
||||
metrics.MeasureSince([]string{rep.Name(), "reporter"}, t)
|
||||
if err != nil {
|
||||
log.Printf("error generating report: %v", err)
|
||||
newReport = report.MakeReport() // empty is OK to merge
|
||||
@@ -144,7 +153,9 @@ func (p *Probe) report() report.Report {
|
||||
func (p *Probe) tag(r report.Report) report.Report {
|
||||
var err error
|
||||
for _, tagger := range p.taggers {
|
||||
t := time.Now()
|
||||
r, err = tagger.Tag(r)
|
||||
metrics.MeasureSince([]string{tagger.Name(), "tagger"}, t)
|
||||
if err != nil {
|
||||
log.Printf("error applying tagger: %v", err)
|
||||
}
|
||||
|
||||
@@ -50,6 +50,8 @@ func (m mockReporter) Report() (report.Report, error) {
|
||||
return m.r.Copy(), nil
|
||||
}
|
||||
|
||||
func (mockReporter) Name() string { return "Mock" }
|
||||
|
||||
type mockPublisher struct {
|
||||
have chan report.Report
|
||||
}
|
||||
|
||||
@@ -29,6 +29,9 @@ func NewReporter(walker Walker, scope string) *Reporter {
|
||||
}
|
||||
}
|
||||
|
||||
// Name of this reporter, for metrics gathering
|
||||
func (Reporter) Name() string { return "Process" }
|
||||
|
||||
// Report implements Reporter.
|
||||
func (r *Reporter) Report() (report.Report, error) {
|
||||
result := report.MakeReport()
|
||||
|
||||
@@ -28,6 +28,9 @@ func NewCachingWalker(source Walker) *CachingWalker {
|
||||
return &CachingWalker{source: source}
|
||||
}
|
||||
|
||||
// Name of this ticker, for metrics gathering
|
||||
func (*CachingWalker) Name() string { return "Process" }
|
||||
|
||||
// Walk walks a cached copy of process list
|
||||
func (c *CachingWalker) Walk(f func(Process)) error {
|
||||
c.cacheLock.RLock()
|
||||
|
||||
@@ -15,6 +15,8 @@ func NewTopologyTagger() Tagger {
|
||||
return &topologyTagger{}
|
||||
}
|
||||
|
||||
func (topologyTagger) Name() string { return "Topology" }
|
||||
|
||||
// Tag implements Tagger
|
||||
func (topologyTagger) Tag(r report.Report) (report.Report, error) {
|
||||
for val, topology := range map[string]*report.Topology{
|
||||
|
||||
@@ -15,6 +15,8 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
|
||||
"github.com/weaveworks/scope/probe"
|
||||
"github.com/weaveworks/scope/probe/controls"
|
||||
"github.com/weaveworks/scope/probe/docker"
|
||||
@@ -57,6 +59,12 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
// Setup in memory metrics sink
|
||||
inm := metrics.NewInmemSink(time.Minute, 2*time.Minute)
|
||||
sig := metrics.DefaultInmemSignal(inm)
|
||||
defer sig.Stop()
|
||||
metrics.NewGlobal(metrics.DefaultConfig("scope-probe"), inm)
|
||||
|
||||
if !strings.HasSuffix(*logPrefix, " ") {
|
||||
*logPrefix += " "
|
||||
}
|
||||
@@ -151,6 +159,7 @@ func main() {
|
||||
|
||||
if *weaveRouterAddr != "" {
|
||||
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
|
||||
defer weave.Stop()
|
||||
p.AddTicker(weave)
|
||||
p.AddTagger(weave)
|
||||
p.AddReporter(weave)
|
||||
|
||||
45
vendor/github.com/DataDog/datadog-go/statsd/README.md
generated
vendored
Normal file
45
vendor/github.com/DataDog/datadog-go/statsd/README.md
generated
vendored
Normal file
@@ -0,0 +1,45 @@
|
||||
## Overview
|
||||
|
||||
Package `statsd` provides a Go [dogstatsd](http://docs.datadoghq.com/guides/dogstatsd/) client. Dogstatsd extends Statsd, adding tags
|
||||
and histograms.
|
||||
|
||||
## Get the code
|
||||
|
||||
$ go get github.com/DataDog/datadog-go/statsd
|
||||
|
||||
## Usage
|
||||
|
||||
```go
|
||||
// Create the client
|
||||
c, err := statsd.New("127.0.0.1:8125")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Prefix every metric with the app name
|
||||
c.Namespace = "flubber."
|
||||
// Send the EC2 availability zone as a tag with every metric
|
||||
c.Tags = append(c.Tags, "us-east-1a")
|
||||
err = c.Gauge("request.duration", 1.2, nil, 1)
|
||||
```
|
||||
|
||||
## Buffering Client
|
||||
|
||||
Dogstatsd accepts packets with multiple statsd payloads in them. Using the BufferingClient via `NewBufferingClient` will buffer up commands and send them when the buffer is reached or after 100msec.
|
||||
|
||||
## Development
|
||||
|
||||
Run the tests with:
|
||||
|
||||
$ go test
|
||||
|
||||
## Documentation
|
||||
|
||||
Please see: http://godoc.org/github.com/DataDog/datadog-go/statsd
|
||||
|
||||
## License
|
||||
|
||||
go-dogstatsd is released under the [MIT license](http://www.opensource.org/licenses/mit-license.php).
|
||||
|
||||
## Credits
|
||||
|
||||
Original code by [ooyala](https://github.com/ooyala/go-dogstatsd).
|
||||
353
vendor/github.com/DataDog/datadog-go/statsd/statsd.go
generated
vendored
Normal file
353
vendor/github.com/DataDog/datadog-go/statsd/statsd.go
generated
vendored
Normal file
@@ -0,0 +1,353 @@
|
||||
// Copyright 2013 Ooyala, Inc.
|
||||
|
||||
/*
|
||||
Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd,
|
||||
adding tags and histograms and pushing upstream to Datadog.
|
||||
|
||||
Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD.
|
||||
|
||||
Example Usage:
|
||||
|
||||
// Create the client
|
||||
c, err := statsd.New("127.0.0.1:8125")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Prefix every metric with the app name
|
||||
c.Namespace = "flubber."
|
||||
// Send the EC2 availability zone as a tag with every metric
|
||||
c.Tags = append(c.Tags, "us-east-1a")
|
||||
err = c.Gauge("request.duration", 1.2, nil, 1)
|
||||
|
||||
statsd is based on go-statsd-client.
|
||||
*/
|
||||
package statsd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// A Client is a handle for sending udp messages to dogstatsd. It is safe to
|
||||
// use one Client from multiple goroutines simultaneously.
|
||||
type Client struct {
|
||||
conn net.Conn
|
||||
// Namespace to prepend to all statsd calls
|
||||
Namespace string
|
||||
// Tags are global tags to be added to every statsd call
|
||||
Tags []string
|
||||
// BufferLength is the length of the buffer in commands.
|
||||
bufferLength int
|
||||
flushTime time.Duration
|
||||
commands []string
|
||||
stop bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// New returns a pointer to a new Client given an addr in the format "hostname:port".
|
||||
func New(addr string) (*Client, error) {
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client := &Client{conn: conn}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// NewBuffered returns a Client that buffers its output and sends it in chunks.
|
||||
// Buflen is the length of the buffer in number of commands.
|
||||
func NewBuffered(addr string, buflen int) (*Client, error) {
|
||||
client, err := New(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.bufferLength = buflen
|
||||
client.commands = make([]string, 0, buflen)
|
||||
client.flushTime = time.Millisecond * 100
|
||||
go client.watch()
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// format a message from its name, value, tags and rate. Also adds global
|
||||
// namespace and tags.
|
||||
func (c *Client) format(name, value string, tags []string, rate float64) string {
|
||||
var buf bytes.Buffer
|
||||
if c.Namespace != "" {
|
||||
buf.WriteString(c.Namespace)
|
||||
}
|
||||
buf.WriteString(name)
|
||||
buf.WriteString(":")
|
||||
buf.WriteString(value)
|
||||
if rate < 1 {
|
||||
buf.WriteString(`|@`)
|
||||
buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64))
|
||||
}
|
||||
|
||||
tags = append(c.Tags, tags...)
|
||||
if len(tags) > 0 {
|
||||
buf.WriteString("|#")
|
||||
buf.WriteString(tags[0])
|
||||
for _, tag := range tags[1:] {
|
||||
buf.WriteString(",")
|
||||
buf.WriteString(tag)
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func (c *Client) watch() {
|
||||
for _ = range time.Tick(c.flushTime) {
|
||||
if c.stop {
|
||||
return
|
||||
}
|
||||
c.Lock()
|
||||
if len(c.commands) > 0 {
|
||||
// FIXME: eating error here
|
||||
c.flush()
|
||||
}
|
||||
c.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) append(cmd string) error {
|
||||
c.Lock()
|
||||
c.commands = append(c.commands, cmd)
|
||||
// if we should flush, lets do it
|
||||
if len(c.commands) == c.bufferLength {
|
||||
if err := c.flush(); err != nil {
|
||||
c.Unlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// flush the commands in the buffer. Lock must be held by caller.
|
||||
func (c *Client) flush() error {
|
||||
data := strings.Join(c.commands, "\n")
|
||||
_, err := c.conn.Write([]byte(data))
|
||||
// clear the slice with a slice op, doesn't realloc
|
||||
c.commands = c.commands[:0]
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) sendMsg(msg string) error {
|
||||
// if this client is buffered, then we'll just append this
|
||||
if c.bufferLength > 0 {
|
||||
return c.append(msg)
|
||||
}
|
||||
c.Lock()
|
||||
_, err := c.conn.Write([]byte(msg))
|
||||
c.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags.
|
||||
func (c *Client) send(name, value string, tags []string, rate float64) error {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
if rate < 1 && rand.Float64() > rate {
|
||||
return nil
|
||||
}
|
||||
data := c.format(name, value, tags, rate)
|
||||
return c.sendMsg(data)
|
||||
}
|
||||
|
||||
// Gauge measures the value of a metric at a particular time.
|
||||
func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%f|g", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Count tracks how many times something happened per second.
|
||||
func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%d|c", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Histogram tracks the statistical distribution of a set of values.
|
||||
func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%f|h", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Set counts the number of unique elements in a group.
|
||||
func (c *Client) Set(name string, value string, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%s|s", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// TimeInMilliseconds sends timing information in milliseconds.
|
||||
// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
|
||||
func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%f|ms", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Event sends the provided Event.
|
||||
func (c *Client) Event(e *Event) error {
|
||||
stat, err := e.Encode(c.Tags...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.sendMsg(stat)
|
||||
}
|
||||
|
||||
// SimpleEvent sends an event with the provided title and text.
|
||||
func (c *Client) SimpleEvent(title, text string) error {
|
||||
e := NewEvent(title, text)
|
||||
return c.Event(e)
|
||||
}
|
||||
|
||||
// Close the client connection.
|
||||
func (c *Client) Close() error {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
c.stop = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
// Events support
|
||||
|
||||
type eventAlertType string
|
||||
|
||||
const (
|
||||
// Info is the "info" AlertType for events
|
||||
Info eventAlertType = "info"
|
||||
// Error is the "error" AlertType for events
|
||||
Error eventAlertType = "error"
|
||||
// Warning is the "warning" AlertType for events
|
||||
Warning eventAlertType = "warning"
|
||||
// Success is the "success" AlertType for events
|
||||
Success eventAlertType = "success"
|
||||
)
|
||||
|
||||
type eventPriority string
|
||||
|
||||
const (
|
||||
// Normal is the "normal" Priority for events
|
||||
Normal eventPriority = "normal"
|
||||
// Low is the "low" Priority for events
|
||||
Low eventPriority = "low"
|
||||
)
|
||||
|
||||
// An Event is an object that can be posted to your DataDog event stream.
|
||||
type Event struct {
|
||||
// Title of the event. Required.
|
||||
Title string
|
||||
// Text is the description of the event. Required.
|
||||
Text string
|
||||
// Timestamp is a timestamp for the event. If not provided, the dogstatsd
|
||||
// server will set this to the current time.
|
||||
Timestamp time.Time
|
||||
// Hostname for the event.
|
||||
Hostname string
|
||||
// AggregationKey groups this event with others of the same key.
|
||||
AggregationKey string
|
||||
// Priority of the event. Can be statsd.Low or statsd.Normal.
|
||||
Priority eventPriority
|
||||
// SourceTypeName is a source type for the event.
|
||||
SourceTypeName string
|
||||
// AlertType can be statsd.Info, statsd.Error, statsd.Warning, or statsd.Success.
|
||||
// If absent, the default value applied by the dogstatsd server is Info.
|
||||
AlertType eventAlertType
|
||||
// Tags for the event.
|
||||
Tags []string
|
||||
}
|
||||
|
||||
// NewEvent creates a new event with the given title and text. Error checking
|
||||
// against these values is done at send-time, or upon running e.Check.
|
||||
func NewEvent(title, text string) *Event {
|
||||
return &Event{
|
||||
Title: title,
|
||||
Text: text,
|
||||
}
|
||||
}
|
||||
|
||||
// Check verifies that an event is valid.
|
||||
func (e Event) Check() error {
|
||||
if len(e.Title) == 0 {
|
||||
return fmt.Errorf("statsd.Event title is required")
|
||||
}
|
||||
if len(e.Text) == 0 {
|
||||
return fmt.Errorf("statsd.Event text is required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Encode returns the dogstatsd wire protocol representation for an event.
|
||||
// Tags may be passed which will be added to the encoded output but not to
|
||||
// the Event's list of tags, eg. for default tags.
|
||||
func (e Event) Encode(tags ...string) (string, error) {
|
||||
err := e.Check()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
buffer.WriteString("_e{")
|
||||
buffer.WriteString(strconv.FormatInt(int64(len(e.Title)), 10))
|
||||
buffer.WriteRune(',')
|
||||
buffer.WriteString(strconv.FormatInt(int64(len(e.Text)), 10))
|
||||
buffer.WriteString("}:")
|
||||
buffer.WriteString(e.Title)
|
||||
buffer.WriteRune('|')
|
||||
buffer.WriteString(e.Text)
|
||||
|
||||
if !e.Timestamp.IsZero() {
|
||||
buffer.WriteString("|d:")
|
||||
buffer.WriteString(strconv.FormatInt(int64(e.Timestamp.Unix()), 10))
|
||||
}
|
||||
|
||||
if len(e.Hostname) != 0 {
|
||||
buffer.WriteString("|h:")
|
||||
buffer.WriteString(e.Hostname)
|
||||
}
|
||||
|
||||
if len(e.AggregationKey) != 0 {
|
||||
buffer.WriteString("|k:")
|
||||
buffer.WriteString(e.AggregationKey)
|
||||
|
||||
}
|
||||
|
||||
if len(e.Priority) != 0 {
|
||||
buffer.WriteString("|p:")
|
||||
buffer.WriteString(string(e.Priority))
|
||||
}
|
||||
|
||||
if len(e.SourceTypeName) != 0 {
|
||||
buffer.WriteString("|s:")
|
||||
buffer.WriteString(e.SourceTypeName)
|
||||
}
|
||||
|
||||
if len(e.AlertType) != 0 {
|
||||
buffer.WriteString("|t:")
|
||||
buffer.WriteString(string(e.AlertType))
|
||||
}
|
||||
|
||||
if len(tags)+len(e.Tags) > 0 {
|
||||
all := make([]string, 0, len(tags)+len(e.Tags))
|
||||
all = append(all, tags...)
|
||||
all = append(all, e.Tags...)
|
||||
buffer.WriteString("|#")
|
||||
buffer.WriteString(all[0])
|
||||
for _, tag := range all[1:] {
|
||||
buffer.WriteString(",")
|
||||
buffer.WriteString(tag)
|
||||
}
|
||||
}
|
||||
|
||||
return buffer.String(), nil
|
||||
}
|
||||
312
vendor/github.com/DataDog/datadog-go/statsd/statsd_test.go
generated
vendored
Normal file
312
vendor/github.com/DataDog/datadog-go/statsd/statsd_test.go
generated
vendored
Normal file
@@ -0,0 +1,312 @@
|
||||
// Copyright 2013 Ooyala, Inc.
|
||||
|
||||
package statsd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var dogstatsdTests = []struct {
|
||||
GlobalNamespace string
|
||||
GlobalTags []string
|
||||
Method string
|
||||
Metric string
|
||||
Value interface{}
|
||||
Tags []string
|
||||
Rate float64
|
||||
Expected string
|
||||
}{
|
||||
{"", nil, "Gauge", "test.gauge", 1.0, nil, 1.0, "test.gauge:1.000000|g"},
|
||||
{"", nil, "Gauge", "test.gauge", 1.0, nil, 0.999999, "test.gauge:1.000000|g|@0.999999"},
|
||||
{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 1.0, "test.gauge:1.000000|g|#tagA"},
|
||||
{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA", "tagB"}, 1.0, "test.gauge:1.000000|g|#tagA,tagB"},
|
||||
{"", nil, "Gauge", "test.gauge", 1.0, []string{"tagA"}, 0.999999, "test.gauge:1.000000|g|@0.999999|#tagA"},
|
||||
{"", nil, "Count", "test.count", int64(1), []string{"tagA"}, 1.0, "test.count:1|c|#tagA"},
|
||||
{"", nil, "Count", "test.count", int64(-1), []string{"tagA"}, 1.0, "test.count:-1|c|#tagA"},
|
||||
{"", nil, "Histogram", "test.histogram", 2.3, []string{"tagA"}, 1.0, "test.histogram:2.300000|h|#tagA"},
|
||||
{"", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagA"},
|
||||
{"flubber.", nil, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "flubber.test.set:uuid|s|#tagA"},
|
||||
{"", []string{"tagC"}, "Set", "test.set", "uuid", []string{"tagA"}, 1.0, "test.set:uuid|s|#tagC,tagA"},
|
||||
}
|
||||
|
||||
func assertNotPanics(t *testing.T, f func()) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatal(r)
|
||||
}
|
||||
}()
|
||||
f()
|
||||
}
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
addr := "localhost:1201"
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
server, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer server.Close()
|
||||
|
||||
client, err := New(addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, tt := range dogstatsdTests {
|
||||
client.Namespace = tt.GlobalNamespace
|
||||
client.Tags = tt.GlobalTags
|
||||
method := reflect.ValueOf(client).MethodByName(tt.Method)
|
||||
e := method.Call([]reflect.Value{
|
||||
reflect.ValueOf(tt.Metric),
|
||||
reflect.ValueOf(tt.Value),
|
||||
reflect.ValueOf(tt.Tags),
|
||||
reflect.ValueOf(tt.Rate)})[0]
|
||||
errInter := e.Interface()
|
||||
if errInter != nil {
|
||||
t.Fatal(errInter.(error))
|
||||
}
|
||||
|
||||
bytes := make([]byte, 1024)
|
||||
n, err := server.Read(bytes)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
message := bytes[:n]
|
||||
if string(message) != tt.Expected {
|
||||
t.Errorf("Expected: %s. Actual: %s", tt.Expected, string(message))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBufferedClient(t *testing.T) {
|
||||
addr := "localhost:1201"
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
server, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer server.Close()
|
||||
|
||||
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bufferLength := 5
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
commands: make([]string, 0, bufferLength),
|
||||
bufferLength: bufferLength,
|
||||
}
|
||||
|
||||
client.Namespace = "foo."
|
||||
client.Tags = []string{"dd:2"}
|
||||
|
||||
client.Count("cc", 1, nil, 1)
|
||||
client.Gauge("gg", 10, nil, 1)
|
||||
client.Histogram("hh", 1, nil, 1)
|
||||
client.Set("ss", "ss", nil, 1)
|
||||
|
||||
if len(client.commands) != 4 {
|
||||
t.Errorf("Expected client to have buffered 4 commands, but found %d\n", len(client.commands))
|
||||
}
|
||||
|
||||
client.Set("ss", "xx", nil, 1)
|
||||
err = client.flush()
|
||||
if err != nil {
|
||||
t.Errorf("Error sending: %s", err)
|
||||
}
|
||||
|
||||
if len(client.commands) != 0 {
|
||||
t.Errorf("Expecting send to flush commands, but found %d\n", len(client.commands))
|
||||
}
|
||||
|
||||
buffer := make([]byte, 4096)
|
||||
n, err := io.ReadAtLeast(server, buffer, 1)
|
||||
result := string(buffer[:n])
|
||||
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
expected := []string{
|
||||
`foo.cc:1|c|#dd:2`,
|
||||
`foo.gg:10.000000|g|#dd:2`,
|
||||
`foo.hh:1.000000|h|#dd:2`,
|
||||
`foo.ss:ss|s|#dd:2`,
|
||||
`foo.ss:xx|s|#dd:2`,
|
||||
}
|
||||
|
||||
for i, res := range strings.Split(result, "\n") {
|
||||
if res != expected[i] {
|
||||
t.Errorf("Got `%s`, expected `%s`", res, expected[i])
|
||||
}
|
||||
}
|
||||
|
||||
client.Event(&Event{Title: "title1", Text: "text1", Priority: Normal, AlertType: Success, Tags: []string{"tagg"}})
|
||||
client.SimpleEvent("event1", "text1")
|
||||
|
||||
if len(client.commands) != 2 {
|
||||
t.Errorf("Expected to find %d commands, but found %d\n", 2, len(client.commands))
|
||||
}
|
||||
|
||||
err = client.flush()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Error sending: %s", err)
|
||||
}
|
||||
|
||||
if len(client.commands) != 0 {
|
||||
t.Errorf("Expecting send to flush commands, but found %d\n", len(client.commands))
|
||||
}
|
||||
|
||||
buffer = make([]byte, 1024)
|
||||
n, err = io.ReadAtLeast(server, buffer, 1)
|
||||
result = string(buffer[:n])
|
||||
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
t.Errorf("Read 0 bytes but expected more.")
|
||||
}
|
||||
|
||||
expected = []string{
|
||||
`_e{6,5}:title1|text1|p:normal|t:success|#dd:2,tagg`,
|
||||
`_e{6,5}:event1|text1|#dd:2`,
|
||||
}
|
||||
|
||||
for i, res := range strings.Split(result, "\n") {
|
||||
if res != expected[i] {
|
||||
t.Errorf("Got `%s`, expected `%s`", res, expected[i])
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestNilSafe(t *testing.T) {
|
||||
var c *Client
|
||||
assertNotPanics(t, func() { c.Close() })
|
||||
assertNotPanics(t, func() { c.Count("", 0, nil, 1) })
|
||||
assertNotPanics(t, func() { c.Histogram("", 0, nil, 1) })
|
||||
assertNotPanics(t, func() { c.Gauge("", 0, nil, 1) })
|
||||
assertNotPanics(t, func() { c.Set("", "", nil, 1) })
|
||||
assertNotPanics(t, func() { c.send("", "", nil, 1) })
|
||||
}
|
||||
|
||||
func TestEvents(t *testing.T) {
|
||||
matrix := []struct {
|
||||
event *Event
|
||||
encoded string
|
||||
}{
|
||||
{
|
||||
NewEvent("Hello", "Something happened to my event"),
|
||||
`_e{5,30}:Hello|Something happened to my event`,
|
||||
}, {
|
||||
&Event{Title: "hi", Text: "okay", AggregationKey: "foo"},
|
||||
`_e{2,4}:hi|okay|k:foo`,
|
||||
}, {
|
||||
&Event{Title: "hi", Text: "okay", AggregationKey: "foo", AlertType: Info},
|
||||
`_e{2,4}:hi|okay|k:foo|t:info`,
|
||||
}, {
|
||||
&Event{Title: "hi", Text: "w/e", AlertType: Error, Priority: Normal},
|
||||
`_e{2,3}:hi|w/e|p:normal|t:error`,
|
||||
}, {
|
||||
&Event{Title: "hi", Text: "uh", Tags: []string{"host:foo", "app:bar"}},
|
||||
`_e{2,2}:hi|uh|#host:foo,app:bar`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, m := range matrix {
|
||||
r, err := m.event.Encode()
|
||||
if err != nil {
|
||||
t.Errorf("Error encoding: %s\n", err)
|
||||
continue
|
||||
}
|
||||
if r != m.encoded {
|
||||
t.Errorf("Expected `%s`, got `%s`\n", m.encoded, r)
|
||||
}
|
||||
}
|
||||
|
||||
e := NewEvent("", "hi")
|
||||
if _, err := e.Encode(); err == nil {
|
||||
t.Errorf("Expected error on empty Title.")
|
||||
}
|
||||
|
||||
e = NewEvent("hi", "")
|
||||
if _, err := e.Encode(); err == nil {
|
||||
t.Errorf("Expected error on empty Text.")
|
||||
}
|
||||
|
||||
e = NewEvent("hello", "world")
|
||||
s, err := e.Encode("tag1", "tag2")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expected := "_e{5,5}:hello|world|#tag1,tag2"
|
||||
if s != expected {
|
||||
t.Errorf("Expected %s, got %s", expected, s)
|
||||
}
|
||||
if len(e.Tags) != 0 {
|
||||
t.Errorf("Modified event in place illegally.")
|
||||
}
|
||||
}
|
||||
|
||||
// These benchmarks show that using a buffer instead of sprintf-ing together
|
||||
// a bunch of intermediate strings is 4-5x faster
|
||||
|
||||
func BenchmarkFormatNew(b *testing.B) {
|
||||
b.StopTimer()
|
||||
c := &Client{}
|
||||
c.Namespace = "foo.bar."
|
||||
c.Tags = []string{"app:foo", "host:bar"}
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c.format("system.cpu.idle", "10", []string{"foo"}, 1)
|
||||
c.format("system.cpu.load", "0.1", nil, 0.9)
|
||||
}
|
||||
}
|
||||
|
||||
// Old formatting function, added to client for tests
|
||||
func (c *Client) formatOld(name, value string, tags []string, rate float64) string {
|
||||
if rate < 1 {
|
||||
value = fmt.Sprintf("%s|@%f", value, rate)
|
||||
}
|
||||
if c.Namespace != "" {
|
||||
name = fmt.Sprintf("%s%s", c.Namespace, name)
|
||||
}
|
||||
|
||||
tags = append(c.Tags, tags...)
|
||||
if len(tags) > 0 {
|
||||
value = fmt.Sprintf("%s|#%s", value, strings.Join(tags, ","))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s:%s", name, value)
|
||||
|
||||
}
|
||||
|
||||
func BenchmarkFormatOld(b *testing.B) {
|
||||
b.StopTimer()
|
||||
c := &Client{}
|
||||
c.Namespace = "foo.bar."
|
||||
c.Tags = []string{"app:foo", "host:bar"}
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c.formatOld("system.cpu.idle", "10", []string{"foo"}, 1)
|
||||
c.formatOld("system.cpu.load", "0.1", nil, 0.9)
|
||||
}
|
||||
}
|
||||
20
vendor/github.com/armon/go-metrics/LICENSE
generated
vendored
Normal file
20
vendor/github.com/armon/go-metrics/LICENSE
generated
vendored
Normal file
@@ -0,0 +1,20 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013 Armon Dadgar
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
71
vendor/github.com/armon/go-metrics/README.md
generated
vendored
Normal file
71
vendor/github.com/armon/go-metrics/README.md
generated
vendored
Normal file
@@ -0,0 +1,71 @@
|
||||
go-metrics
|
||||
==========
|
||||
|
||||
This library provides a `metrics` package which can be used to instrument code,
|
||||
expose application metrics, and profile runtime performance in a flexible manner.
|
||||
|
||||
Current API: [](https://godoc.org/github.com/armon/go-metrics)
|
||||
|
||||
Sinks
|
||||
=====
|
||||
|
||||
The `metrics` package makes use of a `MetricSink` interface to support delivery
|
||||
to any type of backend. Currently the following sinks are provided:
|
||||
|
||||
* StatsiteSink : Sinks to a [statsite](https://github.com/armon/statsite/) instance (TCP)
|
||||
* StatsdSink: Sinks to a [StatsD](https://github.com/etsy/statsd/) / statsite instance (UDP)
|
||||
* PrometheusSink: Sinks to a [Prometheus](http://prometheus.io/) metrics endpoint (exposed via HTTP for scrapes)
|
||||
* InmemSink : Provides in-memory aggregation, can be used to export stats
|
||||
* FanoutSink : Sinks to multiple sinks. Enables writing to multiple statsite instances for example.
|
||||
* BlackholeSink : Sinks to nowhere
|
||||
|
||||
In addition to the sinks, the `InmemSignal` can be used to catch a signal,
|
||||
and dump a formatted output of recent metrics. For example, when a process gets
|
||||
a SIGUSR1, it can dump to stderr recent performance metrics for debugging.
|
||||
|
||||
Examples
|
||||
========
|
||||
|
||||
Here is an example of using the package:
|
||||
|
||||
func SlowMethod() {
|
||||
// Profiling the runtime of a method
|
||||
defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now())
|
||||
}
|
||||
|
||||
// Configure a statsite sink as the global metrics sink
|
||||
sink, _ := metrics.NewStatsiteSink("statsite:8125")
|
||||
metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink)
|
||||
|
||||
// Emit a Key/Value pair
|
||||
metrics.EmitKey([]string{"questions", "meaning of life"}, 42)
|
||||
|
||||
|
||||
Here is an example of setting up an signal handler:
|
||||
|
||||
// Setup the inmem sink and signal handler
|
||||
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
|
||||
sig := metrics.DefaultInmemSignal(inm)
|
||||
metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm)
|
||||
|
||||
// Run some code
|
||||
inm.SetGauge([]string{"foo"}, 42)
|
||||
inm.EmitKey([]string{"bar"}, 30)
|
||||
|
||||
inm.IncrCounter([]string{"baz"}, 42)
|
||||
inm.IncrCounter([]string{"baz"}, 1)
|
||||
inm.IncrCounter([]string{"baz"}, 80)
|
||||
|
||||
inm.AddSample([]string{"method", "wow"}, 42)
|
||||
inm.AddSample([]string{"method", "wow"}, 100)
|
||||
inm.AddSample([]string{"method", "wow"}, 22)
|
||||
|
||||
....
|
||||
|
||||
When a signal comes in, output like the following will be dumped to stderr:
|
||||
|
||||
[2014-01-28 14:57:33.04 -0800 PST][G] 'foo': 42.000
|
||||
[2014-01-28 14:57:33.04 -0800 PST][P] 'bar': 30.000
|
||||
[2014-01-28 14:57:33.04 -0800 PST][C] 'baz': Count: 3 Min: 1.000 Mean: 41.000 Max: 80.000 Stddev: 39.509
|
||||
[2014-01-28 14:57:33.04 -0800 PST][S] 'method.wow': Count: 3 Min: 22.000 Mean: 54.667 Max: 100.000 Stddev: 40.513
|
||||
|
||||
12
vendor/github.com/armon/go-metrics/const_unix.go
generated
vendored
Normal file
12
vendor/github.com/armon/go-metrics/const_unix.go
generated
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
// +build !windows
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultSignal is used with DefaultInmemSignal
|
||||
DefaultSignal = syscall.SIGUSR1
|
||||
)
|
||||
13
vendor/github.com/armon/go-metrics/const_windows.go
generated
vendored
Normal file
13
vendor/github.com/armon/go-metrics/const_windows.go
generated
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
// +build windows
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultSignal is used with DefaultInmemSignal
|
||||
// Windows has no SIGUSR1, use SIGBREAK
|
||||
DefaultSignal = syscall.Signal(21)
|
||||
)
|
||||
109
vendor/github.com/armon/go-metrics/datadog/dogstatsd.go
generated
vendored
Normal file
109
vendor/github.com/armon/go-metrics/datadog/dogstatsd.go
generated
vendored
Normal file
@@ -0,0 +1,109 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/DataDog/datadog-go/statsd"
|
||||
)
|
||||
|
||||
// DogStatsdSink provides a MetricSink that can be used
|
||||
// with a dogstatsd server. It utilizes the Dogstatsd client at github.com/DataDog/datadog-go/statsd
|
||||
type DogStatsdSink struct {
|
||||
client *statsd.Client
|
||||
hostName string
|
||||
propagateHostname bool
|
||||
}
|
||||
|
||||
// NewDogStatsdSink is used to create a new DogStatsdSink with sane defaults
|
||||
func NewDogStatsdSink(addr string, hostName string) (*DogStatsdSink, error) {
|
||||
client, err := statsd.New(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sink := &DogStatsdSink{
|
||||
client: client,
|
||||
hostName: hostName,
|
||||
propagateHostname: false,
|
||||
}
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
// SetTags sets common tags on the Dogstatsd Client that will be sent
|
||||
// along with all dogstatsd packets.
|
||||
// Ref: http://docs.datadoghq.com/guides/dogstatsd/#tags
|
||||
func (s *DogStatsdSink) SetTags(tags []string) {
|
||||
s.client.Tags = tags
|
||||
}
|
||||
|
||||
// EnableHostnamePropagation forces a Dogstatsd `host` tag with the value specified by `s.HostName`
|
||||
// Since the go-metrics package has its own mechanism for attaching a hostname to metrics,
|
||||
// setting the `propagateHostname` flag ensures that `s.HostName` overrides the host tag naively set by the DogStatsd server
|
||||
func (s *DogStatsdSink) EnableHostNamePropagation() {
|
||||
s.propagateHostname = true
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) flattenKey(parts []string) string {
|
||||
joined := strings.Join(parts, ".")
|
||||
return strings.Map(func(r rune) rune {
|
||||
switch r {
|
||||
case ':':
|
||||
fallthrough
|
||||
case ' ':
|
||||
return '_'
|
||||
default:
|
||||
return r
|
||||
}
|
||||
}, joined)
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) parseKey(key []string) ([]string, []string) {
|
||||
// Since DogStatsd supports dimensionality via tags on metric keys, this sink's approach is to splice the hostname out of the key in favor of a `host` tag
|
||||
// The `host` tag is either forced here, or set downstream by the DogStatsd server
|
||||
|
||||
var tags []string
|
||||
hostName := s.hostName
|
||||
|
||||
//Splice the hostname out of the key
|
||||
for i, el := range key {
|
||||
if el == hostName {
|
||||
key = append(key[:i], key[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
if s.propagateHostname {
|
||||
tags = append(tags, fmt.Sprintf("host:%s", hostName))
|
||||
}
|
||||
return key, tags
|
||||
}
|
||||
|
||||
// Implementation of methods in the MetricSink interface
|
||||
|
||||
func (s *DogStatsdSink) SetGauge(key []string, val float32) {
|
||||
key, tags := s.parseKey(key)
|
||||
flatKey := s.flattenKey(key)
|
||||
|
||||
rate := 1.0
|
||||
s.client.Gauge(flatKey, float64(val), tags, rate)
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) IncrCounter(key []string, val float32) {
|
||||
key, tags := s.parseKey(key)
|
||||
flatKey := s.flattenKey(key)
|
||||
|
||||
rate := 1.0
|
||||
s.client.Count(flatKey, int64(val), tags, rate)
|
||||
}
|
||||
|
||||
// EmitKey is not implemented since DogStatsd does not provide a metric type that holds an
|
||||
// arbitrary number of values
|
||||
func (s *DogStatsdSink) EmitKey(key []string, val float32) {
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) AddSample(key []string, val float32) {
|
||||
key, tags := s.parseKey(key)
|
||||
flatKey := s.flattenKey(key)
|
||||
|
||||
rate := 1.0
|
||||
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
|
||||
}
|
||||
121
vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go
generated
vendored
Normal file
121
vendor/github.com/armon/go-metrics/datadog/dogstatsd_test.go
generated
vendored
Normal file
@@ -0,0 +1,121 @@
|
||||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var EmptyTags []string
|
||||
|
||||
const (
|
||||
DogStatsdAddr = "127.0.0.1:7254"
|
||||
HostnameEnabled = true
|
||||
HostnameDisabled = false
|
||||
TestHostname = "test_hostname"
|
||||
)
|
||||
|
||||
func MockGetHostname() string {
|
||||
return TestHostname
|
||||
}
|
||||
|
||||
var ParseKeyTests = []struct {
|
||||
KeyToParse []string
|
||||
Tags []string
|
||||
PropagateHostname bool
|
||||
ExpectedKey []string
|
||||
ExpectedTags []string
|
||||
}{
|
||||
{[]string{"a", MockGetHostname(), "b", "c"}, EmptyTags, HostnameDisabled, []string{"a", "b", "c"}, EmptyTags},
|
||||
{[]string{"a", "b", "c"}, EmptyTags, HostnameDisabled, []string{"a", "b", "c"}, EmptyTags},
|
||||
{[]string{"a", "b", "c"}, EmptyTags, HostnameEnabled, []string{"a", "b", "c"}, []string{fmt.Sprintf("host:%s", MockGetHostname())}},
|
||||
}
|
||||
|
||||
var FlattenKeyTests = []struct {
|
||||
KeyToFlatten []string
|
||||
Expected string
|
||||
}{
|
||||
{[]string{"a", "b", "c"}, "a.b.c"},
|
||||
{[]string{"spaces must", "flatten", "to", "underscores"}, "spaces_must.flatten.to.underscores"},
|
||||
}
|
||||
|
||||
var MetricSinkTests = []struct {
|
||||
Method string
|
||||
Metric []string
|
||||
Value interface{}
|
||||
Tags []string
|
||||
PropagateHostname bool
|
||||
Expected string
|
||||
}{
|
||||
{"SetGauge", []string{"foo", "bar"}, float32(42), EmptyTags, HostnameDisabled, "foo.bar:42.000000|g"},
|
||||
{"SetGauge", []string{"foo", "bar", "baz"}, float32(42), EmptyTags, HostnameDisabled, "foo.bar.baz:42.000000|g"},
|
||||
{"AddSample", []string{"sample", "thing"}, float32(4), EmptyTags, HostnameDisabled, "sample.thing:4.000000|ms"},
|
||||
{"IncrCounter", []string{"count", "me"}, float32(3), EmptyTags, HostnameDisabled, "count.me:3|c"},
|
||||
|
||||
{"SetGauge", []string{"foo", "baz"}, float32(42), []string{"my_tag:my_value"}, HostnameDisabled, "foo.baz:42.000000|g|#my_tag:my_value"},
|
||||
{"SetGauge", []string{"foo", "bar"}, float32(42), []string{"my_tag:my_value", "other_tag:other_value"}, HostnameDisabled, "foo.bar:42.000000|g|#my_tag:my_value,other_tag:other_value"},
|
||||
{"SetGauge", []string{"foo", "bar"}, float32(42), []string{"my_tag:my_value", "other_tag:other_value"}, HostnameEnabled, "foo.bar:42.000000|g|#my_tag:my_value,other_tag:other_value,host:test_hostname"},
|
||||
}
|
||||
|
||||
func MockNewDogStatsdSink(addr string, tags []string, tagWithHostname bool) *DogStatsdSink {
|
||||
dog, _ := NewDogStatsdSink(addr, MockGetHostname())
|
||||
dog.SetTags(tags)
|
||||
if tagWithHostname {
|
||||
dog.EnableHostNamePropagation()
|
||||
}
|
||||
|
||||
return dog
|
||||
}
|
||||
|
||||
func TestParseKey(t *testing.T) {
|
||||
for _, tt := range ParseKeyTests {
|
||||
dog := MockNewDogStatsdSink(DogStatsdAddr, tt.Tags, tt.PropagateHostname)
|
||||
key, tags := dog.parseKey(tt.KeyToParse)
|
||||
|
||||
if !reflect.DeepEqual(key, tt.ExpectedKey) {
|
||||
t.Fatalf("Key Parsing failed for %v", tt.KeyToParse)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tags, tt.ExpectedTags) {
|
||||
t.Fatalf("Tag Parsing Failed for %v", tt.KeyToParse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlattenKey(t *testing.T) {
|
||||
dog := MockNewDogStatsdSink(DogStatsdAddr, EmptyTags, HostnameDisabled)
|
||||
for _, tt := range FlattenKeyTests {
|
||||
if !reflect.DeepEqual(dog.flattenKey(tt.KeyToFlatten), tt.Expected) {
|
||||
t.Fatalf("Flattening %v failed", tt.KeyToFlatten)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricSink(t *testing.T) {
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", DogStatsdAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
server, err := net.ListenUDP("udp", udpAddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer server.Close()
|
||||
|
||||
buf := make([]byte, 1024)
|
||||
|
||||
for _, tt := range MetricSinkTests {
|
||||
dog := MockNewDogStatsdSink(DogStatsdAddr, tt.Tags, tt.PropagateHostname)
|
||||
method := reflect.ValueOf(dog).MethodByName(tt.Method)
|
||||
method.Call([]reflect.Value{
|
||||
reflect.ValueOf(tt.Metric),
|
||||
reflect.ValueOf(tt.Value)})
|
||||
|
||||
n, _ := server.Read(buf)
|
||||
msg := buf[:n]
|
||||
if string(msg) != tt.Expected {
|
||||
t.Fatalf("Line %s does not match expected: %s", string(msg), tt.Expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
241
vendor/github.com/armon/go-metrics/inmem.go
generated
vendored
Normal file
241
vendor/github.com/armon/go-metrics/inmem.go
generated
vendored
Normal file
@@ -0,0 +1,241 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// InmemSink provides a MetricSink that does in-memory aggregation
|
||||
// without sending metrics over a network. It can be embedded within
|
||||
// an application to provide profiling information.
|
||||
type InmemSink struct {
|
||||
// How long is each aggregation interval
|
||||
interval time.Duration
|
||||
|
||||
// Retain controls how many metrics interval we keep
|
||||
retain time.Duration
|
||||
|
||||
// maxIntervals is the maximum length of intervals.
|
||||
// It is retain / interval.
|
||||
maxIntervals int
|
||||
|
||||
// intervals is a slice of the retained intervals
|
||||
intervals []*IntervalMetrics
|
||||
intervalLock sync.RWMutex
|
||||
}
|
||||
|
||||
// IntervalMetrics stores the aggregated metrics
|
||||
// for a specific interval
|
||||
type IntervalMetrics struct {
|
||||
sync.RWMutex
|
||||
|
||||
// The start time of the interval
|
||||
Interval time.Time
|
||||
|
||||
// Gauges maps the key to the last set value
|
||||
Gauges map[string]float32
|
||||
|
||||
// Points maps the string to the list of emitted values
|
||||
// from EmitKey
|
||||
Points map[string][]float32
|
||||
|
||||
// Counters maps the string key to a sum of the counter
|
||||
// values
|
||||
Counters map[string]*AggregateSample
|
||||
|
||||
// Samples maps the key to an AggregateSample,
|
||||
// which has the rolled up view of a sample
|
||||
Samples map[string]*AggregateSample
|
||||
}
|
||||
|
||||
// NewIntervalMetrics creates a new IntervalMetrics for a given interval
|
||||
func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
|
||||
return &IntervalMetrics{
|
||||
Interval: intv,
|
||||
Gauges: make(map[string]float32),
|
||||
Points: make(map[string][]float32),
|
||||
Counters: make(map[string]*AggregateSample),
|
||||
Samples: make(map[string]*AggregateSample),
|
||||
}
|
||||
}
|
||||
|
||||
// AggregateSample is used to hold aggregate metrics
|
||||
// about a sample
|
||||
type AggregateSample struct {
|
||||
Count int // The count of emitted pairs
|
||||
Sum float64 // The sum of values
|
||||
SumSq float64 // The sum of squared values
|
||||
Min float64 // Minimum value
|
||||
Max float64 // Maximum value
|
||||
LastUpdated time.Time // When value was last updated
|
||||
}
|
||||
|
||||
// Computes a Stddev of the values
|
||||
func (a *AggregateSample) Stddev() float64 {
|
||||
num := (float64(a.Count) * a.SumSq) - math.Pow(a.Sum, 2)
|
||||
div := float64(a.Count * (a.Count - 1))
|
||||
if div == 0 {
|
||||
return 0
|
||||
}
|
||||
return math.Sqrt(num / div)
|
||||
}
|
||||
|
||||
// Computes a mean of the values
|
||||
func (a *AggregateSample) Mean() float64 {
|
||||
if a.Count == 0 {
|
||||
return 0
|
||||
}
|
||||
return a.Sum / float64(a.Count)
|
||||
}
|
||||
|
||||
// Ingest is used to update a sample
|
||||
func (a *AggregateSample) Ingest(v float64) {
|
||||
a.Count++
|
||||
a.Sum += v
|
||||
a.SumSq += (v * v)
|
||||
if v < a.Min || a.Count == 1 {
|
||||
a.Min = v
|
||||
}
|
||||
if v > a.Max || a.Count == 1 {
|
||||
a.Max = v
|
||||
}
|
||||
a.LastUpdated = time.Now()
|
||||
}
|
||||
|
||||
func (a *AggregateSample) String() string {
|
||||
if a.Count == 0 {
|
||||
return "Count: 0"
|
||||
} else if a.Stddev() == 0 {
|
||||
return fmt.Sprintf("Count: %d Sum: %0.3f LastUpdated: %s", a.Count, a.Sum, a.LastUpdated)
|
||||
} else {
|
||||
return fmt.Sprintf("Count: %d Min: %0.3f Mean: %0.3f Max: %0.3f Stddev: %0.3f Sum: %0.3f LastUpdated: %s",
|
||||
a.Count, a.Min, a.Mean(), a.Max, a.Stddev(), a.Sum, a.LastUpdated)
|
||||
}
|
||||
}
|
||||
|
||||
// NewInmemSink is used to construct a new in-memory sink.
|
||||
// Uses an aggregation interval and maximum retention period.
|
||||
func NewInmemSink(interval, retain time.Duration) *InmemSink {
|
||||
i := &InmemSink{
|
||||
interval: interval,
|
||||
retain: retain,
|
||||
maxIntervals: int(retain / interval),
|
||||
}
|
||||
i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
|
||||
return i
|
||||
}
|
||||
|
||||
func (i *InmemSink) SetGauge(key []string, val float32) {
|
||||
k := i.flattenKey(key)
|
||||
intv := i.getInterval()
|
||||
|
||||
intv.Lock()
|
||||
defer intv.Unlock()
|
||||
intv.Gauges[k] = val
|
||||
}
|
||||
|
||||
func (i *InmemSink) EmitKey(key []string, val float32) {
|
||||
k := i.flattenKey(key)
|
||||
intv := i.getInterval()
|
||||
|
||||
intv.Lock()
|
||||
defer intv.Unlock()
|
||||
vals := intv.Points[k]
|
||||
intv.Points[k] = append(vals, val)
|
||||
}
|
||||
|
||||
func (i *InmemSink) IncrCounter(key []string, val float32) {
|
||||
k := i.flattenKey(key)
|
||||
intv := i.getInterval()
|
||||
|
||||
intv.Lock()
|
||||
defer intv.Unlock()
|
||||
|
||||
agg := intv.Counters[k]
|
||||
if agg == nil {
|
||||
agg = &AggregateSample{}
|
||||
intv.Counters[k] = agg
|
||||
}
|
||||
agg.Ingest(float64(val))
|
||||
}
|
||||
|
||||
func (i *InmemSink) AddSample(key []string, val float32) {
|
||||
k := i.flattenKey(key)
|
||||
intv := i.getInterval()
|
||||
|
||||
intv.Lock()
|
||||
defer intv.Unlock()
|
||||
|
||||
agg := intv.Samples[k]
|
||||
if agg == nil {
|
||||
agg = &AggregateSample{}
|
||||
intv.Samples[k] = agg
|
||||
}
|
||||
agg.Ingest(float64(val))
|
||||
}
|
||||
|
||||
// Data is used to retrieve all the aggregated metrics
|
||||
// Intervals may be in use, and a read lock should be acquired
|
||||
func (i *InmemSink) Data() []*IntervalMetrics {
|
||||
// Get the current interval, forces creation
|
||||
i.getInterval()
|
||||
|
||||
i.intervalLock.RLock()
|
||||
defer i.intervalLock.RUnlock()
|
||||
|
||||
intervals := make([]*IntervalMetrics, len(i.intervals))
|
||||
copy(intervals, i.intervals)
|
||||
return intervals
|
||||
}
|
||||
|
||||
func (i *InmemSink) getExistingInterval(intv time.Time) *IntervalMetrics {
|
||||
i.intervalLock.RLock()
|
||||
defer i.intervalLock.RUnlock()
|
||||
|
||||
n := len(i.intervals)
|
||||
if n > 0 && i.intervals[n-1].Interval == intv {
|
||||
return i.intervals[n-1]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *InmemSink) createInterval(intv time.Time) *IntervalMetrics {
|
||||
i.intervalLock.Lock()
|
||||
defer i.intervalLock.Unlock()
|
||||
|
||||
// Check for an existing interval
|
||||
n := len(i.intervals)
|
||||
if n > 0 && i.intervals[n-1].Interval == intv {
|
||||
return i.intervals[n-1]
|
||||
}
|
||||
|
||||
// Add the current interval
|
||||
current := NewIntervalMetrics(intv)
|
||||
i.intervals = append(i.intervals, current)
|
||||
n++
|
||||
|
||||
// Truncate the intervals if they are too long
|
||||
if n >= i.maxIntervals {
|
||||
copy(i.intervals[0:], i.intervals[n-i.maxIntervals:])
|
||||
i.intervals = i.intervals[:i.maxIntervals]
|
||||
}
|
||||
return current
|
||||
}
|
||||
|
||||
// getInterval returns the current interval to write to
|
||||
func (i *InmemSink) getInterval() *IntervalMetrics {
|
||||
intv := time.Now().Truncate(i.interval)
|
||||
if m := i.getExistingInterval(intv); m != nil {
|
||||
return m
|
||||
}
|
||||
return i.createInterval(intv)
|
||||
}
|
||||
|
||||
// Flattens the key for formatting, removes spaces
|
||||
func (i *InmemSink) flattenKey(parts []string) string {
|
||||
joined := strings.Join(parts, ".")
|
||||
return strings.Replace(joined, " ", "_", -1)
|
||||
}
|
||||
100
vendor/github.com/armon/go-metrics/inmem_signal.go
generated
vendored
Normal file
100
vendor/github.com/armon/go-metrics/inmem_signal.go
generated
vendored
Normal file
@@ -0,0 +1,100 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// InmemSignal is used to listen for a given signal, and when received,
|
||||
// to dump the current metrics from the InmemSink to an io.Writer
|
||||
type InmemSignal struct {
|
||||
signal syscall.Signal
|
||||
inm *InmemSink
|
||||
w io.Writer
|
||||
sigCh chan os.Signal
|
||||
|
||||
stop bool
|
||||
stopCh chan struct{}
|
||||
stopLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewInmemSignal creates a new InmemSignal which listens for a given signal,
|
||||
// and dumps the current metrics out to a writer
|
||||
func NewInmemSignal(inmem *InmemSink, sig syscall.Signal, w io.Writer) *InmemSignal {
|
||||
i := &InmemSignal{
|
||||
signal: sig,
|
||||
inm: inmem,
|
||||
w: w,
|
||||
sigCh: make(chan os.Signal, 1),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
signal.Notify(i.sigCh, sig)
|
||||
go i.run()
|
||||
return i
|
||||
}
|
||||
|
||||
// DefaultInmemSignal returns a new InmemSignal that responds to SIGUSR1
|
||||
// and writes output to stderr. Windows uses SIGBREAK
|
||||
func DefaultInmemSignal(inmem *InmemSink) *InmemSignal {
|
||||
return NewInmemSignal(inmem, DefaultSignal, os.Stderr)
|
||||
}
|
||||
|
||||
// Stop is used to stop the InmemSignal from listening
|
||||
func (i *InmemSignal) Stop() {
|
||||
i.stopLock.Lock()
|
||||
defer i.stopLock.Unlock()
|
||||
|
||||
if i.stop {
|
||||
return
|
||||
}
|
||||
i.stop = true
|
||||
close(i.stopCh)
|
||||
signal.Stop(i.sigCh)
|
||||
}
|
||||
|
||||
// run is a long running routine that handles signals
|
||||
func (i *InmemSignal) run() {
|
||||
for {
|
||||
select {
|
||||
case <-i.sigCh:
|
||||
i.dumpStats()
|
||||
case <-i.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dumpStats is used to dump the data to output writer
|
||||
func (i *InmemSignal) dumpStats() {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
data := i.inm.Data()
|
||||
// Skip the last period which is still being aggregated
|
||||
for i := 0; i < len(data)-1; i++ {
|
||||
intv := data[i]
|
||||
intv.RLock()
|
||||
for name, val := range intv.Gauges {
|
||||
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val)
|
||||
}
|
||||
for name, vals := range intv.Points {
|
||||
for _, val := range vals {
|
||||
fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
|
||||
}
|
||||
}
|
||||
for name, agg := range intv.Counters {
|
||||
fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg)
|
||||
}
|
||||
for name, agg := range intv.Samples {
|
||||
fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg)
|
||||
}
|
||||
intv.RUnlock()
|
||||
}
|
||||
|
||||
// Write out the bytes
|
||||
i.w.Write(buf.Bytes())
|
||||
}
|
||||
46
vendor/github.com/armon/go-metrics/inmem_signal_test.go
generated
vendored
Normal file
46
vendor/github.com/armon/go-metrics/inmem_signal_test.go
generated
vendored
Normal file
@@ -0,0 +1,46 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestInmemSignal(t *testing.T) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
inm := NewInmemSink(10*time.Millisecond, 50*time.Millisecond)
|
||||
sig := NewInmemSignal(inm, syscall.SIGUSR1, buf)
|
||||
defer sig.Stop()
|
||||
|
||||
inm.SetGauge([]string{"foo"}, 42)
|
||||
inm.EmitKey([]string{"bar"}, 42)
|
||||
inm.IncrCounter([]string{"baz"}, 42)
|
||||
inm.AddSample([]string{"wow"}, 42)
|
||||
|
||||
// Wait for period to end
|
||||
time.Sleep(15 * time.Millisecond)
|
||||
|
||||
// Send signal!
|
||||
syscall.Kill(os.Getpid(), syscall.SIGUSR1)
|
||||
|
||||
// Wait for flush
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Check the output
|
||||
out := string(buf.Bytes())
|
||||
if !strings.Contains(out, "[G] 'foo': 42") {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
if !strings.Contains(out, "[P] 'bar': 42") {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
if !strings.Contains(out, "[C] 'baz': Count: 1 Sum: 42") {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
if !strings.Contains(out, "[S] 'wow': Count: 1 Sum: 42") {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
}
|
||||
104
vendor/github.com/armon/go-metrics/inmem_test.go
generated
vendored
Normal file
104
vendor/github.com/armon/go-metrics/inmem_test.go
generated
vendored
Normal file
@@ -0,0 +1,104 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestInmemSink(t *testing.T) {
|
||||
inm := NewInmemSink(10*time.Millisecond, 50*time.Millisecond)
|
||||
|
||||
data := inm.Data()
|
||||
if len(data) != 1 {
|
||||
t.Fatalf("bad: %v", data)
|
||||
}
|
||||
|
||||
// Add data points
|
||||
inm.SetGauge([]string{"foo", "bar"}, 42)
|
||||
inm.EmitKey([]string{"foo", "bar"}, 42)
|
||||
inm.IncrCounter([]string{"foo", "bar"}, 20)
|
||||
inm.IncrCounter([]string{"foo", "bar"}, 22)
|
||||
inm.AddSample([]string{"foo", "bar"}, 20)
|
||||
inm.AddSample([]string{"foo", "bar"}, 22)
|
||||
|
||||
data = inm.Data()
|
||||
if len(data) != 1 {
|
||||
t.Fatalf("bad: %v", data)
|
||||
}
|
||||
|
||||
intvM := data[0]
|
||||
intvM.RLock()
|
||||
|
||||
if time.Now().Sub(intvM.Interval) > 10*time.Millisecond {
|
||||
t.Fatalf("interval too old")
|
||||
}
|
||||
if intvM.Gauges["foo.bar"] != 42 {
|
||||
t.Fatalf("bad val: %v", intvM.Gauges)
|
||||
}
|
||||
if intvM.Points["foo.bar"][0] != 42 {
|
||||
t.Fatalf("bad val: %v", intvM.Points)
|
||||
}
|
||||
|
||||
agg := intvM.Counters["foo.bar"]
|
||||
if agg.Count != 2 {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
if agg.Sum != 42 {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
if agg.SumSq != 884 {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
if agg.Min != 20 {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
if agg.Max != 22 {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
if agg.Mean() != 21 {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
if agg.Stddev() != math.Sqrt(2) {
|
||||
t.Fatalf("bad val: %v", agg)
|
||||
}
|
||||
|
||||
if agg.LastUpdated.IsZero() {
|
||||
t.Fatalf("agg.LastUpdated is not set: %v", agg)
|
||||
}
|
||||
|
||||
diff := time.Now().Sub(agg.LastUpdated).Seconds()
|
||||
if diff > 1 {
|
||||
t.Fatalf("time diff too great: %f", diff)
|
||||
}
|
||||
|
||||
if agg = intvM.Samples["foo.bar"]; agg == nil {
|
||||
t.Fatalf("missing sample")
|
||||
}
|
||||
|
||||
intvM.RUnlock()
|
||||
|
||||
for i := 1; i < 10; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
inm.SetGauge([]string{"foo", "bar"}, 42)
|
||||
data = inm.Data()
|
||||
if len(data) != min(i+1, 5) {
|
||||
t.Fatalf("bad: %v", data)
|
||||
}
|
||||
}
|
||||
|
||||
// Should not exceed 5 intervals!
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
inm.SetGauge([]string{"foo", "bar"}, 42)
|
||||
data = inm.Data()
|
||||
if len(data) != 5 {
|
||||
t.Fatalf("bad: %v", data)
|
||||
}
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
115
vendor/github.com/armon/go-metrics/metrics.go
generated
vendored
Normal file
115
vendor/github.com/armon/go-metrics/metrics.go
generated
vendored
Normal file
@@ -0,0 +1,115 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (m *Metrics) SetGauge(key []string, val float32) {
|
||||
if m.HostName != "" && m.EnableHostname {
|
||||
key = insert(0, m.HostName, key)
|
||||
}
|
||||
if m.EnableTypePrefix {
|
||||
key = insert(0, "gauge", key)
|
||||
}
|
||||
if m.ServiceName != "" {
|
||||
key = insert(0, m.ServiceName, key)
|
||||
}
|
||||
m.sink.SetGauge(key, val)
|
||||
}
|
||||
|
||||
func (m *Metrics) EmitKey(key []string, val float32) {
|
||||
if m.EnableTypePrefix {
|
||||
key = insert(0, "kv", key)
|
||||
}
|
||||
if m.ServiceName != "" {
|
||||
key = insert(0, m.ServiceName, key)
|
||||
}
|
||||
m.sink.EmitKey(key, val)
|
||||
}
|
||||
|
||||
func (m *Metrics) IncrCounter(key []string, val float32) {
|
||||
if m.EnableTypePrefix {
|
||||
key = insert(0, "counter", key)
|
||||
}
|
||||
if m.ServiceName != "" {
|
||||
key = insert(0, m.ServiceName, key)
|
||||
}
|
||||
m.sink.IncrCounter(key, val)
|
||||
}
|
||||
|
||||
func (m *Metrics) AddSample(key []string, val float32) {
|
||||
if m.EnableTypePrefix {
|
||||
key = insert(0, "sample", key)
|
||||
}
|
||||
if m.ServiceName != "" {
|
||||
key = insert(0, m.ServiceName, key)
|
||||
}
|
||||
m.sink.AddSample(key, val)
|
||||
}
|
||||
|
||||
func (m *Metrics) MeasureSince(key []string, start time.Time) {
|
||||
if m.EnableTypePrefix {
|
||||
key = insert(0, "timer", key)
|
||||
}
|
||||
if m.ServiceName != "" {
|
||||
key = insert(0, m.ServiceName, key)
|
||||
}
|
||||
now := time.Now()
|
||||
elapsed := now.Sub(start)
|
||||
msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity)
|
||||
m.sink.AddSample(key, msec)
|
||||
}
|
||||
|
||||
// Periodically collects runtime stats to publish
|
||||
func (m *Metrics) collectStats() {
|
||||
for {
|
||||
time.Sleep(m.ProfileInterval)
|
||||
m.emitRuntimeStats()
|
||||
}
|
||||
}
|
||||
|
||||
// Emits various runtime statsitics
|
||||
func (m *Metrics) emitRuntimeStats() {
|
||||
// Export number of Goroutines
|
||||
numRoutines := runtime.NumGoroutine()
|
||||
m.SetGauge([]string{"runtime", "num_goroutines"}, float32(numRoutines))
|
||||
|
||||
// Export memory stats
|
||||
var stats runtime.MemStats
|
||||
runtime.ReadMemStats(&stats)
|
||||
m.SetGauge([]string{"runtime", "alloc_bytes"}, float32(stats.Alloc))
|
||||
m.SetGauge([]string{"runtime", "sys_bytes"}, float32(stats.Sys))
|
||||
m.SetGauge([]string{"runtime", "malloc_count"}, float32(stats.Mallocs))
|
||||
m.SetGauge([]string{"runtime", "free_count"}, float32(stats.Frees))
|
||||
m.SetGauge([]string{"runtime", "heap_objects"}, float32(stats.HeapObjects))
|
||||
m.SetGauge([]string{"runtime", "total_gc_pause_ns"}, float32(stats.PauseTotalNs))
|
||||
m.SetGauge([]string{"runtime", "total_gc_runs"}, float32(stats.NumGC))
|
||||
|
||||
// Export info about the last few GC runs
|
||||
num := stats.NumGC
|
||||
|
||||
// Handle wrap around
|
||||
if num < m.lastNumGC {
|
||||
m.lastNumGC = 0
|
||||
}
|
||||
|
||||
// Ensure we don't scan more than 256
|
||||
if num-m.lastNumGC >= 256 {
|
||||
m.lastNumGC = num - 255
|
||||
}
|
||||
|
||||
for i := m.lastNumGC; i < num; i++ {
|
||||
pause := stats.PauseNs[i%256]
|
||||
m.AddSample([]string{"runtime", "gc_pause_ns"}, float32(pause))
|
||||
}
|
||||
m.lastNumGC = num
|
||||
}
|
||||
|
||||
// Inserts a string value at an index into the slice
|
||||
func insert(i int, v string, s []string) []string {
|
||||
s = append(s, "")
|
||||
copy(s[i+1:], s[i:])
|
||||
s[i] = v
|
||||
return s
|
||||
}
|
||||
262
vendor/github.com/armon/go-metrics/metrics_test.go
generated
vendored
Normal file
262
vendor/github.com/armon/go-metrics/metrics_test.go
generated
vendored
Normal file
@@ -0,0 +1,262 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func mockMetric() (*MockSink, *Metrics) {
|
||||
m := &MockSink{}
|
||||
met := &Metrics{sink: m}
|
||||
return m, met
|
||||
}
|
||||
|
||||
func TestMetrics_SetGauge(t *testing.T) {
|
||||
m, met := mockMetric()
|
||||
met.SetGauge([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.HostName = "test"
|
||||
met.EnableHostname = true
|
||||
met.SetGauge([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "test" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.EnableTypePrefix = true
|
||||
met.SetGauge([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "gauge" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.ServiceName = "service"
|
||||
met.SetGauge([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "service" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetrics_EmitKey(t *testing.T) {
|
||||
m, met := mockMetric()
|
||||
met.EmitKey([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.EnableTypePrefix = true
|
||||
met.EmitKey([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "kv" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.ServiceName = "service"
|
||||
met.EmitKey([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "service" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetrics_IncrCounter(t *testing.T) {
|
||||
m, met := mockMetric()
|
||||
met.IncrCounter([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.EnableTypePrefix = true
|
||||
met.IncrCounter([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "counter" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.ServiceName = "service"
|
||||
met.IncrCounter([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "service" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetrics_AddSample(t *testing.T) {
|
||||
m, met := mockMetric()
|
||||
met.AddSample([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.EnableTypePrefix = true
|
||||
met.AddSample([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "sample" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.ServiceName = "service"
|
||||
met.AddSample([]string{"key"}, float32(1))
|
||||
if m.keys[0][0] != "service" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] != 1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetrics_MeasureSince(t *testing.T) {
|
||||
m, met := mockMetric()
|
||||
met.TimerGranularity = time.Millisecond
|
||||
n := time.Now()
|
||||
met.MeasureSince([]string{"key"}, n)
|
||||
if m.keys[0][0] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] > 0.1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.TimerGranularity = time.Millisecond
|
||||
met.EnableTypePrefix = true
|
||||
met.MeasureSince([]string{"key"}, n)
|
||||
if m.keys[0][0] != "timer" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] > 0.1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
|
||||
m, met = mockMetric()
|
||||
met.TimerGranularity = time.Millisecond
|
||||
met.ServiceName = "service"
|
||||
met.MeasureSince([]string{"key"}, n)
|
||||
if m.keys[0][0] != "service" || m.keys[0][1] != "key" {
|
||||
t.Fatalf("")
|
||||
}
|
||||
if m.vals[0] > 0.1 {
|
||||
t.Fatalf("")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetrics_EmitRuntimeStats(t *testing.T) {
|
||||
runtime.GC()
|
||||
m, met := mockMetric()
|
||||
met.emitRuntimeStats()
|
||||
|
||||
if m.keys[0][0] != "runtime" || m.keys[0][1] != "num_goroutines" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[0] <= 1 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[1][0] != "runtime" || m.keys[1][1] != "alloc_bytes" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[1] <= 40000 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[2][0] != "runtime" || m.keys[2][1] != "sys_bytes" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[2] <= 100000 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[3][0] != "runtime" || m.keys[3][1] != "malloc_count" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[3] <= 100 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[4][0] != "runtime" || m.keys[4][1] != "free_count" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[4] <= 100 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[5][0] != "runtime" || m.keys[5][1] != "heap_objects" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[5] <= 100 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[6][0] != "runtime" || m.keys[6][1] != "total_gc_pause_ns" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[6] <= 100000 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[7][0] != "runtime" || m.keys[7][1] != "total_gc_runs" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[7] <= 1 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
|
||||
if m.keys[8][0] != "runtime" || m.keys[8][1] != "gc_pause_ns" {
|
||||
t.Fatalf("bad key %v", m.keys)
|
||||
}
|
||||
if m.vals[8] <= 1000 {
|
||||
t.Fatalf("bad val: %v", m.vals)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsert(t *testing.T) {
|
||||
k := []string{"hi", "bob"}
|
||||
exp := []string{"hi", "there", "bob"}
|
||||
out := insert(1, "there", k)
|
||||
if !reflect.DeepEqual(exp, out) {
|
||||
t.Fatalf("bad insert %v %v", exp, out)
|
||||
}
|
||||
}
|
||||
88
vendor/github.com/armon/go-metrics/prometheus/prometheus.go
generated
vendored
Normal file
88
vendor/github.com/armon/go-metrics/prometheus/prometheus.go
generated
vendored
Normal file
@@ -0,0 +1,88 @@
|
||||
// +build go1.3
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type PrometheusSink struct {
|
||||
mu sync.Mutex
|
||||
gauges map[string]prometheus.Gauge
|
||||
summaries map[string]prometheus.Summary
|
||||
counters map[string]prometheus.Counter
|
||||
}
|
||||
|
||||
func NewPrometheusSink() (*PrometheusSink, error) {
|
||||
return &PrometheusSink{
|
||||
gauges: make(map[string]prometheus.Gauge),
|
||||
summaries: make(map[string]prometheus.Summary),
|
||||
counters: make(map[string]prometheus.Counter),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *PrometheusSink) flattenKey(parts []string) string {
|
||||
joined := strings.Join(parts, "_")
|
||||
joined = strings.Replace(joined, " ", "_", -1)
|
||||
joined = strings.Replace(joined, ".", "_", -1)
|
||||
joined = strings.Replace(joined, "-", "_", -1)
|
||||
return joined
|
||||
}
|
||||
|
||||
func (p *PrometheusSink) SetGauge(parts []string, val float32) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
key := p.flattenKey(parts)
|
||||
g, ok := p.gauges[key]
|
||||
if !ok {
|
||||
g = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: key,
|
||||
Help: key,
|
||||
})
|
||||
prometheus.MustRegister(g)
|
||||
p.gauges[key] = g
|
||||
}
|
||||
g.Set(float64(val))
|
||||
}
|
||||
|
||||
func (p *PrometheusSink) AddSample(parts []string, val float32) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
key := p.flattenKey(parts)
|
||||
g, ok := p.summaries[key]
|
||||
if !ok {
|
||||
g = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Name: key,
|
||||
Help: key,
|
||||
MaxAge: 10 * time.Second,
|
||||
})
|
||||
prometheus.MustRegister(g)
|
||||
p.summaries[key] = g
|
||||
}
|
||||
g.Observe(float64(val))
|
||||
}
|
||||
|
||||
// EmitKey is not implemented. Prometheus doesn’t offer a type for which an
|
||||
// arbitrary number of values is retained, as Prometheus works with a pull
|
||||
// model, rather than a push model.
|
||||
func (p *PrometheusSink) EmitKey(key []string, val float32) {
|
||||
}
|
||||
|
||||
func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
key := p.flattenKey(parts)
|
||||
g, ok := p.counters[key]
|
||||
if !ok {
|
||||
g = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: key,
|
||||
Help: key,
|
||||
})
|
||||
prometheus.MustRegister(g)
|
||||
p.counters[key] = g
|
||||
}
|
||||
g.Add(float64(val))
|
||||
}
|
||||
52
vendor/github.com/armon/go-metrics/sink.go
generated
vendored
Normal file
52
vendor/github.com/armon/go-metrics/sink.go
generated
vendored
Normal file
@@ -0,0 +1,52 @@
|
||||
package metrics
|
||||
|
||||
// The MetricSink interface is used to transmit metrics information
|
||||
// to an external system
|
||||
type MetricSink interface {
|
||||
// A Gauge should retain the last value it is set to
|
||||
SetGauge(key []string, val float32)
|
||||
|
||||
// Should emit a Key/Value pair for each call
|
||||
EmitKey(key []string, val float32)
|
||||
|
||||
// Counters should accumulate values
|
||||
IncrCounter(key []string, val float32)
|
||||
|
||||
// Samples are for timing information, where quantiles are used
|
||||
AddSample(key []string, val float32)
|
||||
}
|
||||
|
||||
// BlackholeSink is used to just blackhole messages
|
||||
type BlackholeSink struct{}
|
||||
|
||||
func (*BlackholeSink) SetGauge(key []string, val float32) {}
|
||||
func (*BlackholeSink) EmitKey(key []string, val float32) {}
|
||||
func (*BlackholeSink) IncrCounter(key []string, val float32) {}
|
||||
func (*BlackholeSink) AddSample(key []string, val float32) {}
|
||||
|
||||
// FanoutSink is used to sink to fanout values to multiple sinks
|
||||
type FanoutSink []MetricSink
|
||||
|
||||
func (fh FanoutSink) SetGauge(key []string, val float32) {
|
||||
for _, s := range fh {
|
||||
s.SetGauge(key, val)
|
||||
}
|
||||
}
|
||||
|
||||
func (fh FanoutSink) EmitKey(key []string, val float32) {
|
||||
for _, s := range fh {
|
||||
s.EmitKey(key, val)
|
||||
}
|
||||
}
|
||||
|
||||
func (fh FanoutSink) IncrCounter(key []string, val float32) {
|
||||
for _, s := range fh {
|
||||
s.IncrCounter(key, val)
|
||||
}
|
||||
}
|
||||
|
||||
func (fh FanoutSink) AddSample(key []string, val float32) {
|
||||
for _, s := range fh {
|
||||
s.AddSample(key, val)
|
||||
}
|
||||
}
|
||||
120
vendor/github.com/armon/go-metrics/sink_test.go
generated
vendored
Normal file
120
vendor/github.com/armon/go-metrics/sink_test.go
generated
vendored
Normal file
@@ -0,0 +1,120 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type MockSink struct {
|
||||
keys [][]string
|
||||
vals []float32
|
||||
}
|
||||
|
||||
func (m *MockSink) SetGauge(key []string, val float32) {
|
||||
m.keys = append(m.keys, key)
|
||||
m.vals = append(m.vals, val)
|
||||
}
|
||||
func (m *MockSink) EmitKey(key []string, val float32) {
|
||||
m.keys = append(m.keys, key)
|
||||
m.vals = append(m.vals, val)
|
||||
}
|
||||
func (m *MockSink) IncrCounter(key []string, val float32) {
|
||||
m.keys = append(m.keys, key)
|
||||
m.vals = append(m.vals, val)
|
||||
}
|
||||
func (m *MockSink) AddSample(key []string, val float32) {
|
||||
m.keys = append(m.keys, key)
|
||||
m.vals = append(m.vals, val)
|
||||
}
|
||||
|
||||
func TestFanoutSink_Gauge(t *testing.T) {
|
||||
m1 := &MockSink{}
|
||||
m2 := &MockSink{}
|
||||
fh := &FanoutSink{m1, m2}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
fh.SetGauge(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m1.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m1.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFanoutSink_Key(t *testing.T) {
|
||||
m1 := &MockSink{}
|
||||
m2 := &MockSink{}
|
||||
fh := &FanoutSink{m1, m2}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
fh.EmitKey(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m1.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m1.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFanoutSink_Counter(t *testing.T) {
|
||||
m1 := &MockSink{}
|
||||
m2 := &MockSink{}
|
||||
fh := &FanoutSink{m1, m2}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
fh.IncrCounter(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m1.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m1.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFanoutSink_Sample(t *testing.T) {
|
||||
m1 := &MockSink{}
|
||||
m2 := &MockSink{}
|
||||
fh := &FanoutSink{m1, m2}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
fh.AddSample(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m1.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m1.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m2.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
95
vendor/github.com/armon/go-metrics/start.go
generated
vendored
Normal file
95
vendor/github.com/armon/go-metrics/start.go
generated
vendored
Normal file
@@ -0,0 +1,95 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Config is used to configure metrics settings
|
||||
type Config struct {
|
||||
ServiceName string // Prefixed with keys to seperate services
|
||||
HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname
|
||||
EnableHostname bool // Enable prefixing gauge values with hostname
|
||||
EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory)
|
||||
EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer")
|
||||
TimerGranularity time.Duration // Granularity of timers.
|
||||
ProfileInterval time.Duration // Interval to profile runtime metrics
|
||||
}
|
||||
|
||||
// Metrics represents an instance of a metrics sink that can
|
||||
// be used to emit
|
||||
type Metrics struct {
|
||||
Config
|
||||
lastNumGC uint32
|
||||
sink MetricSink
|
||||
}
|
||||
|
||||
// Shared global metrics instance
|
||||
var globalMetrics *Metrics
|
||||
|
||||
func init() {
|
||||
// Initialize to a blackhole sink to avoid errors
|
||||
globalMetrics = &Metrics{sink: &BlackholeSink{}}
|
||||
}
|
||||
|
||||
// DefaultConfig provides a sane default configuration
|
||||
func DefaultConfig(serviceName string) *Config {
|
||||
c := &Config{
|
||||
ServiceName: serviceName, // Use client provided service
|
||||
HostName: "",
|
||||
EnableHostname: true, // Enable hostname prefix
|
||||
EnableRuntimeMetrics: true, // Enable runtime profiling
|
||||
EnableTypePrefix: false, // Disable type prefix
|
||||
TimerGranularity: time.Millisecond, // Timers are in milliseconds
|
||||
ProfileInterval: time.Second, // Poll runtime every second
|
||||
}
|
||||
|
||||
// Try to get the hostname
|
||||
name, _ := os.Hostname()
|
||||
c.HostName = name
|
||||
return c
|
||||
}
|
||||
|
||||
// New is used to create a new instance of Metrics
|
||||
func New(conf *Config, sink MetricSink) (*Metrics, error) {
|
||||
met := &Metrics{}
|
||||
met.Config = *conf
|
||||
met.sink = sink
|
||||
|
||||
// Start the runtime collector
|
||||
if conf.EnableRuntimeMetrics {
|
||||
go met.collectStats()
|
||||
}
|
||||
return met, nil
|
||||
}
|
||||
|
||||
// NewGlobal is the same as New, but it assigns the metrics object to be
|
||||
// used globally as well as returning it.
|
||||
func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) {
|
||||
metrics, err := New(conf, sink)
|
||||
if err == nil {
|
||||
globalMetrics = metrics
|
||||
}
|
||||
return metrics, err
|
||||
}
|
||||
|
||||
// Proxy all the methods to the globalMetrics instance
|
||||
func SetGauge(key []string, val float32) {
|
||||
globalMetrics.SetGauge(key, val)
|
||||
}
|
||||
|
||||
func EmitKey(key []string, val float32) {
|
||||
globalMetrics.EmitKey(key, val)
|
||||
}
|
||||
|
||||
func IncrCounter(key []string, val float32) {
|
||||
globalMetrics.IncrCounter(key, val)
|
||||
}
|
||||
|
||||
func AddSample(key []string, val float32) {
|
||||
globalMetrics.AddSample(key, val)
|
||||
}
|
||||
|
||||
func MeasureSince(key []string, start time.Time) {
|
||||
globalMetrics.MeasureSince(key, start)
|
||||
}
|
||||
110
vendor/github.com/armon/go-metrics/start_test.go
generated
vendored
Normal file
110
vendor/github.com/armon/go-metrics/start_test.go
generated
vendored
Normal file
@@ -0,0 +1,110 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDefaultConfig(t *testing.T) {
|
||||
conf := DefaultConfig("service")
|
||||
if conf.ServiceName != "service" {
|
||||
t.Fatalf("Bad name")
|
||||
}
|
||||
if conf.HostName == "" {
|
||||
t.Fatalf("missing hostname")
|
||||
}
|
||||
if !conf.EnableHostname || !conf.EnableRuntimeMetrics {
|
||||
t.Fatalf("expect true")
|
||||
}
|
||||
if conf.EnableTypePrefix {
|
||||
t.Fatalf("expect false")
|
||||
}
|
||||
if conf.TimerGranularity != time.Millisecond {
|
||||
t.Fatalf("bad granularity")
|
||||
}
|
||||
if conf.ProfileInterval != time.Second {
|
||||
t.Fatalf("bad interval")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_GlobalMetrics_SetGauge(t *testing.T) {
|
||||
m := &MockSink{}
|
||||
globalMetrics = &Metrics{sink: m}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
SetGauge(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_GlobalMetrics_EmitKey(t *testing.T) {
|
||||
m := &MockSink{}
|
||||
globalMetrics = &Metrics{sink: m}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
EmitKey(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_GlobalMetrics_IncrCounter(t *testing.T) {
|
||||
m := &MockSink{}
|
||||
globalMetrics = &Metrics{sink: m}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
IncrCounter(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_GlobalMetrics_AddSample(t *testing.T) {
|
||||
m := &MockSink{}
|
||||
globalMetrics = &Metrics{sink: m}
|
||||
|
||||
k := []string{"test"}
|
||||
v := float32(42.0)
|
||||
AddSample(k, v)
|
||||
|
||||
if !reflect.DeepEqual(m.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if !reflect.DeepEqual(m.vals[0], v) {
|
||||
t.Fatalf("val not equal")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_GlobalMetrics_MeasureSince(t *testing.T) {
|
||||
m := &MockSink{}
|
||||
globalMetrics = &Metrics{sink: m}
|
||||
globalMetrics.TimerGranularity = time.Millisecond
|
||||
|
||||
k := []string{"test"}
|
||||
now := time.Now()
|
||||
MeasureSince(k, now)
|
||||
|
||||
if !reflect.DeepEqual(m.keys[0], k) {
|
||||
t.Fatalf("key not equal")
|
||||
}
|
||||
if m.vals[0] > 0.1 {
|
||||
t.Fatalf("val too large %v", m.vals[0])
|
||||
}
|
||||
}
|
||||
154
vendor/github.com/armon/go-metrics/statsd.go
generated
vendored
Normal file
154
vendor/github.com/armon/go-metrics/statsd.go
generated
vendored
Normal file
@@ -0,0 +1,154 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// statsdMaxLen is the maximum size of a packet
|
||||
// to send to statsd
|
||||
statsdMaxLen = 1400
|
||||
)
|
||||
|
||||
// StatsdSink provides a MetricSink that can be used
|
||||
// with a statsite or statsd metrics server. It uses
|
||||
// only UDP packets, while StatsiteSink uses TCP.
|
||||
type StatsdSink struct {
|
||||
addr string
|
||||
metricQueue chan string
|
||||
}
|
||||
|
||||
// NewStatsdSink is used to create a new StatsdSink
|
||||
func NewStatsdSink(addr string) (*StatsdSink, error) {
|
||||
s := &StatsdSink{
|
||||
addr: addr,
|
||||
metricQueue: make(chan string, 4096),
|
||||
}
|
||||
go s.flushMetrics()
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Close is used to stop flushing to statsd
|
||||
func (s *StatsdSink) Shutdown() {
|
||||
close(s.metricQueue)
|
||||
}
|
||||
|
||||
func (s *StatsdSink) SetGauge(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
|
||||
}
|
||||
|
||||
func (s *StatsdSink) EmitKey(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
|
||||
}
|
||||
|
||||
func (s *StatsdSink) IncrCounter(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
|
||||
}
|
||||
|
||||
func (s *StatsdSink) AddSample(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
|
||||
}
|
||||
|
||||
// Flattens the key for formatting, removes spaces
|
||||
func (s *StatsdSink) flattenKey(parts []string) string {
|
||||
joined := strings.Join(parts, ".")
|
||||
return strings.Map(func(r rune) rune {
|
||||
switch r {
|
||||
case ':':
|
||||
fallthrough
|
||||
case ' ':
|
||||
return '_'
|
||||
default:
|
||||
return r
|
||||
}
|
||||
}, joined)
|
||||
}
|
||||
|
||||
// Does a non-blocking push to the metrics queue
|
||||
func (s *StatsdSink) pushMetric(m string) {
|
||||
select {
|
||||
case s.metricQueue <- m:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Flushes metrics
|
||||
func (s *StatsdSink) flushMetrics() {
|
||||
var sock net.Conn
|
||||
var err error
|
||||
var wait <-chan time.Time
|
||||
ticker := time.NewTicker(flushInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
CONNECT:
|
||||
// Create a buffer
|
||||
buf := bytes.NewBuffer(nil)
|
||||
|
||||
// Attempt to connect
|
||||
sock, err = net.Dial("udp", s.addr)
|
||||
if err != nil {
|
||||
log.Printf("[ERR] Error connecting to statsd! Err: %s", err)
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case metric, ok := <-s.metricQueue:
|
||||
// Get a metric from the queue
|
||||
if !ok {
|
||||
goto QUIT
|
||||
}
|
||||
|
||||
// Check if this would overflow the packet size
|
||||
if len(metric)+buf.Len() > statsdMaxLen {
|
||||
_, err := sock.Write(buf.Bytes())
|
||||
buf.Reset()
|
||||
if err != nil {
|
||||
log.Printf("[ERR] Error writing to statsd! Err: %s", err)
|
||||
goto WAIT
|
||||
}
|
||||
}
|
||||
|
||||
// Append to the buffer
|
||||
buf.WriteString(metric)
|
||||
|
||||
case <-ticker.C:
|
||||
if buf.Len() == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
_, err := sock.Write(buf.Bytes())
|
||||
buf.Reset()
|
||||
if err != nil {
|
||||
log.Printf("[ERR] Error flushing to statsd! Err: %s", err)
|
||||
goto WAIT
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
WAIT:
|
||||
// Wait for a while
|
||||
wait = time.After(time.Duration(5) * time.Second)
|
||||
for {
|
||||
select {
|
||||
// Dequeue the messages to avoid backlog
|
||||
case _, ok := <-s.metricQueue:
|
||||
if !ok {
|
||||
goto QUIT
|
||||
}
|
||||
case <-wait:
|
||||
goto CONNECT
|
||||
}
|
||||
}
|
||||
QUIT:
|
||||
s.metricQueue = nil
|
||||
}
|
||||
105
vendor/github.com/armon/go-metrics/statsd_test.go
generated
vendored
Normal file
105
vendor/github.com/armon/go-metrics/statsd_test.go
generated
vendored
Normal file
@@ -0,0 +1,105 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestStatsd_Flatten(t *testing.T) {
|
||||
s := &StatsdSink{}
|
||||
flat := s.flattenKey([]string{"a", "b", "c", "d"})
|
||||
if flat != "a.b.c.d" {
|
||||
t.Fatalf("Bad flat")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsd_PushFullQueue(t *testing.T) {
|
||||
q := make(chan string, 1)
|
||||
q <- "full"
|
||||
|
||||
s := &StatsdSink{metricQueue: q}
|
||||
s.pushMetric("omit")
|
||||
|
||||
out := <-q
|
||||
if out != "full" {
|
||||
t.Fatalf("bad val %v", out)
|
||||
}
|
||||
|
||||
select {
|
||||
case v := <-q:
|
||||
t.Fatalf("bad val %v", v)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsd_Conn(t *testing.T) {
|
||||
addr := "127.0.0.1:7524"
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
list, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 7524})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer list.Close()
|
||||
buf := make([]byte, 1500)
|
||||
n, err := list.Read(buf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
buf = buf[:n]
|
||||
reader := bufio.NewReader(bytes.NewReader(buf))
|
||||
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "gauge.val:1.000000|g\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
line, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "key.other:2.000000|kv\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
line, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "counter.me:3.000000|c\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
line, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "sample.slow_thingy:4.000000|ms\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
done <- true
|
||||
}()
|
||||
s, err := NewStatsdSink(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("bad error")
|
||||
}
|
||||
|
||||
s.SetGauge([]string{"gauge", "val"}, float32(1))
|
||||
s.EmitKey([]string{"key", "other"}, float32(2))
|
||||
s.IncrCounter([]string{"counter", "me"}, float32(3))
|
||||
s.AddSample([]string{"sample", "slow thingy"}, float32(4))
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
s.Shutdown()
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
142
vendor/github.com/armon/go-metrics/statsite.go
generated
vendored
Normal file
142
vendor/github.com/armon/go-metrics/statsite.go
generated
vendored
Normal file
@@ -0,0 +1,142 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// We force flush the statsite metrics after this period of
|
||||
// inactivity. Prevents stats from getting stuck in a buffer
|
||||
// forever.
|
||||
flushInterval = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// StatsiteSink provides a MetricSink that can be used with a
|
||||
// statsite metrics server
|
||||
type StatsiteSink struct {
|
||||
addr string
|
||||
metricQueue chan string
|
||||
}
|
||||
|
||||
// NewStatsiteSink is used to create a new StatsiteSink
|
||||
func NewStatsiteSink(addr string) (*StatsiteSink, error) {
|
||||
s := &StatsiteSink{
|
||||
addr: addr,
|
||||
metricQueue: make(chan string, 4096),
|
||||
}
|
||||
go s.flushMetrics()
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Close is used to stop flushing to statsite
|
||||
func (s *StatsiteSink) Shutdown() {
|
||||
close(s.metricQueue)
|
||||
}
|
||||
|
||||
func (s *StatsiteSink) SetGauge(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
|
||||
}
|
||||
|
||||
func (s *StatsiteSink) EmitKey(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
|
||||
}
|
||||
|
||||
func (s *StatsiteSink) IncrCounter(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
|
||||
}
|
||||
|
||||
func (s *StatsiteSink) AddSample(key []string, val float32) {
|
||||
flatKey := s.flattenKey(key)
|
||||
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
|
||||
}
|
||||
|
||||
// Flattens the key for formatting, removes spaces
|
||||
func (s *StatsiteSink) flattenKey(parts []string) string {
|
||||
joined := strings.Join(parts, ".")
|
||||
return strings.Map(func(r rune) rune {
|
||||
switch r {
|
||||
case ':':
|
||||
fallthrough
|
||||
case ' ':
|
||||
return '_'
|
||||
default:
|
||||
return r
|
||||
}
|
||||
}, joined)
|
||||
}
|
||||
|
||||
// Does a non-blocking push to the metrics queue
|
||||
func (s *StatsiteSink) pushMetric(m string) {
|
||||
select {
|
||||
case s.metricQueue <- m:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Flushes metrics
|
||||
func (s *StatsiteSink) flushMetrics() {
|
||||
var sock net.Conn
|
||||
var err error
|
||||
var wait <-chan time.Time
|
||||
var buffered *bufio.Writer
|
||||
ticker := time.NewTicker(flushInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
CONNECT:
|
||||
// Attempt to connect
|
||||
sock, err = net.Dial("tcp", s.addr)
|
||||
if err != nil {
|
||||
log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Create a buffered writer
|
||||
buffered = bufio.NewWriter(sock)
|
||||
|
||||
for {
|
||||
select {
|
||||
case metric, ok := <-s.metricQueue:
|
||||
// Get a metric from the queue
|
||||
if !ok {
|
||||
goto QUIT
|
||||
}
|
||||
|
||||
// Try to send to statsite
|
||||
_, err := buffered.Write([]byte(metric))
|
||||
if err != nil {
|
||||
log.Printf("[ERR] Error writing to statsite! Err: %s", err)
|
||||
goto WAIT
|
||||
}
|
||||
case <-ticker.C:
|
||||
if err := buffered.Flush(); err != nil {
|
||||
log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
|
||||
goto WAIT
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
WAIT:
|
||||
// Wait for a while
|
||||
wait = time.After(time.Duration(5) * time.Second)
|
||||
for {
|
||||
select {
|
||||
// Dequeue the messages to avoid backlog
|
||||
case _, ok := <-s.metricQueue:
|
||||
if !ok {
|
||||
goto QUIT
|
||||
}
|
||||
case <-wait:
|
||||
goto CONNECT
|
||||
}
|
||||
}
|
||||
QUIT:
|
||||
s.metricQueue = nil
|
||||
}
|
||||
101
vendor/github.com/armon/go-metrics/statsite_test.go
generated
vendored
Normal file
101
vendor/github.com/armon/go-metrics/statsite_test.go
generated
vendored
Normal file
@@ -0,0 +1,101 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func acceptConn(addr string) net.Conn {
|
||||
ln, _ := net.Listen("tcp", addr)
|
||||
conn, _ := ln.Accept()
|
||||
return conn
|
||||
}
|
||||
|
||||
func TestStatsite_Flatten(t *testing.T) {
|
||||
s := &StatsiteSink{}
|
||||
flat := s.flattenKey([]string{"a", "b", "c", "d"})
|
||||
if flat != "a.b.c.d" {
|
||||
t.Fatalf("Bad flat")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsite_PushFullQueue(t *testing.T) {
|
||||
q := make(chan string, 1)
|
||||
q <- "full"
|
||||
|
||||
s := &StatsiteSink{metricQueue: q}
|
||||
s.pushMetric("omit")
|
||||
|
||||
out := <-q
|
||||
if out != "full" {
|
||||
t.Fatalf("bad val %v", out)
|
||||
}
|
||||
|
||||
select {
|
||||
case v := <-q:
|
||||
t.Fatalf("bad val %v", v)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsite_Conn(t *testing.T) {
|
||||
addr := "localhost:7523"
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
conn := acceptConn(addr)
|
||||
reader := bufio.NewReader(conn)
|
||||
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "gauge.val:1.000000|g\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
line, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "key.other:2.000000|kv\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
line, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "counter.me:3.000000|c\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
line, err = reader.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err %s", err)
|
||||
}
|
||||
if line != "sample.slow_thingy:4.000000|ms\n" {
|
||||
t.Fatalf("bad line %s", line)
|
||||
}
|
||||
|
||||
conn.Close()
|
||||
done <- true
|
||||
}()
|
||||
s, err := NewStatsiteSink(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("bad error")
|
||||
}
|
||||
|
||||
s.SetGauge([]string{"gauge", "val"}, float32(1))
|
||||
s.EmitKey([]string{"key", "other"}, float32(2))
|
||||
s.IncrCounter([]string{"counter", "me"}, float32(3))
|
||||
s.AddSample([]string{"sample", "slow thingy"}, float32(4))
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
s.Shutdown()
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
13
vendor/manifest
vendored
13
vendor/manifest
vendored
@@ -7,6 +7,13 @@
|
||||
"revision": "75cd24fc2f2c",
|
||||
"branch": "default"
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/DataDog/datadog-go/statsd",
|
||||
"repository": "https://github.com/DataDog/datadog-go",
|
||||
"revision": "b050cd8f4d7c394545fd7d966c8e2909ce89d552",
|
||||
"branch": "master",
|
||||
"path": "/statsd"
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/PuerkitoBio/ghost",
|
||||
"repository": "https://github.com/PuerkitoBio/ghost",
|
||||
@@ -20,6 +27,12 @@
|
||||
"branch": "master",
|
||||
"path": "/handlers"
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/armon/go-metrics",
|
||||
"repository": "https://github.com/armon/go-metrics",
|
||||
"revision": "6c5fa0d8f48f4661c9ba8709799c88d425ad20f0",
|
||||
"branch": "master"
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/beorn7/perks/quantile",
|
||||
"repository": "https://github.com/beorn7/perks",
|
||||
|
||||
Reference in New Issue
Block a user