From e68ffd467e942c268598f260ff3fe88940144bb5 Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Thu, 9 Mar 2017 17:43:48 +0000 Subject: [PATCH] Add app/multitenant/billing_emitter to emit billing events --- app/multitenant/billing_emitter.go | 116 ++++++++++ prog/app.go | 25 +++ prog/main.go | 7 + .../weaveworks/billing-client/client.go | 206 ++++++++++++++++++ .../weaveworks/billing-client/config.go | 20 ++ .../weaveworks/billing-client/event.go | 42 ++++ vendor/manifest | 8 + 7 files changed, 424 insertions(+) create mode 100644 app/multitenant/billing_emitter.go create mode 100644 vendor/github.com/weaveworks/billing-client/client.go create mode 100644 vendor/github.com/weaveworks/billing-client/config.go create mode 100644 vendor/github.com/weaveworks/billing-client/event.go diff --git a/app/multitenant/billing_emitter.go b/app/multitenant/billing_emitter.go new file mode 100644 index 000000000..2cadb76c8 --- /dev/null +++ b/app/multitenant/billing_emitter.go @@ -0,0 +1,116 @@ +package multitenant + +import ( + "crypto/sha256" + "encoding/base64" + "flag" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + billing "github.com/weaveworks/billing-client" + "golang.org/x/net/context" + + "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/report" +) + +// BillingEmitterConfig has everything we need to make a billing emitter +type BillingEmitterConfig struct { + Enabled bool + DefaultInterval time.Duration + UserIDer UserIDer +} + +// RegisterFlags registers the billing emitter flags with the main flag set. +func (cfg *BillingEmitterConfig) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.Enabled, "app.billing.enabled", false, "enable emitting billing info") + f.DurationVar(&cfg.DefaultInterval, "app.billing.default-publish-interval", 3*time.Second, "default publish interval to assume for reports") +} + +// BillingEmitter is the billing emitter +type BillingEmitter struct { + app.Collector + BillingEmitterConfig + billing *billing.Client +} + +// NewBillingEmitter changes a new billing emitter which emits billing events +func NewBillingEmitter(upstream app.Collector, billingClient *billing.Client, cfg BillingEmitterConfig) (*BillingEmitter, error) { + return &BillingEmitter{ + Collector: upstream, + billing: billingClient, + BillingEmitterConfig: cfg, + }, nil +} + +// Add implements app.Collector +func (e *BillingEmitter) Add(ctx context.Context, rep report.Report, buf []byte) error { + now := time.Now().UTC() + userID, err := e.UserIDer(ctx) + if err != nil { + return err + } + rowKey, colKey := calculateDynamoKeys(userID, now) + + containerCount := int64(len(rep.Container.Nodes)) + interval := e.reportInterval(rep) + hasher := sha256.New() + hasher.Write(buf) + hash := "sha256:" + base64.URLEncoding.EncodeToString(hasher.Sum(nil)) + amounts := billing.Amounts{billing.ContainerSeconds: int64(interval) * containerCount} + metadata := map[string]string{ + "row_key": rowKey, + "col_key": colKey, + } + + err = e.billing.AddAmounts( + hash, + userID, + now, + amounts, + metadata, + ) + if err != nil { + log.Errorf("Failed emitting billing data: %v", err) + return err + } + + return e.Collector.Add(ctx, rep, buf) +} + +// reportInterval tries to find the custom report interval of this report. If +// it is malformed, or not set, it returns false. +func (e *BillingEmitter) reportInterval(r report.Report) time.Duration { + var inter string + for _, c := range r.Process.Nodes { + cmd, ok := c.Latest.Lookup("cmdline") + if !ok { + continue + } + if strings.Contains(cmd, "scope-probe") && + strings.Contains(cmd, "probe.publish.interval") { + cmds := strings.SplitAfter(cmd, "probe.publish.interval") + aft := strings.Split(cmds[1], " ") + if aft[0] == "" { + inter = aft[1] + } else { + inter = aft[0][1:] + } + + } + } + if inter == "" { + return e.DefaultInterval + } + d, err := time.ParseDuration(inter) + if err != nil { + return e.DefaultInterval + } + return d +} + +// Close shuts down the billing emitter and billing client flushing events. +func (e *BillingEmitter) Close() error { + return e.billing.Close() +} diff --git a/prog/app.go b/prog/app.go index c15ba9c93..eaa80b219 100644 --- a/prog/app.go +++ b/prog/app.go @@ -21,6 +21,7 @@ import ( "github.com/weaveworks/go-checkpoint" "github.com/weaveworks/weave/common" + billing "github.com/weaveworks/billing-client" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/network" "github.com/weaveworks/scope/app" @@ -45,6 +46,7 @@ var ( func init() { prometheus.MustRegister(requestDuration) + billing.MustRegisterMetrics() } // Router creates the mux for all the various app components. @@ -155,6 +157,19 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo return nil, fmt.Errorf("Invalid collector '%s'", collectorURL) } +func emitterFactory(collector app.Collector, clientCfg billing.Config, userIDer multitenant.UserIDer, emitterCfg multitenant.BillingEmitterConfig) (*multitenant.BillingEmitter, error) { + billingClient, err := billing.NewClient(clientCfg) + if err != nil { + return nil, err + } + emitterCfg.UserIDer = userIDer + return multitenant.NewBillingEmitter( + collector, + billingClient, + emitterCfg, + ) +} + func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string) (app.ControlRouter, error) { if controlRouterURL == "local" { return app.NewLocalControlRouter(), nil @@ -230,6 +245,16 @@ func appMain(flags appFlags) { return } + if flags.BillingEmitterConfig.Enabled { + billingEmitter, err := emitterFactory(collector, flags.BillingClientConfig, userIDer, flags.BillingEmitterConfig) + if err != nil { + log.Fatalf("Error creating emitter: %v", err) + return + } + defer billingEmitter.Close() + collector = billingEmitter + } + controlRouter, err := controlRouterFactory(userIDer, flags.controlRouterURL) if err != nil { log.Fatalf("Error creating control router: %v", err) diff --git a/prog/main.go b/prog/main.go index 1efe1cf93..4d5c9cc24 100644 --- a/prog/main.go +++ b/prog/main.go @@ -14,7 +14,9 @@ import ( log "github.com/Sirupsen/logrus" + billing "github.com/weaveworks/billing-client" "github.com/weaveworks/scope/app" + "github.com/weaveworks/scope/app/multitenant" "github.com/weaveworks/scope/common/xfer" "github.com/weaveworks/scope/probe/appclient" "github.com/weaveworks/scope/probe/host" @@ -151,6 +153,9 @@ type appFlags struct { awsCreateTables bool consulInf string + + multitenant.BillingEmitterConfig + BillingClientConfig billing.Config } type containerLabelFiltersFlag struct { @@ -352,6 +357,8 @@ func main() { flag.BoolVar(&flags.app.awsCreateTables, "app.aws.create.tables", false, "Create the tables in DynamoDB") flag.StringVar(&flags.app.consulInf, "app.consul.inf", "", "The interface who's address I should advertise myself under in consul") + flags.app.BillingEmitterConfig.RegisterFlags(flag.CommandLine) + flags.app.BillingClientConfig.RegisterFlags(flag.CommandLine) flag.Parse() app.AddContainerFilters(append(containerLabelFilterFlags.apiTopologyOptions, containerLabelFilterFlagsExclude.apiTopologyOptions...)...) diff --git a/vendor/github.com/weaveworks/billing-client/client.go b/vendor/github.com/weaveworks/billing-client/client.go new file mode 100644 index 000000000..366ca1f68 --- /dev/null +++ b/vendor/github.com/weaveworks/billing-client/client.go @@ -0,0 +1,206 @@ +package billing + +import ( + "fmt" + "net" + "strconv" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/fluent/fluent-logger-golang/fluent" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" + + "github.com/weaveworks/common/instrument" +) + +var ( + // RequestDuration is the duration of billing client requests + RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "billing_client", + Name: "request_duration_seconds", + Help: "Time in seconds spent emitting billing info.", + Buckets: prometheus.DefBuckets, + }, []string{"method", "status_code"}) + // EventsCounter is the count of billing events + EventsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "billing_client", + Name: "events", + Help: "Number of billing events", + }, []string{"status"}) + // AmountsCounter is the total of the billing amounts + AmountsCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "billing_client", + Name: "amounts", + Help: "Number and type of billing amounts", + }, []string{"status", "amount_type"}) +) + +// MustRegisterMetrics is a convenience function for registering all the metrics from this package +func MustRegisterMetrics() { + prometheus.MustRegister(RequestDuration) + prometheus.MustRegister(EventsCounter) + prometheus.MustRegister(AmountsCounter) +} + +// Client is a billing client for sending usage information to the billing system. +type Client struct { + stop chan struct{} + wg sync.WaitGroup + events chan Event + logger *fluent.Fluent + Config +} + +// New creates a new billing client. +func NewClient(cfg Config) (*Client, error) { + host, port, err := net.SplitHostPort(cfg.IngesterHostPort) + if err != nil { + return nil, err + } + intPort, err := strconv.Atoi(port) + if err != nil { + return nil, err + } + logger, err := fluent.New(fluent.Config{ + FluentPort: intPort, + FluentHost: host, + AsyncConnect: true, + MaxRetry: -1, + MarshalAsJSON: true, + }) + if err != nil { + return nil, err + } + + c := &Client{ + stop: make(chan struct{}), + events: make(chan Event, cfg.MaxBufferedEvents), + logger: logger, + Config: cfg, + } + c.wg.Add(1) + go c.loop() + return c, nil +} + +// AddAmounts writes unit increments into the billing system. If the call does +// not complete (due to a crash, etc), then data may or may not have been +// written successfully. +// +// Requests with the same `uniqueKey` can be retried indefinitely until they +// succeed, and the results will be deduped. +// +// `uniqueKey` must be set, and not blank. If in doubt, generate a uuid and set +// that as the uniqueKey. Consider that hashing the raw input data may not be +// good enough since identical data may be sent from the client multiple times. +// +// `internalInstanceID`, is *not* the external instance ID (e.g. +// "fluffy-bunny-47"), it is the numeric internal instance ID (e.g. "1234"). +// +// `timestamp` is used to determine which time bucket the usage occurred in, it +// is included so that the result is independent of how long processing takes. +// Note, in the event of buffering this timestamp may *not* agree with when the +// charge will be billed to the customer. +// +// `amounts` is a map with all the various amounts we wish to charge the user +// for. +// +// `metadata` is a general dumping ground for other metadata you may wish to +// include for auditability. In general, be careful about the size of data put +// here. Prefer including a lookup address over whole data. For example, +// include a report id or s3 address instead of the information in the report. +func (c *Client) AddAmounts(uniqueKey, internalInstanceID string, timestamp time.Time, amounts Amounts, metadata map[string]string) error { + return instrument.TimeRequestHistogram(context.Background(), "Billing.AddAmounts", RequestDuration, func(_ context.Context) error { + if uniqueKey == "" { + return fmt.Errorf("billing units uniqueKey cannot be blank") + } + + e := Event{ + UniqueKey: uniqueKey, + InternalInstanceID: internalInstanceID, + OccurredAt: timestamp, + Amounts: amounts, + Metadata: metadata, + } + + select { + case <-c.stop: + trackEvent("stopping", e) + return fmt.Errorf("stopping, discarding event: %v", e) + default: + } + + select { + case c.events <- e: // Put event in the channel unless it is full + return nil + default: + // full + } + trackEvent("buffer_full", e) + return fmt.Errorf("reached billing event buffer limit (%d), discarding event: %v", c.MaxBufferedEvents, e) + }) +} + +func (c *Client) loop() { + defer c.wg.Done() + for done := false; !done; { + select { + case event := <-c.events: + c.post(event) + case <-c.stop: + done = true + } + } + + // flush remaining events + for done := false; !done; { + select { + case event := <-c.events: + c.post(event) + default: + done = true + } + } +} + +func (c *Client) post(e Event) error { + for { + var err error + for _, r := range e.toRecords() { + if err = c.logger.Post("billing", r); err != nil { + break + } + } + if err == nil { + trackEvent("success", e) + return nil + } + select { + case <-c.stop: + // We're quitting, no retries. + trackEvent("stopping", e) + log.Errorf("billing: failed to log event: %v: %v, stopping", e, err) + return err + default: + trackEvent("retrying", e) + log.Errorf("billing: failed to log event: %v: %v, retrying in %v", e, err, c.RetryDelay) + time.Sleep(c.RetryDelay) + } + } +} + +func trackEvent(status string, e Event) { + EventsCounter.WithLabelValues(status).Inc() + for t, v := range e.Amounts { + AmountsCounter.WithLabelValues(status, string(t)).Add(float64(v)) + } +} + +// Close shuts down the client and attempts to flush remaining events. +func (c *Client) Close() error { + close(c.stop) + c.wg.Wait() + return c.logger.Close() +} diff --git a/vendor/github.com/weaveworks/billing-client/config.go b/vendor/github.com/weaveworks/billing-client/config.go new file mode 100644 index 000000000..16e7b07d5 --- /dev/null +++ b/vendor/github.com/weaveworks/billing-client/config.go @@ -0,0 +1,20 @@ +package billing + +import ( + "flag" + "time" +) + +// Config is the config for a billing client +type Config struct { + MaxBufferedEvents int + RetryDelay time.Duration + IngesterHostPort string +} + +// RegisterFlags register the billing client flags with the main flag set +func (c *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&c.MaxBufferedEvents, "billing.max-buffered-events", 1024, "Maximum number of billing events to buffer in memory") + f.DurationVar(&c.RetryDelay, "billing.retry-delay", 500*time.Millisecond, "How often to retry sending events to the billing ingester.") + f.StringVar(&c.IngesterHostPort, "billing.ingester", "localhost:24225", "points to the billing ingester sidecar (should be on localhost)") +} diff --git a/vendor/github.com/weaveworks/billing-client/event.go b/vendor/github.com/weaveworks/billing-client/event.go new file mode 100644 index 000000000..c840cb99f --- /dev/null +++ b/vendor/github.com/weaveworks/billing-client/event.go @@ -0,0 +1,42 @@ +package billing + +import ( + "time" +) + +// Event is a record of some amount of billable usage for scope. +type Event struct { + UniqueKey string `json:"unique_key" msg:"unique_key"` + InternalInstanceID string `json:"internal_instance_id" msg:"internal_instance_id"` + OccurredAt time.Time `json:"occurred_at" msg:"occurred_at"` + Amounts Amounts `json:"amounts" msg:"amounts"` + Metadata map[string]string `json:"metadata" msg:"metadata"` +} + +// msgpack (and therefore fluentd) requires the things we send to it to be +// map[string]interface{}, so we return them here, not a struct. :( +func (e Event) toRecords() []map[string]interface{} { + var records []map[string]interface{} + for t, v := range e.Amounts { + records = append(records, map[string]interface{}{ + "unique_key": e.UniqueKey + ":" + string(t), + "internal_instance_id": e.InternalInstanceID, + "amount_type": string(t), + "amount_value": v, + "occurred_at": e.OccurredAt, + "metadata": e.Metadata, + }) + } + return records +} + +// AmountType is a type-cast of the enum for the diferent amount types +type AmountType string + +const ( + // ContainerSeconds is one of the billable metrics + ContainerSeconds AmountType = "container-seconds" +) + +// Amounts is a map of amount billable metrics to their values +type Amounts map[AmountType]int64 diff --git a/vendor/manifest b/vendor/manifest index 3d6641a86..5867d83b2 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -1410,6 +1410,14 @@ "branch": "master", "notests": true }, + { + "importpath": "github.com/weaveworks/billing-client", + "repository": "https://github.com/weaveworks/billing-client", + "vcs": "git", + "revision": "bf803baae0177625efc34699d79696644210a022", + "branch": "master", + "notests": true + }, { "importpath": "github.com/weaveworks/common", "repository": "https://github.com/weaveworks/common",