Files
weave-scope/vendor/github.com/performancecopilot/speed/client.go
Marc Carré 652cc90f98 Update weaveworks/common to latest version
```
$ gvt delete github.com/weaveworks/common
$ gvt fetch --revision 4d96fd8dcf2c7b417912c6219b310112cb4a4626 github.com/weaveworks/common
2018/07/23 15:31:11 Fetching: github.com/weaveworks/common
2018/07/23 15:31:14 · Skipping (existing): github.com/golang/protobuf/ptypes/any
2018/07/23 15:31:14 · Fetching recursive dependency: github.com/pkg/errors
2018/07/23 15:31:16 · Skipping (existing): github.com/aws/aws-sdk-go/aws
2018/07/23 15:31:16 · Fetching recursive dependency: github.com/sirupsen/logrus
2018/07/23 15:31:18 ·· Skipping (existing): golang.org/x/sys/unix
2018/07/23 15:31:18 ·· Skipping (existing): golang.org/x/crypto/ssh/terminal
2018/07/23 15:31:18 · Skipping (existing): google.golang.org/grpc/status
2018/07/23 15:31:18 · Skipping (existing): github.com/gorilla/mux
2018/07/23 15:31:18 · Fetching recursive dependency: github.com/opentracing-contrib/go-stdlib/nethttp
2018/07/23 15:31:20 ·· Skipping (existing): github.com/opentracing/opentracing-go/ext
2018/07/23 15:31:20 ·· Skipping (existing): github.com/opentracing/opentracing-go/log
2018/07/23 15:31:20 ·· Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:31:20 · Skipping (existing): github.com/prometheus/client_golang/prometheus
2018/07/23 15:31:20 · Skipping (existing): google.golang.org/grpc
2018/07/23 15:31:20 · Skipping (existing): github.com/pmezard/go-difflib/difflib
2018/07/23 15:31:20 · Fetching recursive dependency: github.com/go-kit/kit/log
2018/07/23 15:31:23 ·· Fetching recursive dependency: github.com/go-logfmt/logfmt
2018/07/23 15:31:25 ··· Fetching recursive dependency: github.com/kr/logfmt
2018/07/23 15:31:27 ·· Fetching recursive dependency: github.com/go-stack/stack
2018/07/23 15:31:29 · Fetching recursive dependency: google.golang.org/genproto/googleapis/rpc/status
2018/07/23 15:31:37 ·· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:31:37 ·· Skipping (existing): github.com/golang/protobuf/ptypes/any
2018/07/23 15:31:37 · Skipping (existing): github.com/opentracing/opentracing-go/log
2018/07/23 15:31:37 · Fetching recursive dependency: github.com/sercand/kuberesolver
2018/07/23 15:31:39 ·· Skipping (existing): google.golang.org/grpc/grpclog
2018/07/23 15:31:39 ·· Skipping (existing): google.golang.org/grpc/resolver
2018/07/23 15:31:39 ·· Skipping (existing): golang.org/x/net/context
2018/07/23 15:31:39 · Skipping (existing): google.golang.org/grpc/metadata
2018/07/23 15:31:39 · Skipping (existing): github.com/opentracing/opentracing-go/ext
2018/07/23 15:31:39 · Skipping (existing): github.com/armon/go-socks5
2018/07/23 15:31:39 · Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:31:39 · Skipping (existing): github.com/davecgh/go-spew/spew
2018/07/23 15:31:39 · Skipping (existing): github.com/golang/protobuf/ptypes
2018/07/23 15:31:39 · Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:31:39 · Fetching recursive dependency: github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc
2018/07/23 15:31:41 ·· Skipping (existing): github.com/opentracing/opentracing-go/log
2018/07/23 15:31:41 ·· Skipping (existing): golang.org/x/net/context
2018/07/23 15:31:41 ·· Skipping (existing): google.golang.org/grpc/codes
2018/07/23 15:31:41 ·· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:31:41 ·· Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:31:41 ·· Skipping (existing): github.com/opentracing/opentracing-go/ext
2018/07/23 15:31:41 ·· Skipping (existing): google.golang.org/grpc
2018/07/23 15:31:41 ·· Skipping (existing): google.golang.org/grpc/metadata
2018/07/23 15:31:41 ·· Skipping (existing): google.golang.org/grpc/status
2018/07/23 15:31:41 · Fetching recursive dependency: github.com/uber/jaeger-client-go/config
2018/07/23 15:31:44 ·· Fetching recursive dependency: github.com/uber/jaeger-client-go/internal/throttler/remote
2018/07/23 15:31:44 ··· Fetching recursive dependency: github.com/uber/jaeger-client-go/utils
2018/07/23 15:31:44 ···· Fetching recursive dependency: github.com/uber/jaeger-client-go/thrift
2018/07/23 15:31:44 ···· Fetching recursive dependency: github.com/uber/jaeger-client-go/thrift-gen/agent
2018/07/23 15:31:44 ····· Fetching recursive dependency: github.com/uber/jaeger-client-go/thrift-gen/jaeger
2018/07/23 15:31:44 ····· Fetching recursive dependency: github.com/uber/jaeger-client-go/thrift-gen/zipkincore
2018/07/23 15:31:44 ··· Fetching recursive dependency: github.com/uber/jaeger-client-go
2018/07/23 15:31:44 ···· Fetching recursive dependency: github.com/crossdock/crossdock-go
2018/07/23 15:31:46 ····· Skipping (existing): github.com/davecgh/go-spew/spew
2018/07/23 15:31:46 ····· Skipping (existing): golang.org/x/net/context/ctxhttp
2018/07/23 15:31:46 ····· Skipping (existing): golang.org/x/net/context
2018/07/23 15:31:46 ····· Skipping (existing): github.com/pmezard/go-difflib/difflib
2018/07/23 15:31:46 ···· Skipping (existing): github.com/opentracing/opentracing-go/log
2018/07/23 15:31:46 ···· Fetching recursive dependency: go.uber.org/zap/zapcore
2018/07/23 15:31:49 ····· Fetching recursive dependency: go.uber.org/atomic
2018/07/23 15:31:51 ····· Fetching recursive dependency: go.uber.org/zap/internal/bufferpool
2018/07/23 15:31:51 ······ Fetching recursive dependency: go.uber.org/zap/buffer
2018/07/23 15:31:51 ····· Fetching recursive dependency: go.uber.org/multierr
2018/07/23 15:31:54 ····· Fetching recursive dependency: go.uber.org/zap/internal/exit
2018/07/23 15:31:54 ····· Fetching recursive dependency: go.uber.org/zap/internal/color
2018/07/23 15:31:54 ···· Fetching recursive dependency: go.uber.org/zap
2018/07/23 15:31:54 ···· Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:31:54 ···· Skipping (existing): github.com/opentracing/opentracing-go/ext
2018/07/23 15:31:54 ···· Fetching recursive dependency: github.com/uber/jaeger-lib/metrics
2018/07/23 15:31:56 ····· Fetching recursive dependency: github.com/uber-go/tally
2018/07/23 15:31:58 ······ Fetching recursive dependency: github.com/m3db/prometheus_client_golang/prometheus/promhttp
2018/07/23 15:32:00 ······· Skipping (existing): github.com/prometheus/client_golang/prometheus
2018/07/23 15:32:00 ······· Skipping (existing): github.com/prometheus/common/expfmt
2018/07/23 15:32:00 ······· Skipping (existing): github.com/prometheus/client_model/go
2018/07/23 15:32:00 ······ Fetching recursive dependency: gopkg.in/validator.v2
2018/07/23 15:32:06 ······ Fetching recursive dependency: github.com/cactus/go-statsd-client/statsd
2018/07/23 15:32:08 ······ Skipping (existing): gopkg.in/yaml.v2
2018/07/23 15:32:08 ······ Fetching recursive dependency: github.com/m3db/prometheus_client_golang/prometheus
2018/07/23 15:32:08 ······· Skipping (existing): github.com/prometheus/procfs
2018/07/23 15:32:08 ······· Skipping (existing): github.com/prometheus/client_model/go
2018/07/23 15:32:08 ······· Skipping (existing): github.com/prometheus/common/expfmt
2018/07/23 15:32:08 ······· Skipping (existing): golang.org/x/net/context
2018/07/23 15:32:08 ······· Skipping (existing): github.com/beorn7/perks/quantile
2018/07/23 15:32:08 ······· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:32:08 ······· Skipping (existing): github.com/prometheus/common/model
2018/07/23 15:32:08 ······· Skipping (existing): github.com/prometheus/client_golang/prometheus
2018/07/23 15:32:08 ······ Fetching recursive dependency: github.com/apache/thrift/lib/go/thrift
2018/07/23 15:32:13 ····· Skipping (existing): github.com/stretchr/testify/assert
2018/07/23 15:32:13 ····· Fetching recursive dependency: github.com/go-kit/kit/metrics/influx
2018/07/23 15:32:13 ······ Fetching recursive dependency: github.com/influxdata/influxdb/client/v2
2018/07/23 15:32:17 ······· Fetching recursive dependency: github.com/influxdata/influxdb/models
2018/07/23 15:32:17 ········ Fetching recursive dependency: github.com/influxdata/influxdb/pkg/escape
2018/07/23 15:32:17 ······ Fetching recursive dependency: github.com/go-kit/kit/metrics
2018/07/23 15:32:17 ······· Fetching recursive dependency: github.com/performancecopilot/speed
2018/07/23 15:32:19 ······· Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/aws
2018/07/23 15:32:28 ········ Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/internal/sdk
2018/07/23 15:32:28 ········ Skipping (existing): github.com/go-ini/ini
2018/07/23 15:32:28 ········ Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/service/sts
2018/07/23 15:32:28 ········· Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/private/protocol/query
2018/07/23 15:32:28 ·········· Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/private/protocol
2018/07/23 15:32:28 ········· Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/internal/awsutil
2018/07/23 15:32:28 ·········· Skipping (existing): github.com/jmespath/go-jmespath
2018/07/23 15:32:28 ······· Fetching recursive dependency: github.com/aws/aws-sdk-go-v2/service/cloudwatch
2018/07/23 15:32:29 ······· Skipping (existing): github.com/aws/aws-sdk-go/aws
2018/07/23 15:32:29 ······· Skipping (existing): github.com/prometheus/client_golang/prometheus
2018/07/23 15:32:29 ······· Skipping (existing): github.com/aws/aws-sdk-go/service/cloudwatch
2018/07/23 15:32:29 ······· Skipping (existing): github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface
2018/07/23 15:32:29 ······· Fetching recursive dependency: golang.org/x/sync/errgroup
2018/07/23 15:32:31 ········ Skipping (existing): golang.org/x/net/context
2018/07/23 15:32:31 ······· Fetching recursive dependency: github.com/go-kit/kit/util/conn
2018/07/23 15:32:31 ······· Fetching recursive dependency: github.com/VividCortex/gohistogram
2018/07/23 15:32:33 ····· Skipping (existing): github.com/prometheus/client_golang/prometheus
2018/07/23 15:32:33 ····· Fetching recursive dependency: github.com/codahale/hdrhistogram
2018/07/23 15:32:35 ·· Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:32:35 · Fetching recursive dependency: github.com/mwitkow/go-grpc-middleware
2018/07/23 15:32:37 ·· Fetching recursive dependency: github.com/grpc-ecosystem/go-grpc-middleware/logging
2018/07/23 15:32:39 ··· Fetching recursive dependency: github.com/grpc-ecosystem/go-grpc-middleware
2018/07/23 15:32:39 ···· Fetching recursive dependency: github.com/golang/protobuf/jsonpb
2018/07/23 15:32:42 ····· Skipping (existing): github.com/golang/protobuf/ptypes/timestamp
2018/07/23 15:32:42 ····· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:32:42 ····· Skipping (existing): github.com/golang/protobuf/ptypes/duration
2018/07/23 15:32:42 ····· Skipping (existing): github.com/golang/protobuf/ptypes/any
2018/07/23 15:32:42 ····· Skipping (existing): github.com/golang/protobuf/ptypes/struct
2018/07/23 15:32:42 ····· Skipping (existing): github.com/golang/protobuf/ptypes/wrappers
2018/07/23 15:32:42 ···· Skipping (existing): google.golang.org/grpc/metadata
2018/07/23 15:32:42 ···· Fetching recursive dependency: github.com/stretchr/testify/suite
2018/07/23 15:32:45 ····· Skipping (existing): github.com/stretchr/testify/assert
2018/07/23 15:32:45 ····· Fetching recursive dependency: github.com/stretchr/testify/require
2018/07/23 15:32:45 ······ Skipping (existing): github.com/stretchr/testify/assert
2018/07/23 15:32:45 ···· Skipping (existing): google.golang.org/grpc/peer
2018/07/23 15:32:45 ···· Skipping (existing): golang.org/x/net/context
2018/07/23 15:32:45 ···· Skipping (existing): golang.org/x/net/trace
2018/07/23 15:32:45 ···· Fetching recursive dependency: github.com/gogo/protobuf/gogoproto
2018/07/23 15:32:48 ····· Fetching recursive dependency: github.com/gogo/protobuf/protoc-gen-gogo/descriptor
2018/07/23 15:32:48 ······ Skipping (existing): github.com/gogo/protobuf/proto
2018/07/23 15:32:48 ····· Skipping (existing): github.com/gogo/protobuf/proto
2018/07/23 15:32:48 ···· Skipping (existing): google.golang.org/grpc/credentials
2018/07/23 15:32:48 ···· Skipping (existing): google.golang.org/grpc
2018/07/23 15:32:48 ···· Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:32:48 ···· Skipping (existing): google.golang.org/grpc/codes
2018/07/23 15:32:48 ···· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:32:48 ···· Skipping (existing): google.golang.org/grpc/grpclog
2018/07/23 15:32:48 ···· Skipping (existing): github.com/opentracing/opentracing-go/ext
2018/07/23 15:32:48 ···· Skipping (existing): github.com/opentracing/opentracing-go/log
2018/07/23 15:32:48 ··· Skipping (existing): golang.org/x/net/context
2018/07/23 15:32:48 ··· Skipping (existing): google.golang.org/grpc
2018/07/23 15:32:48 ··· Skipping (existing): google.golang.org/grpc/grpclog
2018/07/23 15:32:48 ··· Skipping (existing): google.golang.org/grpc/codes
2018/07/23 15:32:48 ··· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:32:48 ·· Skipping (existing): github.com/opentracing/opentracing-go
2018/07/23 15:32:48 ·· Skipping (existing): google.golang.org/grpc
2018/07/23 15:32:48 ·· Skipping (existing): golang.org/x/net/context
2018/07/23 15:32:48 ·· Skipping (existing): google.golang.org/grpc/codes
2018/07/23 15:32:48 ·· Skipping (existing): google.golang.org/grpc/grpclog
2018/07/23 15:32:48 ·· Skipping (existing): github.com/opentracing/opentracing-go/log
2018/07/23 15:32:48 ·· Skipping (existing): google.golang.org/grpc/metadata
2018/07/23 15:32:48 ·· Skipping (existing): google.golang.org/grpc/peer
2018/07/23 15:32:48 ·· Skipping (existing): google.golang.org/grpc/credentials
2018/07/23 15:32:48 ·· Skipping (existing): github.com/golang/protobuf/proto
2018/07/23 15:32:48 ·· Skipping (existing): golang.org/x/net/trace
2018/07/23 15:32:48 ·· Skipping (existing): github.com/opentracing/opentracing-go/ext
2018/07/23 15:32:48 · Fetching recursive dependency: github.com/weaveworks/promrus
2018/07/23 15:32:53 ·· Skipping (existing): gopkg.in/yaml.v2
2018/07/23 15:32:53 ·· Skipping (existing): golang.org/x/net/context/ctxhttp
2018/07/23 15:32:53 ·· Fetching recursive dependency: github.com/stretchr/objx
2018/07/23 15:32:55 ·· Fetching recursive dependency: gopkg.in/alecthomas/kingpin.v2
2018/07/23 15:32:58 ··· Fetching recursive dependency: github.com/alecthomas/units
2018/07/23 15:33:00 ··· Fetching recursive dependency: github.com/alecthomas/template
2018/07/23 15:33:02 ·· Fetching recursive dependency: github.com/julienschmidt/httprouter
2018/07/23 15:33:05 ·· Skipping (existing): golang.org/x/net/context
2018/07/23 15:33:05 · Skipping (existing): github.com/aws/aws-sdk-go/aws/credentials
2018/07/23 15:33:05 · Skipping (existing): github.com/golang/protobuf/ptypes/empty
2018/07/23 15:33:05 · Skipping (existing): golang.org/x/net/context
2018/07/23 15:33:05 · Skipping (existing): golang.org/x/tools/cover
2018/07/23 15:33:05 · Skipping (existing): github.com/mgutz/ansi
```
2018-07-23 20:10:13 +02:00

698 lines
16 KiB
Go

package speed
import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/performancecopilot/speed/bytewriter"
)
// byte lengths of different components in an mmv file
const (
HeaderLength = 40
TocLength = 16
Metric1Length = 104
Metric2Length = 48
ValueLength = 32
Instance1Length = 80
Instance2Length = 24
InstanceDomainLength = 32
StringLength = 256
)
// MaxV1NameLength is the maximum length for a metric/instance name
// under MMV format 1
const MaxV1NameLength = 63
// MaxDataValueSize is the maximum byte length for a stored metric value, unless it is a string
const MaxDataValueSize = 16
// EraseFileOnStop if set to true, will also delete the memory mapped file
var EraseFileOnStop = false
// Client defines the interface for a type that can talk to an instrumentation agent
type Client interface {
// a client must contain a registry of metrics
Registry() Registry
// starts monitoring
Start() error
// Start that will panic on failure
MustStart()
// stop monitoring
Stop() error
// Stop that will panic on failure
MustStop()
// adds a metric to be monitored
Register(Metric) error
// tries to add a metric to be written and panics on error
MustRegister(Metric)
// adds metric from a string
RegisterString(string, interface{}, MetricType, MetricSemantics, MetricUnit) (Metric, error)
// tries to add a metric from a string and panics on an error
MustRegisterString(string, interface{}, MetricType, MetricSemantics, MetricUnit) Metric
}
///////////////////////////////////////////////////////////////////////////////
func mmvFileLocation(name string) (string, error) {
if strings.ContainsRune(name, os.PathSeparator) {
return "", errors.New("name cannot have path separator")
}
tdir, present := config["PCP_TMP_DIR"]
var loc string
if present {
loc = filepath.Join(rootPath, tdir)
} else {
loc = os.TempDir()
}
return filepath.Join(loc, "mmv", name), nil
}
// PCPClusterIDBitLength is the bit length of the cluster id
// for a set of PCP metrics
const PCPClusterIDBitLength = 12
// MMVFlag represents an enumerated type to represent mmv flag values
type MMVFlag int
// values for MMVFlag
const (
NoPrefixFlag MMVFlag = 1 << iota
ProcessFlag
SentinelFlag
)
//go:generate stringer -type=MMVFlag
// PCPClient implements a client that can generate instrumentation for PCP
type PCPClient struct {
mutex sync.Mutex
loc string // absolute location of the mmv file
clusterID uint32 // cluster identifier for the writer
flag MMVFlag // write flag
r *PCPRegistry // current registry
writer bytewriter.Writer
instanceoffsetc chan int
indomoffsetc chan int
metricoffsetc chan int
valueoffsetc chan int
stringoffsetc chan int
}
// NewPCPClient initializes a new PCPClient object
func NewPCPClient(name string) (*PCPClient, error) {
return NewPCPClientWithRegistry(name, NewPCPRegistry())
}
// NewPCPClientWithRegistry initializes a new PCPClient object with the given registry
func NewPCPClientWithRegistry(name string, registry *PCPRegistry) (*PCPClient, error) {
fileLocation, err := mmvFileLocation(name)
if err != nil {
return nil, errors.Wrap(err, "could not get a location for storing MMV file")
}
return &PCPClient{
loc: fileLocation,
r: registry,
clusterID: hash(name, PCPClusterIDBitLength),
flag: ProcessFlag,
}, nil
}
// Registry returns a writer's registry
func (c *PCPClient) Registry() Registry {
return c.r
}
// SetFlag sets the MMVflag for the client
func (c *PCPClient) SetFlag(flag MMVFlag) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.r.mapped {
return errors.New("cannot set mmv flag for an active client")
}
c.flag = flag
return nil
}
func (c *PCPClient) tocCount() int {
ans := 2
if c.r.InstanceCount() > 0 {
ans += 2
}
if c.r.StringCount() > 0 {
ans++
}
return ans
}
// Length returns the byte length of data in the mmv file written by the current writer
func (c *PCPClient) Length() int {
var (
InstanceLength = Instance1Length
MetricLength = Metric1Length
)
if c.r.version2 {
InstanceLength = Instance2Length
MetricLength = Metric2Length
}
return HeaderLength +
(c.tocCount() * TocLength) +
(c.r.InstanceCount() * InstanceLength) +
(c.r.InstanceDomainCount() * InstanceDomainLength) +
(c.r.MetricCount() * MetricLength) +
(c.r.ValuesCount() * ValueLength) +
(c.r.StringCount() * StringLength)
}
// Start dumps existing registry data
func (c *PCPClient) Start() error {
c.mutex.Lock()
defer c.mutex.Unlock()
l := c.Length()
writer, err := bytewriter.NewMemoryMappedWriter(c.loc, l)
if err != nil {
return errors.Wrap(err, "cannot create MemoryMappedBuffer in client")
}
c.writer = writer
c.start()
c.r.mapped = true
return nil
}
func (c *PCPClient) start() {
var (
InstanceLength = Instance1Length
MetricLength = Metric1Length
)
if c.r.version2 {
InstanceLength = Instance2Length
MetricLength = Metric2Length
}
c.r.indomoffset = HeaderLength + TocLength*c.tocCount()
c.r.instanceoffset = c.r.indomoffset + InstanceDomainLength*c.r.InstanceDomainCount()
c.r.metricsoffset = c.r.instanceoffset + InstanceLength*c.r.InstanceCount()
c.r.valuesoffset = c.r.metricsoffset + MetricLength*c.r.MetricCount()
c.r.stringsoffset = c.r.valuesoffset + ValueLength*c.r.ValuesCount()
if c.r.InstanceDomainCount() > 0 {
c.instanceoffsetc, c.indomoffsetc = make(chan int, 1), make(chan int, 1)
c.instanceoffsetc <- c.r.instanceoffset
c.indomoffsetc <- c.r.indomoffset
}
if c.r.MetricCount() > 0 {
c.metricoffsetc, c.valueoffsetc = make(chan int, 1), make(chan int, 1)
c.metricoffsetc <- c.r.metricsoffset
c.valueoffsetc <- c.r.valuesoffset
}
if c.r.StringCount() > 0 {
c.stringoffsetc = make(chan int, 1)
c.stringoffsetc <- c.r.stringsoffset
}
genc, g2offc := make(chan int64), make(chan int)
go c.writeHeaderBlock(genc, g2offc)
var wg sync.WaitGroup
wg.Add(2)
go func() {
c.writeTocBlock()
wg.Done()
}()
go func() {
// instance domains **have** to be written before metrics
// as metrics need instance offsets and multiple metrics
// can have the same indom, so they need to be cached
c.writeInstanceDomains()
c.writeMetrics()
wg.Done()
}()
gen, g2off := <-genc, <-g2offc
wg.Wait()
// must *always* be the last thing to happen
_ = c.writer.MustWriteInt64(gen, g2off)
}
func (c *PCPClient) writeHeaderBlock(genc chan int64, g2offc chan int) {
// tag
c.writer.MustWriteString("MMV", 0)
var pos int
// version
if c.r.version2 {
pos = c.writer.MustWriteUint32(2, 4)
} else {
pos = c.writer.MustWriteUint32(1, 4)
}
// generation
gen := time.Now().Unix()
pos = c.writer.MustWriteInt64(gen, pos)
g2off := pos
pos = c.writer.MustWriteInt64(0, pos)
// tocCount
pos = c.writer.MustWriteInt32(int32(c.tocCount()), pos)
// flag mask
pos = c.writer.MustWriteInt32(int32(c.flag), pos)
// process identifier
pos = c.writer.MustWriteInt32(int32(os.Getpid()), pos)
// cluster identifier
_ = c.writer.MustWriteUint32(c.clusterID, pos)
// NOTE: the order here is important, should be same as in start()
// or deadlock
genc <- gen
g2offc <- g2off
return
}
func (c *PCPClient) writeTocBlock() {
var wg sync.WaitGroup
tocpos := HeaderLength
wg.Add(c.tocCount())
// instance domains toc
if c.r.InstanceDomainCount() > 0 {
go func(pos int) {
// 1 is the identifier for instance domains
c.writeSingleToc(pos, 1, c.r.InstanceDomainCount(), c.r.indomoffset)
wg.Done()
}(tocpos)
tocpos += TocLength
}
// instances toc
if c.r.InstanceCount() > 0 {
go func(pos int) {
// 2 is the identifier for instances
c.writeSingleToc(pos, 2, c.r.InstanceCount(), c.r.instanceoffset)
wg.Done()
}(tocpos)
tocpos += TocLength
}
// metrics and values toc
metricsoffset, valuesoffset := c.r.metricsoffset, c.r.valuesoffset
if c.r.MetricCount() == 0 {
metricsoffset, valuesoffset = 0, 0
}
go func(pos int) {
// 3 is the identifier for metrics
c.writeSingleToc(pos, 3, c.r.MetricCount(), metricsoffset)
wg.Done()
}(tocpos)
tocpos += TocLength
go func(pos int) {
// 4 is the identifier for values
c.writeSingleToc(pos, 4, c.r.ValuesCount(), valuesoffset)
wg.Done()
}(tocpos)
tocpos += TocLength
// strings toc
if c.r.StringCount() > 0 {
go func(pos int) {
// 5 is the identifier for strings
c.writeSingleToc(pos, 5, c.r.StringCount(), c.r.stringsoffset)
wg.Done()
}(tocpos)
}
wg.Wait()
}
func (c *PCPClient) writeSingleToc(pos, identifier, count, offset int) {
pos = c.writer.MustWriteInt32(int32(identifier), pos)
pos = c.writer.MustWriteInt32(int32(count), pos)
_ = c.writer.MustWriteUint64(uint64(offset), pos)
}
func (c *PCPClient) writeInstanceDomains() {
var wg sync.WaitGroup
wg.Add(c.r.InstanceDomainCount())
for _, indom := range c.r.instanceDomains {
go func(indom *PCPInstanceDomain) {
c.writeInstanceDomain(indom)
wg.Done()
}(indom)
}
wg.Wait()
}
func (c *PCPClient) writeInstanceDomain(indom *PCPInstanceDomain) {
off := <-c.indomoffsetc
c.indomoffsetc <- off + InstanceDomainLength
InstanceLength := Instance1Length
if c.r.version2 {
InstanceLength = Instance2Length
}
inoff := off
ioff := <-c.instanceoffsetc
c.instanceoffsetc <- ioff + InstanceLength*indom.InstanceCount()
var wg sync.WaitGroup
wg.Add(indom.InstanceCount())
off = c.writer.MustWriteUint32(indom.id, off)
off = c.writer.MustWriteInt32(int32(indom.InstanceCount()), off)
off = c.writer.MustWriteInt64(int64(ioff), off)
for _, i := range indom.instances {
go func(i *pcpInstance, offset int) {
c.writeInstance(i, inoff, offset)
wg.Done()
}(i, ioff)
ioff += InstanceLength
}
so, lo := 0, 0
if indom.shortDescription != "" {
so = <-c.stringoffsetc
c.stringoffsetc <- so + StringLength
c.writer.MustWriteString(indom.shortDescription, so)
}
if indom.longDescription != "" {
lo = <-c.stringoffsetc
c.stringoffsetc <- lo + StringLength
c.writer.MustWriteString(indom.longDescription, lo)
}
off = c.writer.MustWriteUint64(uint64(so), off)
_ = c.writer.MustWriteUint64(uint64(lo), off)
wg.Wait()
}
func (c *PCPClient) writeInstance(i *pcpInstance, indomoff int, off int) {
i.offset = off
off = c.writer.MustWriteInt64(int64(indomoff), off)
off = c.writer.MustWriteInt32(0, off)
off = c.writer.MustWriteUint32(i.id, off)
if c.r.version2 {
soff := <-c.stringoffsetc
c.stringoffsetc <- soff + StringLength
c.writer.MustWriteUint64(uint64(soff), off)
c.writer.MustWriteString(i.name, soff)
} else {
c.writer.MustWriteString(i.name, off)
}
}
func (c *PCPClient) writeMetrics() {
var wg sync.WaitGroup
launchSingletonMetric := func(metric *pcpSingletonMetric) {
go func() {
c.writeSingletonMetric(metric)
wg.Done()
}()
}
launchInstanceMetric := func(metric *pcpInstanceMetric) {
go func() {
c.writeInstanceMetric(metric)
wg.Done()
}()
}
wg.Add(c.r.MetricCount())
for _, m := range c.r.metrics {
switch metric := m.(type) {
case *PCPSingletonMetric:
launchSingletonMetric(metric.pcpSingletonMetric)
case *PCPCounter:
launchSingletonMetric(metric.pcpSingletonMetric)
case *PCPGauge:
launchSingletonMetric(metric.pcpSingletonMetric)
case *PCPTimer:
launchSingletonMetric(metric.pcpSingletonMetric)
case *PCPInstanceMetric:
launchInstanceMetric(metric.pcpInstanceMetric)
case *PCPCounterVector:
launchInstanceMetric(metric.pcpInstanceMetric)
case *PCPGaugeVector:
launchInstanceMetric(metric.pcpInstanceMetric)
case *PCPHistogram:
launchInstanceMetric(metric.pcpInstanceMetric)
}
}
wg.Wait()
}
func (c *PCPClient) writeSingletonMetric(m *pcpSingletonMetric) {
var wg sync.WaitGroup
wg.Add(2)
doff := <-c.metricoffsetc
go func() {
c.writeMetricDesc(m.pcpMetricDesc, m.Indom(), doff)
wg.Done()
}()
off := <-c.valueoffsetc
c.valueoffsetc <- off + ValueLength
go func(offset int) {
m.update = c.writeValue(m.t, m.val, offset)
wg.Done()
}(off)
off = c.writer.MustWriteInt64(int64(doff), off+MaxDataValueSize)
_ = c.writer.MustWriteInt64(0, off)
wg.Wait()
}
func (c *PCPClient) writeInstanceMetric(m *pcpInstanceMetric) {
var wg sync.WaitGroup
wg.Add(1 + m.Indom().InstanceCount())
doff := <-c.metricoffsetc
go func() {
c.writeMetricDesc(m.pcpMetricDesc, m.Indom(), doff)
wg.Done()
}()
for name, i := range m.indom.instances {
off := <-c.valueoffsetc
c.valueoffsetc <- off + ValueLength
go func(i *instanceValue, offset int) {
i.update = c.writeValue(m.t, i.val, offset)
wg.Done()
}(m.vals[name], off)
off = c.writer.MustWriteInt64(int64(doff), off+MaxDataValueSize)
_ = c.writer.MustWriteInt64(int64(i.offset), off)
}
wg.Wait()
}
func (c *PCPClient) writeMetricDesc(desc *pcpMetricDesc, indom *PCPInstanceDomain, off int) {
if c.r.version2 {
c.metricoffsetc <- off + Metric2Length
noff := <-c.stringoffsetc
c.stringoffsetc <- noff + StringLength
off = c.writer.MustWriteUint64(uint64(noff), off)
c.writer.MustWriteString(desc.name, noff)
} else {
c.metricoffsetc <- off + Metric1Length
c.writer.MustWriteString(desc.name, off)
off += MaxV1NameLength + 1
}
off = c.writer.MustWriteUint32(desc.id, off)
off = c.writer.MustWriteInt32(int32(desc.t), off)
off = c.writer.MustWriteInt32(int32(desc.sem), off)
off = c.writer.MustWriteUint32(desc.u.PMAPI(), off)
if indom != nil {
off = c.writer.MustWriteUint32(indom.ID(), off)
} else {
off = c.writer.MustWriteInt32(-1, off)
}
off = c.writer.MustWriteInt32(0, off)
so, lo := 0, 0
if desc.shortDescription != "" {
so = <-c.stringoffsetc
c.stringoffsetc <- so + StringLength
c.writer.MustWriteString(desc.shortDescription, so)
}
if desc.longDescription != "" {
lo = <-c.stringoffsetc
c.stringoffsetc <- lo + StringLength
c.writer.MustWriteString(desc.longDescription, lo)
}
off = c.writer.MustWriteUint64(uint64(so), off)
_ = c.writer.MustWriteUint64(uint64(lo), off)
}
func (c *PCPClient) writeValue(t MetricType, val interface{}, offset int) updateClosure {
if t == StringType {
pos := c.writer.MustWriteUint64(StringLength-1, offset)
offset = <-c.stringoffsetc
c.stringoffsetc <- offset + StringLength
c.writer.MustWriteUint64(uint64(offset), pos)
}
update := newupdateClosure(offset, c.writer)
_ = update(val)
return update
}
// MustStart is a start that panics
func (c *PCPClient) MustStart() {
if err := c.Start(); err != nil {
panic(err)
}
}
// Stop removes existing mapping and cleans up
func (c *PCPClient) Stop() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if !c.r.mapped {
return errors.New("trying to stop an already stopped mapping")
}
c.stop()
c.r.mapped = false
err := c.writer.(*bytewriter.MemoryMappedWriter).Unmap(EraseFileOnStop)
c.writer = nil
if err != nil {
return errors.Wrap(err, "client: error unmapping MemoryMappedBuffer")
}
return nil
}
func (c *PCPClient) stop() {
c.instanceoffsetc, c.indomoffsetc = nil, nil
c.metricoffsetc, c.valueoffsetc = nil, nil
c.stringoffsetc = nil
}
// MustStop is a stop that panics
func (c *PCPClient) MustStop() {
if err := c.Stop(); err != nil {
panic(err)
}
}
// Register is simply a shorthand for Registry().AddMetric
func (c *PCPClient) Register(m Metric) error { return c.r.AddMetric(m) }
// MustRegister is simply a Register that can panic
func (c *PCPClient) MustRegister(m Metric) {
if err := c.Register(m); err != nil {
panic(err)
}
}
// RegisterIndom is simply a shorthand for Registry().AddInstanceDomain
func (c *PCPClient) RegisterIndom(indom InstanceDomain) error {
return c.r.AddInstanceDomain(indom)
}
// MustRegisterIndom is simply a RegisterIndom that can panic
func (c *PCPClient) MustRegisterIndom(indom InstanceDomain) {
if err := c.RegisterIndom(indom); err != nil {
panic(err)
}
}
// RegisterString is simply a shorthand for Registry().AddMetricByString
func (c *PCPClient) RegisterString(str string, val interface{}, t MetricType, s MetricSemantics, u MetricUnit) (Metric, error) {
return c.r.AddMetricByString(str, val, t, s, u)
}
// MustRegisterString is simply a RegisterString that panics
func (c *PCPClient) MustRegisterString(str string, val interface{}, t MetricType, s MetricSemantics, u MetricUnit) Metric {
if m, err := c.RegisterString(str, val, t, s, u); err != nil {
panic(err)
} else {
return m
}
}