Merge pull request #2359 from weaveworks/kinesis-summary-emitter

Fluent Billing Emitter
This commit is contained in:
Paul Bellamy
2017-03-22 12:05:54 +00:00
committed by GitHub
7 changed files with 424 additions and 0 deletions

View File

@@ -0,0 +1,116 @@
package multitenant
import (
"crypto/sha256"
"encoding/base64"
"flag"
"strings"
"time"
log "github.com/Sirupsen/logrus"
billing "github.com/weaveworks/billing-client"
"golang.org/x/net/context"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/report"
)
// BillingEmitterConfig has everything we need to make a billing emitter
type BillingEmitterConfig struct {
Enabled bool
DefaultInterval time.Duration
UserIDer UserIDer
}
// RegisterFlags registers the billing emitter flags with the main flag set.
func (cfg *BillingEmitterConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "app.billing.enabled", false, "enable emitting billing info")
f.DurationVar(&cfg.DefaultInterval, "app.billing.default-publish-interval", 3*time.Second, "default publish interval to assume for reports")
}
// BillingEmitter is the billing emitter
type BillingEmitter struct {
app.Collector
BillingEmitterConfig
billing *billing.Client
}
// NewBillingEmitter changes a new billing emitter which emits billing events
func NewBillingEmitter(upstream app.Collector, billingClient *billing.Client, cfg BillingEmitterConfig) (*BillingEmitter, error) {
return &BillingEmitter{
Collector: upstream,
billing: billingClient,
BillingEmitterConfig: cfg,
}, nil
}
// Add implements app.Collector
func (e *BillingEmitter) Add(ctx context.Context, rep report.Report, buf []byte) error {
now := time.Now().UTC()
userID, err := e.UserIDer(ctx)
if err != nil {
return err
}
rowKey, colKey := calculateDynamoKeys(userID, now)
containerCount := int64(len(rep.Container.Nodes))
interval := e.reportInterval(rep)
hasher := sha256.New()
hasher.Write(buf)
hash := "sha256:" + base64.URLEncoding.EncodeToString(hasher.Sum(nil))
amounts := billing.Amounts{billing.ContainerSeconds: int64(interval) * containerCount}
metadata := map[string]string{
"row_key": rowKey,
"col_key": colKey,
}
err = e.billing.AddAmounts(
hash,
userID,
now,
amounts,
metadata,
)
if err != nil {
log.Errorf("Failed emitting billing data: %v", err)
return err
}
return e.Collector.Add(ctx, rep, buf)
}
// reportInterval tries to find the custom report interval of this report. If
// it is malformed, or not set, it returns false.
func (e *BillingEmitter) reportInterval(r report.Report) time.Duration {
var inter string
for _, c := range r.Process.Nodes {
cmd, ok := c.Latest.Lookup("cmdline")
if !ok {
continue
}
if strings.Contains(cmd, "scope-probe") &&
strings.Contains(cmd, "probe.publish.interval") {
cmds := strings.SplitAfter(cmd, "probe.publish.interval")
aft := strings.Split(cmds[1], " ")
if aft[0] == "" {
inter = aft[1]
} else {
inter = aft[0][1:]
}
}
}
if inter == "" {
return e.DefaultInterval
}
d, err := time.ParseDuration(inter)
if err != nil {
return e.DefaultInterval
}
return d
}
// Close shuts down the billing emitter and billing client flushing events.
func (e *BillingEmitter) Close() error {
return e.billing.Close()
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/weaveworks/go-checkpoint"
"github.com/weaveworks/weave/common"
billing "github.com/weaveworks/billing-client"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/network"
"github.com/weaveworks/scope/app"
@@ -45,6 +46,7 @@ var (
func init() {
prometheus.MustRegister(requestDuration)
billing.MustRegisterMetrics()
}
// Router creates the mux for all the various app components.
@@ -155,6 +157,19 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
return nil, fmt.Errorf("Invalid collector '%s'", collectorURL)
}
func emitterFactory(collector app.Collector, clientCfg billing.Config, userIDer multitenant.UserIDer, emitterCfg multitenant.BillingEmitterConfig) (*multitenant.BillingEmitter, error) {
billingClient, err := billing.NewClient(clientCfg)
if err != nil {
return nil, err
}
emitterCfg.UserIDer = userIDer
return multitenant.NewBillingEmitter(
collector,
billingClient,
emitterCfg,
)
}
func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string) (app.ControlRouter, error) {
if controlRouterURL == "local" {
return app.NewLocalControlRouter(), nil
@@ -230,6 +245,16 @@ func appMain(flags appFlags) {
return
}
if flags.BillingEmitterConfig.Enabled {
billingEmitter, err := emitterFactory(collector, flags.BillingClientConfig, userIDer, flags.BillingEmitterConfig)
if err != nil {
log.Fatalf("Error creating emitter: %v", err)
return
}
defer billingEmitter.Close()
collector = billingEmitter
}
controlRouter, err := controlRouterFactory(userIDer, flags.controlRouterURL)
if err != nil {
log.Fatalf("Error creating control router: %v", err)

View File

@@ -14,7 +14,9 @@ import (
log "github.com/Sirupsen/logrus"
billing "github.com/weaveworks/billing-client"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/app/multitenant"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
"github.com/weaveworks/scope/probe/host"
@@ -151,6 +153,9 @@ type appFlags struct {
awsCreateTables bool
consulInf string
multitenant.BillingEmitterConfig
BillingClientConfig billing.Config
}
type containerLabelFiltersFlag struct {
@@ -352,6 +357,8 @@ func main() {
flag.BoolVar(&flags.app.awsCreateTables, "app.aws.create.tables", false, "Create the tables in DynamoDB")
flag.StringVar(&flags.app.consulInf, "app.consul.inf", "", "The interface who's address I should advertise myself under in consul")
flags.app.BillingEmitterConfig.RegisterFlags(flag.CommandLine)
flags.app.BillingClientConfig.RegisterFlags(flag.CommandLine)
flag.Parse()
app.AddContainerFilters(append(containerLabelFilterFlags.apiTopologyOptions, containerLabelFilterFlagsExclude.apiTopologyOptions...)...)

206
vendor/github.com/weaveworks/billing-client/client.go generated vendored Normal file
View File

@@ -0,0 +1,206 @@
package billing
import (
"fmt"
"net"
"strconv"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/fluent/fluent-logger-golang/fluent"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"github.com/weaveworks/common/instrument"
)
var (
// RequestDuration is the duration of billing client requests
RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "billing_client",
Name: "request_duration_seconds",
Help: "Time in seconds spent emitting billing info.",
Buckets: prometheus.DefBuckets,
}, []string{"method", "status_code"})
// EventsCounter is the count of billing events
EventsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "billing_client",
Name: "events",
Help: "Number of billing events",
}, []string{"status"})
// AmountsCounter is the total of the billing amounts
AmountsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "billing_client",
Name: "amounts",
Help: "Number and type of billing amounts",
}, []string{"status", "amount_type"})
)
// MustRegisterMetrics is a convenience function for registering all the metrics from this package
func MustRegisterMetrics() {
prometheus.MustRegister(RequestDuration)
prometheus.MustRegister(EventsCounter)
prometheus.MustRegister(AmountsCounter)
}
// Client is a billing client for sending usage information to the billing system.
type Client struct {
stop chan struct{}
wg sync.WaitGroup
events chan Event
logger *fluent.Fluent
Config
}
// New creates a new billing client.
func NewClient(cfg Config) (*Client, error) {
host, port, err := net.SplitHostPort(cfg.IngesterHostPort)
if err != nil {
return nil, err
}
intPort, err := strconv.Atoi(port)
if err != nil {
return nil, err
}
logger, err := fluent.New(fluent.Config{
FluentPort: intPort,
FluentHost: host,
AsyncConnect: true,
MaxRetry: -1,
MarshalAsJSON: true,
})
if err != nil {
return nil, err
}
c := &Client{
stop: make(chan struct{}),
events: make(chan Event, cfg.MaxBufferedEvents),
logger: logger,
Config: cfg,
}
c.wg.Add(1)
go c.loop()
return c, nil
}
// AddAmounts writes unit increments into the billing system. If the call does
// not complete (due to a crash, etc), then data may or may not have been
// written successfully.
//
// Requests with the same `uniqueKey` can be retried indefinitely until they
// succeed, and the results will be deduped.
//
// `uniqueKey` must be set, and not blank. If in doubt, generate a uuid and set
// that as the uniqueKey. Consider that hashing the raw input data may not be
// good enough since identical data may be sent from the client multiple times.
//
// `internalInstanceID`, is *not* the external instance ID (e.g.
// "fluffy-bunny-47"), it is the numeric internal instance ID (e.g. "1234").
//
// `timestamp` is used to determine which time bucket the usage occurred in, it
// is included so that the result is independent of how long processing takes.
// Note, in the event of buffering this timestamp may *not* agree with when the
// charge will be billed to the customer.
//
// `amounts` is a map with all the various amounts we wish to charge the user
// for.
//
// `metadata` is a general dumping ground for other metadata you may wish to
// include for auditability. In general, be careful about the size of data put
// here. Prefer including a lookup address over whole data. For example,
// include a report id or s3 address instead of the information in the report.
func (c *Client) AddAmounts(uniqueKey, internalInstanceID string, timestamp time.Time, amounts Amounts, metadata map[string]string) error {
return instrument.TimeRequestHistogram(context.Background(), "Billing.AddAmounts", RequestDuration, func(_ context.Context) error {
if uniqueKey == "" {
return fmt.Errorf("billing units uniqueKey cannot be blank")
}
e := Event{
UniqueKey: uniqueKey,
InternalInstanceID: internalInstanceID,
OccurredAt: timestamp,
Amounts: amounts,
Metadata: metadata,
}
select {
case <-c.stop:
trackEvent("stopping", e)
return fmt.Errorf("stopping, discarding event: %v", e)
default:
}
select {
case c.events <- e: // Put event in the channel unless it is full
return nil
default:
// full
}
trackEvent("buffer_full", e)
return fmt.Errorf("reached billing event buffer limit (%d), discarding event: %v", c.MaxBufferedEvents, e)
})
}
func (c *Client) loop() {
defer c.wg.Done()
for done := false; !done; {
select {
case event := <-c.events:
c.post(event)
case <-c.stop:
done = true
}
}
// flush remaining events
for done := false; !done; {
select {
case event := <-c.events:
c.post(event)
default:
done = true
}
}
}
func (c *Client) post(e Event) error {
for {
var err error
for _, r := range e.toRecords() {
if err = c.logger.Post("billing", r); err != nil {
break
}
}
if err == nil {
trackEvent("success", e)
return nil
}
select {
case <-c.stop:
// We're quitting, no retries.
trackEvent("stopping", e)
log.Errorf("billing: failed to log event: %v: %v, stopping", e, err)
return err
default:
trackEvent("retrying", e)
log.Errorf("billing: failed to log event: %v: %v, retrying in %v", e, err, c.RetryDelay)
time.Sleep(c.RetryDelay)
}
}
}
func trackEvent(status string, e Event) {
EventsCounter.WithLabelValues(status).Inc()
for t, v := range e.Amounts {
AmountsCounter.WithLabelValues(status, string(t)).Add(float64(v))
}
}
// Close shuts down the client and attempts to flush remaining events.
func (c *Client) Close() error {
close(c.stop)
c.wg.Wait()
return c.logger.Close()
}

20
vendor/github.com/weaveworks/billing-client/config.go generated vendored Normal file
View File

@@ -0,0 +1,20 @@
package billing
import (
"flag"
"time"
)
// Config is the config for a billing client
type Config struct {
MaxBufferedEvents int
RetryDelay time.Duration
IngesterHostPort string
}
// RegisterFlags register the billing client flags with the main flag set
func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&c.MaxBufferedEvents, "billing.max-buffered-events", 1024, "Maximum number of billing events to buffer in memory")
f.DurationVar(&c.RetryDelay, "billing.retry-delay", 500*time.Millisecond, "How often to retry sending events to the billing ingester.")
f.StringVar(&c.IngesterHostPort, "billing.ingester", "localhost:24225", "points to the billing ingester sidecar (should be on localhost)")
}

42
vendor/github.com/weaveworks/billing-client/event.go generated vendored Normal file
View File

@@ -0,0 +1,42 @@
package billing
import (
"time"
)
// Event is a record of some amount of billable usage for scope.
type Event struct {
UniqueKey string `json:"unique_key" msg:"unique_key"`
InternalInstanceID string `json:"internal_instance_id" msg:"internal_instance_id"`
OccurredAt time.Time `json:"occurred_at" msg:"occurred_at"`
Amounts Amounts `json:"amounts" msg:"amounts"`
Metadata map[string]string `json:"metadata" msg:"metadata"`
}
// msgpack (and therefore fluentd) requires the things we send to it to be
// map[string]interface{}, so we return them here, not a struct. :(
func (e Event) toRecords() []map[string]interface{} {
var records []map[string]interface{}
for t, v := range e.Amounts {
records = append(records, map[string]interface{}{
"unique_key": e.UniqueKey + ":" + string(t),
"internal_instance_id": e.InternalInstanceID,
"amount_type": string(t),
"amount_value": v,
"occurred_at": e.OccurredAt,
"metadata": e.Metadata,
})
}
return records
}
// AmountType is a type-cast of the enum for the diferent amount types
type AmountType string
const (
// ContainerSeconds is one of the billable metrics
ContainerSeconds AmountType = "container-seconds"
)
// Amounts is a map of amount billable metrics to their values
type Amounts map[AmountType]int64

8
vendor/manifest vendored
View File

@@ -1410,6 +1410,14 @@
"branch": "master",
"notests": true
},
{
"importpath": "github.com/weaveworks/billing-client",
"repository": "https://github.com/weaveworks/billing-client",
"vcs": "git",
"revision": "bf803baae0177625efc34699d79696644210a022",
"branch": "master",
"notests": true
},
{
"importpath": "github.com/weaveworks/common",
"repository": "https://github.com/weaveworks/common",