Files
weave-scope/app/multitenant/billing_emitter.go
Bryan Boreham aa0aab4094 multitenant: only count real hosts for billing
PR #3822 made the kubernetes probe send host nodes, so we need to
exclude them from counting when billing.
2021-05-20 14:29:39 +00:00

213 lines
5.9 KiB
Go

package multitenant
import (
"context"
"crypto/sha256"
"encoding/base64"
"flag"
"math"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
billing "github.com/weaveworks/billing-client"
"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/report"
)
// BillingEmitterConfig has everything we need to make a billing emitter.
type BillingEmitterConfig struct {
// Enabled is bound to the -app.billing.enabled flag by RegisterFlags. It is
// not read anywhere in this file; presumably the caller uses it to decide
// whether to wrap a collector in a BillingEmitter at all — TODO confirm.
Enabled bool
// DefaultInterval is the report interval assumed for a user when neither the
// current report nor any earlier report from that user supplied one. Bound to
// -app.billing.default-publish-interval (default 3s) by RegisterFlags.
DefaultInterval time.Duration
// UserIDer resolves the user ID for a request context. RegisterFlags does not
// set it, so the caller must supply it.
UserIDer UserIDer
}
// RegisterFlags registers the billing emitter flags on f:
// -app.billing.enabled (default false) and
// -app.billing.default-publish-interval (default 3s). UserIDer has no flag and
// has to be set separately.
func (cfg *BillingEmitterConfig) RegisterFlags(f *flag.FlagSet) {
	const defaultPublishInterval = 3 * time.Second
	f.BoolVar(&cfg.Enabled, "app.billing.enabled", false,
		"enable emitting billing info")
	f.DurationVar(&cfg.DefaultInterval, "app.billing.default-publish-interval", defaultPublishInterval,
		"default publish interval to assume for reports")
}
// BillingEmitter is the billing emitter: an app.Collector wrapper whose Add
// emits billing amounts for each report and then forwards the report to the
// embedded Collector.
type BillingEmitter struct {
app.Collector
BillingEmitterConfig
// billing is the client billing amounts are sent to; Close closes it.
billing *billing.Client
// The embedded Mutex guards intervalCache and rounding.
sync.Mutex
// intervalCache holds, per user ID, the last non-zero report interval seen.
intervalCache map[string]time.Duration
// rounding holds, per user ID, the fractional node-seconds carried over to the
// next report because billed amounts are whole seconds.
rounding map[string]float64
}
// NewBillingEmitter returns a BillingEmitter that sends billing amounts for
// each report to billingClient before passing the report on to upstream.
// The error result is always nil.
func NewBillingEmitter(upstream app.Collector, billingClient *billing.Client, cfg BillingEmitterConfig) (*BillingEmitter, error) {
	emitter := &BillingEmitter{
		Collector:            upstream,
		BillingEmitterConfig: cfg,
		billing:              billingClient,
	}
	// Per-user state written by Add; the maps must exist before the first report.
	emitter.intervalCache = map[string]time.Duration{}
	emitter.rounding = map[string]float64{}
	return emitter, nil
}
// Add implements app.Collector.
//
// For each report it works out the interval the report covers and sends
// container-, node- and Weave Net-seconds for the report's user to the billing
// client. It then hands the report to the wrapped collector. A failure to emit
// billing data is only logged, so the report is still collected. A failure to
// resolve the user ID returns immediately, before anything is emitted or
// forwarded.
func (e *BillingEmitter) Add(ctx context.Context, rep report.Report, buf []byte) error {
	now := time.Now().UTC()
	userID, err := e.UserIDer(ctx)
	if err != nil {
		// The wrapped collector has to resolve the user ID as well and would
		// fail the same way, so there is nothing to gain by carrying on.
		return err
	}
	rowKey, colKey := calculateDynamoKeys(userID, now)
	reportInterval, hosts := e.scanReport(rep)

	// Under the lock, pick the interval to bill for. A non-zero interval from
	// the report is remembered per user; otherwise fall back to the remembered
	// value, then to the configured default. Also fold in the fractional
	// node-seconds left over from this user's earlier reports.
	interval, nodeSeconds := func() (time.Duration, float64) {
		e.Lock()
		defer e.Unlock()
		iv := reportInterval
		if iv != 0 {
			e.intervalCache[userID] = iv
		} else if cached, ok := e.intervalCache[userID]; ok {
			iv = cached
		} else {
			iv = e.DefaultInterval
		}
		// Billing takes whole seconds, so carry the remainder forward.
		total := iv.Seconds()*float64(hosts) + e.rounding[userID]
		e.rounding[userID] = total - math.Floor(total)
		return iv, total
	}()

	// The billing event is identified by a digest of the raw report bytes.
	digest := sha256.Sum256(buf)
	hash := "sha256:" + base64.URLEncoding.EncodeToString(digest[:])

	// NOTE(review): unlike node-seconds, these amounts truncate the interval to
	// whole seconds with no carry-over, so a sub-second interval bills zero.
	wholeSeconds := int64(interval / time.Second)
	var weaveNetSeconds int64
	if hasWeaveNet(rep) {
		weaveNetSeconds = wholeSeconds
	}
	amounts := billing.Amounts{
		billing.ContainerSeconds: wholeSeconds * int64(len(rep.Container.Nodes)),
		billing.NodeSeconds:      int64(nodeSeconds),
		billing.WeaveNetSeconds:  weaveNetSeconds,
	}
	metadata := map[string]string{"row_key": rowKey, "col_key": colKey}

	// Defects in the billing system must not break report collection, so a
	// failure here is logged and the report is still forwarded.
	if err := e.billing.AddAmounts(hash, userID, now, amounts, metadata); err != nil {
		log.Errorf("Failed emitting billing data: %v", err)
	}
	return e.Collector.Add(ctx, rep, buf)
}
// commandParameter returns the value given to the command-line flag called
// name in cmd. It understands both the `-name=value` and `-name value`
// spellings. Because only the name itself is matched, the `--` forms work too.
//
// An occurrence of name only counts when it is followed by '=', whitespace or
// the end of cmd. Otherwise it is part of a longer word, such as a different
// flag sharing the same prefix (`-namex=1`), and the search moves on to the
// next occurrence.
//
// The boolean result is false in three cases: no counting occurrence is found,
// that occurrence has nothing after it, or name is empty.
func commandParameter(cmd, name string) (string, bool) {
	if name == "" {
		return "", false
	}
	for from := 0; from <= len(cmd); {
		i := strings.Index(cmd[from:], name)
		if i == -1 {
			break
		}
		rest := cmd[from+i+len(name):]
		if rest != "" && !strings.ContainsRune("= \t\n\v\f\r", rune(rest[0])) {
			// Matched inside a longer word; resume just past this match's start.
			from += i + 1
			continue
		}
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			return "", false
		}
		// `-name=value` leaves "=value" as the first field; `-name value`
		// leaves "value".
		return strings.TrimPrefix(fields[0], "="), true
	}
	return "", false
}
// intervalFromCommand extracts a reporting interval, as an unparsed duration
// string, from a command line that mentions "scope". Normally this is the value
// of the probe.publish.interval flag. When spy.interval is also present and
// both values parse with spy.interval being the longer, spy.interval is
// returned instead: with a slower spy, some reports will carry no process data.
// The result is "" when cmd does not mention "scope" or has no
// probe.publish.interval flag.
func intervalFromCommand(cmd string) string {
	if !strings.Contains(cmd, "scope") {
		return ""
	}
	publishInterval, found := commandParameter(cmd, "probe.publish.interval")
	if !found {
		return ""
	}
	spyInterval, found := commandParameter(cmd, "spy.interval")
	if !found {
		return publishInterval
	}
	pub, pubErr := time.ParseDuration(publishInterval)
	spy, spyErr := time.ParseDuration(spyInterval)
	if pubErr != nil || spyErr != nil || spy <= pub {
		return publishInterval
	}
	return spyInterval
}
// scanReport returns the interval a report covers and the number of hosts to
// bill for it.
//
// Only host nodes carrying an Uptime entry are counted. This picks out the
// nodes reported by a per-node probe: the Kubernetes cluster probe also makes
// host nodes, but with only a few fields set. Uptime is relied on because it
// changes in every report, so it is present even in a delta report.
//
// The interval is r.Window when that is non-zero. Otherwise it is parsed from
// a Scope probe command line found among the report's containers or, failing
// that, among its processes. It is zero when no such command line yields an
// interval, or when the interval found is malformed.
func (e *BillingEmitter) scanReport(r report.Report) (time.Duration, int) {
	hosts := 0
	for _, host := range r.Host.Nodes {
		if _, hasUptime := host.Latest.Lookup(report.Uptime); hasUptime {
			hosts++
		}
	}
	if r.Window != 0 {
		return r.Window, hosts
	}

	raw := ""
	for _, container := range r.Container.Nodes {
		cmd, ok := container.Latest.Lookup(report.DockerContainerCommand)
		if !ok {
			continue
		}
		if raw = intervalFromCommand(cmd); raw != "" {
			break
		}
	}
	if raw == "" {
		// Not found among containers: fall back to process command lines.
		for _, process := range r.Process.Nodes {
			cmd, ok := process.Latest.Lookup(report.Cmdline)
			if !ok {
				continue
			}
			if raw = intervalFromCommand(cmd); raw != "" {
				break
			}
		}
	}

	// time.ParseDuration rejects "", so a missing interval also yields zero.
	if d, err := time.ParseDuration(raw); err == nil {
		return d, hosts
	}
	return 0, hosts
}
// hasWeaveNet tries to determine whether r came from a host running Weave Net.
// It reports true when any overlay node ID parses with the Weave Net peer
// prefix.
func hasWeaveNet(r report.Report) bool {
	found := false
	for _, node := range r.Overlay.Nodes {
		if prefix, _ := report.ParseOverlayNodeID(node.ID); prefix == report.WeaveOverlayPeerPrefix {
			found = true
			break
		}
	}
	return found
}
// Close shuts down the billing emitter: first the wrapped collector, then the
// billing client, flushing its events. Close has no error result, so a failure
// to close the billing client is logged rather than silently dropped. Otherwise
// lost billing events on shutdown would go unnoticed.
func (e *BillingEmitter) Close() {
	e.Collector.Close()
	if err := e.billing.Close(); err != nil {
		log.Errorf("Failed closing billing client: %v", err)
	}
}