package multitenant

import (
	"context"
	"crypto/sha256"
	"encoding/base64"
	"flag"
	"math"
	"strings"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
	billing "github.com/weaveworks/billing-client"

	"github.com/weaveworks/scope/app"
	"github.com/weaveworks/scope/report"
)

// BillingEmitterConfig has everything we need to make a billing emitter
type BillingEmitterConfig struct {
	Enabled         bool
	DefaultInterval time.Duration
	UserIDer        UserIDer
}

// RegisterFlags registers the billing emitter flags with the main flag set.
func (cfg *BillingEmitterConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "app.billing.enabled", false, "enable emitting billing info")
	f.DurationVar(&cfg.DefaultInterval, "app.billing.default-publish-interval", 3*time.Second, "default publish interval to assume for reports")
}
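
// With billing enabled, a typical invocation might look like this (the
// binary name is illustrative):
//
//	scope-app -app.billing.enabled=true -app.billing.default-publish-interval=3s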

// BillingEmitter is the billing emitter
type BillingEmitter struct {
	app.Collector
	BillingEmitterConfig
	billing *billing.Client
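
	// Mutex guards intervalCache and rounding below.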
	sync.Mutex
	intervalCache map[string]time.Duration
	rounding      map[string]float64
}

// NewBillingEmitter creates a new billing emitter which emits billing events
func NewBillingEmitter(upstream app.Collector, billingClient *billing.Client, cfg BillingEmitterConfig) (*BillingEmitter, error) {
	return &BillingEmitter{
		Collector:            upstream,
		billing:              billingClient,
		BillingEmitterConfig: cfg,
		intervalCache:        make(map[string]time.Duration),
		rounding:             make(map[string]float64),
	}, nil
}
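
// A minimal wiring sketch, assuming an upstream app.Collector and a
// configured billing client already exist:
//
//	emitter, err := NewBillingEmitter(upstream, billingClient, cfg)
//	if err != nil {
//		return err
//	}
//	defer emitter.Close()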

// Add implements app.Collector
func (e *BillingEmitter) Add(ctx context.Context, rep report.Report, buf []byte) error {
	now := time.Now().UTC()
	userID, err := e.UserIDer(ctx)
	if err != nil {
		// The underlying collector needs the userID too, so it's OK to
		// abort here: if this fails, the underlying collector will fail
		// as well, and there is no point in proceeding.
		return err
	}
	rowKey, colKey := calculateDynamoKeys(userID, now)

	interval, nodes := e.scanReport(rep)
	// Cache the last-known value of interval for this user, and use
	// it if we didn't find one in this report.
	e.Lock()
	if interval != 0 {
		e.intervalCache[userID] = interval
	} else {
		if lastKnown, found := e.intervalCache[userID]; found {
			interval = lastKnown
		} else {
			interval = e.DefaultInterval
		}
	}
	// Billing takes an integer number of seconds, so keep track of the amount lost to rounding
	nodeSeconds := interval.Seconds()*float64(nodes) + e.rounding[userID]
	rounding := nodeSeconds - math.Floor(nodeSeconds)
	e.rounding[userID] = rounding
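	// For example, with a 1.5s interval and 3 nodes the first report yields
	// 4.5 node-seconds: we bill int64(4.5) = 4 and carry 0.5, so the next
	// such report accrues 4.5+0.5 = 5.0 and nothing is lost over time.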
	e.Unlock()
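
	// Hash the raw report bytes to give this report a stable identifier;
	// it is passed to AddAmounts below, presumably so that duplicate
	// submissions of the same report can be detected downstream.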
	hasher := sha256.New()
	hasher.Write(buf)
	hash := "sha256:" + base64.URLEncoding.EncodeToString(hasher.Sum(nil))

	weaveNetCount := 0
	if hasWeaveNet(rep) {
		weaveNetCount = 1
	}

	amounts := billing.Amounts{
		billing.ContainerSeconds: int64(interval/time.Second) * int64(len(rep.Container.Nodes)),
		billing.NodeSeconds:      int64(nodeSeconds),
		billing.WeaveNetSeconds:  int64(interval/time.Second) * int64(weaveNetCount),
	}
	metadata := map[string]string{
		"row_key": rowKey,
		"col_key": colKey,
	}

	err = e.billing.AddAmounts(
		hash,
		userID,
		now,
		amounts,
		metadata,
	)
	if err != nil {
		// No return, because we want to proceed even if we fail to emit
		// billing data, so that defects in the billing system don't break
		// report collection. Just log the fact & carry on.
		log.Errorf("Failed emitting billing data: %v", err)
	}

	return e.Collector.Add(ctx, rep, buf)
}
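
// commandParameter extracts the value of a flag from a command line in either
// `-foo=bar` or `-foo bar` form; for example, given the (hypothetical) command
// "scope --probe.publish.interval=10s" and flag "probe.publish.interval" it
// returns ("10s", true), or ("", false) if the flag is absent.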
func commandParameter(cmd, flag string) (string, bool) {
	i := strings.Index(cmd, flag)
	if i != -1 {
		// here we expect the command to look like `-foo=bar` or `-foo bar`
		aft := strings.Fields(cmd[i+len(flag):])
		if len(aft) > 0 && len(aft[0]) > 0 {
			if aft[0][0] == '=' {
				return aft[0][1:], true
			}
			return aft[0], true
		}
	}
	return "", false
}
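
// intervalFromCommand extracts a non-default report interval from a Scope
// probe's command line, e.g. (hypothetical command line)
//
//	intervalFromCommand("/home/weave/scope --probe.publish.interval=10s --spy.interval=30s")
//
// returns "30s", since a spy interval longer than the publish interval takes
// precedence (see the comment in the body). It returns "" if none is found.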
func intervalFromCommand(cmd string) string {
	if strings.Contains(cmd, "scope") {
		if publishInterval, ok := commandParameter(cmd, "probe.publish.interval"); ok {
			// If spy interval is higher than publish interval, some reports will have no process data
			if spyInterval, ok := commandParameter(cmd, "spy.interval"); ok {
				pubDuration, err1 := time.ParseDuration(publishInterval)
				spyDuration, err2 := time.ParseDuration(spyInterval)
				if err1 == nil && err2 == nil && spyDuration > pubDuration {
					return spyInterval
				}
			}
			return publishInterval
		}
	}
	return ""
}

// scanReport counts the nodes and tries to find any custom report interval
// of this report. If the interval is malformed, or not set, it returns zero.
func (e *BillingEmitter) scanReport(r report.Report) (time.Duration, int) {
	nHosts := 0
	// We scan the host nodes looking for ones reported by a per-node probe;
	// since PR #3822 the Kubernetes cluster probe also sends host nodes, but
	// they only have a few fields set and must be excluded from billing.
	for _, h := range r.Host.Nodes {
		// Relying here on Uptime being something that changes in each report, hence it will be in a delta report
		if _, ok := h.Latest.Lookup(report.Uptime); ok {
			nHosts++
		}
	}
	if r.Window != 0 {
		return r.Window, nHosts
	}
	var inter string
	for _, c := range r.Container.Nodes {
		if cmd, ok := c.Latest.Lookup(report.DockerContainerCommand); ok {
			if inter = intervalFromCommand(cmd); inter != "" {
				break
			}
		}
	}
	if inter == "" { // not found in containers: look in processes
		for _, c := range r.Process.Nodes {
			if cmd, ok := c.Latest.Lookup(report.Cmdline); ok {
				if inter = intervalFromCommand(cmd); inter != "" {
					break
				}
			}
		}
	}
	if inter == "" {
		return 0, nHosts
	}
	d, err := time.ParseDuration(inter)
	if err != nil {
		return 0, nHosts
	}
	return d, nHosts
}

// hasWeaveNet tries to determine whether this report came from a host running Weave Net
func hasWeaveNet(r report.Report) bool {
	for _, n := range r.Overlay.Nodes {
		overlayType, _ := report.ParseOverlayNodeID(n.ID)
		if overlayType == report.WeaveOverlayPeerPrefix {
			return true
		}
	}

	return false
}

// Close shuts down the billing emitter and billing client, flushing any pending events.
func (e *BillingEmitter) Close() {
	e.Collector.Close()
	_ = e.billing.Close()
}