mirror of https://github.com/weaveworks/scope.git
Add multitenant-live collector
For when we want to collect reports in memory but not save them to a store. Extract this functionality out of awsCollector to create a new liveCollector object.
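The shape of the change, sketched below with simplified names and fields (an illustration, not the exact types in the diff): liveCollector owns the in-memory report handling and the periodic tick, awsCollector embeds it and layers the DynamoDB/S3 persistence on top, and the new "multitenant-live" collector mode constructs the embedded type on its own.

package sketch

import (
	"context"
	"sync"
	"time"
)

// liveCollector keeps recent reports in memory and drives periodic work via callbacks.
type liveCollector struct {
	window        time.Duration
	tickInterval  time.Duration
	pending       sync.Map                // reports held in memory, keyed by tenant
	tickCallbacks []func(context.Context) // run in order on every tick
}

// awsCollector reuses the in-memory path by embedding liveCollector and adds
// persistence on top; in the real code the extra fields hold the store clients.
type awsCollector struct {
	liveCollector
	storeTable string // stand-in for the DynamoDB/S3 configuration
}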
@@ -15,7 +15,6 @@ import (
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/dynamodb"
     "github.com/bluele/gcache"
-    "github.com/nats-io/nats"
     opentracing "github.com/opentracing/opentracing-go"
     otlog "github.com/opentracing/opentracing-go/log"
     "github.com/prometheus/client_golang/prometheus"
@@ -110,11 +109,6 @@ func registerAWSCollectorMetrics() {
     prometheus.MustRegister(dynamoValueSize)
     prometheus.MustRegister(inProcessCacheRequests)
     prometheus.MustRegister(inProcessCacheHits)
-    prometheus.MustRegister(reportSizeHistogram)
-    prometheus.MustRegister(reportsPerUser)
-    prometheus.MustRegister(reportSizePerUser)
-    prometheus.MustRegister(topologiesDropped)
-    prometheus.MustRegister(natsRequests)
     flushDuration.Register()
 }

@@ -133,32 +127,16 @@ type ReportStore interface {

 // AWSCollectorConfig has everything we need to make an AWS collector.
 type AWSCollectorConfig struct {
-    UserIDer       UserIDer
     DynamoDBConfig *aws.Config
     DynamoTable    string
     S3Store        *S3Store
-    StoreInterval  time.Duration
-    NatsHost       string
-    MemcacheClient *MemcacheClient
-    Window         time.Duration
-    MaxTopNodes    int
-    CollectorAddr  string
 }

 type awsCollector struct {
-    cfg       AWSCollectorConfig
+    liveCollector
+    awsCfg    AWSCollectorConfig
     db        *dynamodb.DynamoDB
-    merger    app.Merger
     inProcess inProcessStore
-    pending   sync.Map
-    ticker    *time.Ticker
-
-    nats         *nats.Conn
-    waitersLock  sync.Mutex
-    waiters      map[watchKey]*nats.Subscription
-
-    collectors   []string
-    lastResolved time.Time
 }

 // Shortcut reports:
@@ -177,45 +155,25 @@ type watchKey struct {

 // NewAWSCollector the elastic reaper of souls
 // https://github.com/aws/aws-sdk-go/wiki/common-examples
-func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
+func NewAWSCollector(liveConfig LiveCollectorConfig, config AWSCollectorConfig) (AWSCollector, error) {
     registerAWSCollectorMetricsOnce.Do(registerAWSCollectorMetrics)
-    var nc *nats.Conn
-    if config.NatsHost != "" {
-        if config.MemcacheClient == nil {
-            return nil, fmt.Errorf("Must supply memcache client when using nats")
-        }
-        var err error
-        nc, err = nats.Connect(config.NatsHost)
-        if err != nil {
-            return nil, err
-        }
-    }
-
     // (window * report rate) * number of hosts per user * number of users
-    reportCacheSize := (int(config.Window.Seconds()) / 3) * 10 * 5
+    reportCacheSize := (int(liveConfig.Window.Seconds()) / 3) * 10 * 5
     c := &awsCollector{
-        cfg:       config,
-        db:        dynamodb.New(session.New(config.DynamoDBConfig)),
-        merger:    app.NewFastMerger(),
-        inProcess: newInProcessStore(reportCacheSize, config.Window+reportQuantisationInterval),
-        nats:      nc,
-        waiters:   map[watchKey]*nats.Subscription{},
+        liveCollector: liveCollector{cfg: liveConfig},
+        awsCfg:        config,
+        db:            dynamodb.New(session.New(config.DynamoDBConfig)),
+        inProcess:     newInProcessStore(reportCacheSize, liveConfig.Window+reportQuantisationInterval),
     }

-    // If given a StoreInterval we will be storing periodically; if not we only answer queries
-    if c.isCollector() {
-        c.ticker = time.NewTicker(config.StoreInterval)
-        go c.flushLoop()
+    err := c.liveCollector.init()
+    if err != nil {
+        return nil, err
     }
+    c.tickCallbacks = append(c.tickCallbacks, c.flushPending)
     return c, nil
 }

-func (c *awsCollector) flushLoop() {
-    for range c.ticker.C {
-        c.flushPending(context.Background())
-    }
-}
-
 // Range over all users (instances) that have pending reports and send to store
 func (c *awsCollector) flushPending(ctx context.Context) {
     instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error {
@@ -245,14 +203,7 @@ func (c *awsCollector) flushPending(ctx context.Context) {
         entry := value.(*pendingEntry)

         entry.Lock()
-        rpt := entry.report
-        entry.report = nil
-        if entry.older == nil {
-            entry.older = make([]*report.Report, c.cfg.Window/c.cfg.StoreInterval)
-        } else {
-            copy(entry.older[1:], entry.older) // move everything down one
-        }
-        entry.older[0] = rpt
+        rpt := entry.older[0]
         entry.Unlock()

         if rpt != nil {
@@ -274,7 +225,7 @@ func (c *awsCollector) flushPending(ctx context.Context) {

 // Close will flush pending data
 func (c *awsCollector) Close() {
-    c.ticker.Stop() // note this doesn't close the chan; goroutine keeps running
+    c.liveCollector.Close()
     c.flushPending(context.Background())
 }

@@ -288,13 +239,13 @@ func (c *awsCollector) CreateTables() error {
         return err
     }
     for _, s := range resp.TableNames {
-        if *s == c.cfg.DynamoTable {
+        if *s == c.awsCfg.DynamoTable {
             return nil
         }
     }

     params := &dynamodb.CreateTableInput{
-        TableName: aws.String(c.cfg.DynamoTable),
+        TableName: aws.String(c.awsCfg.DynamoTable),
         AttributeDefinitions: []*dynamodb.AttributeDefinition{
             {
                 AttributeName: aws.String(hourField),
@@ -325,7 +276,7 @@ func (c *awsCollector) CreateTables() error {
             WriteCapacityUnits: aws.Int64(5),
         },
     }
-    log.Infof("Creating table %s", c.cfg.DynamoTable)
+    log.Infof("Creating table %s", c.awsCfg.DynamoTable)
     _, err = c.db.CreateTable(params)
     return err
 }
@@ -342,7 +293,7 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row
     err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
         var err error
         resp, err = c.db.Query(&dynamodb.QueryInput{
-            TableName: aws.String(c.cfg.DynamoTable),
+            TableName: aws.String(c.awsCfg.DynamoTable),
             KeyConditions: map[string]*dynamodb.Condition{
                 hourField: {
                     AttributeValueList: []*dynamodb.AttributeValue{
@@ -423,7 +374,7 @@ func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys
     if c.cfg.MemcacheClient != nil {
         stores = append(stores, c.cfg.MemcacheClient)
     }
-    stores = append(stores, c.cfg.S3Store)
+    stores = append(stores, c.awsCfg.S3Store)

     var reports []report.Report
     for _, store := range stores {
@@ -451,23 +402,6 @@ func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys
     return reports, nil
 }

-// process a report from a probe which may be at an older version or overloaded
-func (c *awsCollector) massageReport(userid string, report report.Report) report.Report {
-    if c.cfg.MaxTopNodes > 0 {
-        max := c.cfg.MaxTopNodes
-        if len(report.Host.Nodes) > 1 {
-            max = max * len(report.Host.Nodes) // higher limit for merged reports
-        }
-        var dropped []string
-        report, dropped = report.DropTopologiesOver(max)
-        for _, name := range dropped {
-            topologiesDropped.WithLabelValues(userid, name).Inc()
-        }
-    }
-    report = report.Upgrade()
-    return report
-}
-
 // If we are running as a Query service, fetch data and merge into a report
 // If we are running as a Collector and the request is for live data, merge in-memory data and return
 func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
@@ -583,7 +517,7 @@ func (c *awsCollector) HasHistoricReports() bool {
 // AdminSummary returns a string with some internal information about
 // the report, which may be useful to troubleshoot.
 func (c *awsCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
-    span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report")
+    span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.AdminSummary")
     defer span.Finish()
     userid, err := c.cfg.UserIDer(ctx)
     if err != nil {
@@ -625,7 +559,7 @@ func calculateReportKeys(userid string, now time.Time) (string, string, string)

 func (c *awsCollector) persistReport(ctx context.Context, userid, rowKey, colKey, reportKey string, buf []byte) error {
     // Put in S3 and cache before index, so it is fetchable before it is discoverable
-    reportSize, err := c.cfg.S3Store.StoreReportBytes(ctx, reportKey, buf)
+    reportSize, err := c.awsCfg.S3Store.StoreReportBytes(ctx, reportKey, buf)
     if err != nil {
         return err
     }
@@ -674,7 +608,7 @@ func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynam
     )
     for {
         resp, err = c.db.PutItem(&dynamodb.PutItemInput{
-            TableName: aws.String(c.cfg.DynamoTable),
+            TableName: aws.String(c.awsCfg.DynamoTable),
             Item: map[string]*dynamodb.AttributeValue{
                 hourField: {
                     S: aws.String(rowKey),
@@ -701,106 +635,6 @@ func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynam
     return resp, err
 }

-func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error {
-    userid, err := c.cfg.UserIDer(ctx)
-    if err != nil {
-        return err
-    }
-    if c.cfg.StoreInterval == 0 {
-        return fmt.Errorf("--app.collector.store-interval must be non-zero")
-    }
-
-    // Shortcut reports are published to nats but not persisted -
-    // we'll get a full report from the same probe in a few seconds
-    if rep.Shortcut {
-        if c.nats != nil {
-            _, _, reportKey := calculateReportKeys(userid, time.Now())
-            _, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
-            if err != nil {
-                log.Warningf("Could not store shortcut %v in memcache: %v", reportKey, err)
-                // No point publishing on nats if cache store failed
-                return nil
-            }
-            err := c.nats.Publish(userid, []byte(reportKey))
-            natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1)
-            if err != nil {
-                log.Errorf("Error sending shortcut report: %v", err)
-            }
-        }
-        return nil
-    }
-
-    rep = c.massageReport(userid, rep)
-    c.addToLive(ctx, userid, rep)
-
-    return nil
-}
-
-func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
-    userid, err := c.cfg.UserIDer(ctx)
-    if err != nil {
-        log.Errorf("Error getting user id in WaitOn: %v", err)
-        return
-    }
-
-    if c.nats == nil {
-        return
-    }
-
-    sub, err := c.nats.SubscribeSync(userid)
-    natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1)
-    if err != nil {
-        log.Errorf("Error subscribing for shortcuts: %v", err)
-        return
-    }
-
-    c.waitersLock.Lock()
-    c.waiters[watchKey{userid, waiter}] = sub
-    c.waitersLock.Unlock()
-
-    go func() {
-        for {
-            _, err := sub.NextMsg(natsTimeout)
-            if err == nats.ErrTimeout {
-                continue
-            }
-            natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1)
-            if err != nil {
-                log.Debugf("NextMsg error: %v", err)
-                return
-            }
-            select {
-            case waiter <- struct{}{}:
-            default:
-            }
-        }
-    }()
-}
-
-func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) {
-    userid, err := c.cfg.UserIDer(ctx)
-    if err != nil {
-        log.Errorf("Error getting user id in WaitOn: %v", err)
-        return
-    }
-
-    if c.nats == nil {
-        return
-    }
-
-    c.waitersLock.Lock()
-    key := watchKey{userid, waiter}
-    sub := c.waiters[key]
-    delete(c.waiters, key)
-    c.waitersLock.Unlock()
-
-    err = sub.Unsubscribe()
-    natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1)
-    if err != nil {
-        log.Errorf("Error on unsubscribe: %v", err)
-    }
-}
-
 type inProcessStore struct {
     cache gcache.Cache
 }
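With the flush loop gone from awsCollector, the periodic work is driven by liveCollector in the hunks that follow: tickLoop walks tickCallbacks on every tick, bumpPending is registered first in init(), and NewAWSCollector appends flushPending after it, so each tick first shifts the in-memory reports and then persists them. A minimal, self-contained sketch of that callback pattern (an assumed simplification, not the actual Scope code):

package main

import (
	"context"
	"fmt"
	"time"
)

// tickRunner calls every registered callback, in order, on each tick.
type tickRunner struct {
	callbacks []func(context.Context)
}

func (t *tickRunner) run(interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for _, f := range t.callbacks {
				f(context.Background())
			}
		case <-stop:
			return
		}
	}
}

func main() {
	t := &tickRunner{}
	// Base step, registered first (plays the role of bumpPending).
	t.callbacks = append(t.callbacks, func(context.Context) { fmt.Println("shift pending reports") })
	// Extra step appended by an embedding type (plays the role of flushPending).
	t.callbacks = append(t.callbacks, func(context.Context) { fmt.Println("flush to the store") })

	stop := make(chan struct{})
	go t.run(100*time.Millisecond, stop)
	time.Sleep(350 * time.Millisecond)
	close(stop)
}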
@@ -14,14 +14,55 @@ import (

     "context"

+    "github.com/nats-io/nats"
     "github.com/opentracing-contrib/go-stdlib/nethttp"
     opentracing "github.com/opentracing/opentracing-go"
+    otlog "github.com/opentracing/opentracing-go/log"
+    "github.com/prometheus/client_golang/prometheus"
+    log "github.com/sirupsen/logrus"
     "github.com/weaveworks/common/instrument"
     "github.com/weaveworks/common/user"
     "github.com/weaveworks/scope/app"
     "github.com/weaveworks/scope/report"
     "golang.org/x/sync/errgroup"
 )
+
+func registerLiveCollectorMetrics() {
+    prometheus.MustRegister(reportSizeHistogram)
+    prometheus.MustRegister(reportsPerUser)
+    prometheus.MustRegister(reportSizePerUser)
+    prometheus.MustRegister(topologiesDropped)
+    prometheus.MustRegister(natsRequests)
+}
+
+var registerLiveCollectorMetricsOnce sync.Once
+
+// LiveCollectorConfig has everything we need to make a collector for live multitenant data.
+type LiveCollectorConfig struct {
+    UserIDer       UserIDer
+    NatsHost       string
+    MemcacheClient *MemcacheClient
+    Window         time.Duration
+    TickInterval   time.Duration
+    MaxTopNodes    int
+    CollectorAddr  string
+}
+
+type liveCollector struct {
+    cfg           LiveCollectorConfig
+    merger        app.Merger
+    pending       sync.Map
+    ticker        *time.Ticker
+    tickCallbacks []func(context.Context)
+
+    nats        *nats.Conn
+    waitersLock sync.Mutex
+    waiters     map[watchKey]*nats.Subscription
+
+    collectors   []string
+    lastResolved time.Time
+}
+
 // if StoreInterval is set, reports are merged into here and held until flushed to store
 type pendingEntry struct {
     sync.Mutex
@@ -29,9 +70,140 @@ type pendingEntry struct {
     older  []*report.Report
 }

-// We are building up a report in memory; merge into that and it will be saved shortly
+func NewLiveCollector(config LiveCollectorConfig) (app.Collector, error) {
+    registerLiveCollectorMetricsOnce.Do(registerLiveCollectorMetrics)
+    c := &liveCollector{
+        cfg: config,
+    }
+    return c, c.init()
+}
+
+func (c *liveCollector) init() error {
+    var nc *nats.Conn
+    if c.cfg.NatsHost != "" {
+        if c.cfg.MemcacheClient == nil {
+            return fmt.Errorf("Must supply memcache client when using nats")
+        }
+        var err error
+        nc, err = nats.Connect(c.cfg.NatsHost)
+        if err != nil {
+            return err
+        }
+    }
+    c.nats = nc
+    c.merger = app.NewFastMerger()
+    c.waiters = make(map[watchKey]*nats.Subscription)
+    if c.isCollector() {
+        if c.cfg.TickInterval == 0 {
+            return fmt.Errorf("--app.collector.tick-interval or --app.collector.store-interval must be non-zero for a collector")
+        }
+        c.ticker = time.NewTicker(c.cfg.TickInterval)
+        go c.tickLoop()
+    }
+    c.tickCallbacks = append(c.tickCallbacks, c.bumpPending)
+    return nil
+}
+
+func (c *liveCollector) tickLoop() {
+    for range c.ticker.C {
+        for _, f := range c.tickCallbacks {
+            f(context.Background())
+        }
+    }
+}
+
+// Close will close things down
+func (c *liveCollector) Close() {
+    c.ticker.Stop() // note this doesn't close the chan; goroutine keeps running
+}
+
+// Range over all users (instances) that have pending reports and shift the data back in the array
+func (c *liveCollector) bumpPending(ctx context.Context) {
+    c.pending.Range(func(key, value interface{}) bool {
+        entry := value.(*pendingEntry)
+
+        entry.Lock()
+        rpt := entry.report
+        entry.report = nil
+        if entry.older == nil {
+            entry.older = make([]*report.Report, c.cfg.Window/c.cfg.TickInterval)
+        } else {
+            copy(entry.older[1:], entry.older) // move everything down one
+        }
+        entry.older[0] = rpt
+        entry.Unlock()
+        return true
+    })
+}
+
+func (c *liveCollector) HasHistoricReports() bool {
+    return false
+}
+
+func (c *liveCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
+    userid, err := c.cfg.UserIDer(ctx)
+    if err != nil {
+        return false, err
+    }
+    if time.Since(timestamp) < c.cfg.Window {
+        has, err := c.hasReportsFromLive(ctx, userid)
+        return has, err
+    }
+    return false, nil
+}
+
+func (c *liveCollector) Add(ctx context.Context, rep report.Report, buf []byte) error {
+    userid, err := c.cfg.UserIDer(ctx)
+    if err != nil {
+        return err
+    }
+
+    // Shortcut reports are published to nats but not persisted -
+    // we'll get a full report from the same probe in a few seconds
+    if rep.Shortcut {
+        if c.nats != nil {
+            _, _, reportKey := calculateReportKeys(userid, time.Now())
+            _, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
+            if err != nil {
+                log.Warningf("Could not store shortcut %v in memcache: %v", reportKey, err)
+                // No point publishing on nats if cache store failed
+                return nil
+            }
+            err := c.nats.Publish(userid, []byte(reportKey))
+            natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1)
+            if err != nil {
+                log.Errorf("Error sending shortcut report: %v", err)
+            }
+        }
+        return nil
+    }
+
+    rep = c.massageReport(userid, rep)
+    c.addToLive(ctx, userid, rep)
+
+    return nil
+}
+
+// process a report from a probe which may be at an older version or overloaded
+func (c *liveCollector) massageReport(userid string, report report.Report) report.Report {
+    if c.cfg.MaxTopNodes > 0 {
+        max := c.cfg.MaxTopNodes
+        if len(report.Host.Nodes) > 1 {
+            max = max * len(report.Host.Nodes) // higher limit for merged reports
+        }
+        var dropped []string
+        report, dropped = report.DropTopologiesOver(max)
+        for _, name := range dropped {
+            topologiesDropped.WithLabelValues(userid, name).Inc()
+        }
+    }
+    report = report.Upgrade()
+    return report
+}
+
+// We are building up a report in memory; merge into that (for awsCollector it will be saved shortly)
 // NOTE: may retain a reference to rep; must not be used by caller after this.
-func (c *awsCollector) addToLive(ctx context.Context, userid string, rep report.Report) {
+func (c *liveCollector) addToLive(ctx context.Context, userid string, rep report.Report) {
     entry := &pendingEntry{}
     if e, found := c.pending.LoadOrStore(userid, entry); found {
         entry = e.(*pendingEntry)
@@ -45,11 +217,11 @@ func (c *awsCollector) addToLive(ctx context.Context, userid string, rep report.
     entry.Unlock()
 }

-func (c *awsCollector) isCollector() bool {
-    return c.cfg.StoreInterval != 0
+func (c *liveCollector) isCollector() bool {
+    return c.cfg.CollectorAddr == ""
 }

-func (c *awsCollector) hasReportsFromLive(ctx context.Context, userid string) (bool, error) {
+func (c *liveCollector) hasReportsFromLive(ctx context.Context, userid string) (bool, error) {
     span, ctx := opentracing.StartSpanFromContext(ctx, "hasReportsFromLive")
     defer span.Finish()
     if c.isCollector() {
@@ -91,7 +263,26 @@ func (c *awsCollector) hasReportsFromLive(ctx context.Context, userid string) (b
     return false, nil
 }

-func (c *awsCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) {
+func (c *liveCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) {
+    span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.Report")
+    defer span.Finish()
+    userid, err := c.cfg.UserIDer(ctx)
+    if err != nil {
+        return report.MakeReport(), err
+    }
+    span.SetTag("userid", userid)
+    var reports []report.Report
+    if time.Since(timestamp) < c.cfg.Window {
+        reports, err = c.reportsFromLive(ctx, userid)
+    }
+    if err != nil {
+        return report.MakeReport(), err
+    }
+    span.LogFields(otlog.Int("merging", len(reports)))
+    return c.merger.Merge(reports), nil
+}
+
+func (c *liveCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) {
     span, ctx := opentracing.StartSpanFromContext(ctx, "reportsFromLive")
     defer span.Finish()
     if c.isCollector() {
@@ -196,3 +387,82 @@ func oneCall(ctx context.Context, endpoint, path, userid string) (io.ReadCloser,

     return res.Body, nil
 }
+
+func (c *liveCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
+    userid, err := c.cfg.UserIDer(ctx)
+    if err != nil {
+        log.Errorf("Error getting user id in WaitOn: %v", err)
+        return
+    }
+
+    if c.nats == nil {
+        return
+    }
+
+    sub, err := c.nats.SubscribeSync(userid)
+    natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1)
+    if err != nil {
+        log.Errorf("Error subscribing for shortcuts: %v", err)
+        return
+    }
+
+    c.waitersLock.Lock()
+    c.waiters[watchKey{userid, waiter}] = sub
+    c.waitersLock.Unlock()
+
+    go func() {
+        for {
+            _, err := sub.NextMsg(natsTimeout)
+            if err == nats.ErrTimeout {
+                continue
+            }
+            natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1)
+            if err != nil {
+                log.Debugf("NextMsg error: %v", err)
+                return
+            }
+            select {
+            case waiter <- struct{}{}:
+            default:
+            }
+        }
+    }()
+}
+
+func (c *liveCollector) UnWait(ctx context.Context, waiter chan struct{}) {
+    userid, err := c.cfg.UserIDer(ctx)
+    if err != nil {
+        log.Errorf("Error getting user id in WaitOn: %v", err)
+        return
+    }
+
+    if c.nats == nil {
+        return
+    }
+
+    c.waitersLock.Lock()
+    key := watchKey{userid, waiter}
+    sub := c.waiters[key]
+    delete(c.waiters, key)
+    c.waitersLock.Unlock()
+
+    err = sub.Unsubscribe()
+    natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1)
+    if err != nil {
+        log.Errorf("Error on unsubscribe: %v", err)
+    }
+}
+
+// AdminSummary returns a string with some internal information about
+// the report, which may be useful to troubleshoot.
+func (c *liveCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
+    span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.AdminSummary")
+    defer span.Finish()
+    userid, err := c.cfg.UserIDer(ctx)
+    if err != nil {
+        return "", err
+    }
+    _ = userid
+    // TODO: finish implementation
+    return "TODO", nil
+}
prog/app.go (37 changed lines)
@@ -89,12 +89,29 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter
     return middlewares.Wrap(router)
 }

-func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL string, storeInterval time.Duration, natsHostname string,
+func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL string, tickInterval time.Duration, natsHostname string,
     memcacheConfig multitenant.MemcacheConfig, window time.Duration, maxTopNodes int, createTables bool, collectorAddr string) (app.Collector, error) {
     if collectorURL == "local" {
         return app.NewCollector(window), nil
     }

+    var memcacheClient *multitenant.MemcacheClient
+    if memcacheConfig.Host != "" {
+        memcacheClient = multitenant.NewMemcacheClient(memcacheConfig)
+    }
+    liveConfig := multitenant.LiveCollectorConfig{
+        UserIDer:       userIDer,
+        NatsHost:       natsHostname,
+        MemcacheClient: memcacheClient,
+        Window:         window,
+        TickInterval:   tickInterval,
+        MaxTopNodes:    maxTopNodes,
+        CollectorAddr:  collectorAddr,
+    }
+    if collectorURL == "multitenant-live" {
+        return multitenant.NewLiveCollector(liveConfig)
+    }
+
     parsed, err := url.Parse(collectorURL)
     if err != nil {
         return nil, err
@@ -119,22 +136,12 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL string,
     bucketName := strings.TrimPrefix(s3.Path, "/")
     tableName := strings.TrimPrefix(parsed.Path, "/")
     s3Store := multitenant.NewS3Client(s3Config, bucketName)
-    var memcacheClient *multitenant.MemcacheClient
-    if memcacheConfig.Host != "" {
-        memcacheClient = multitenant.NewMemcacheClient(memcacheConfig)
-    }
     awsCollector, err := multitenant.NewAWSCollector(
+        liveConfig,
         multitenant.AWSCollectorConfig{
-            UserIDer:       userIDer,
             DynamoDBConfig: dynamoDBConfig,
             DynamoTable:    tableName,
             S3Store:        &s3Store,
-            StoreInterval:  storeInterval,
-            NatsHost:       natsHostname,
-            MemcacheClient: memcacheClient,
-            Window:         window,
-            MaxTopNodes:    maxTopNodes,
-            CollectorAddr:  collectorAddr,
         },
     )
     if err != nil {
@@ -239,8 +246,12 @@ func appMain(flags appFlags) {
         userIDer = multitenant.UserIDHeader(flags.userIDHeader)
     }

+    tickInterval := flags.tickInterval
+    if tickInterval == 0 {
+        tickInterval = flags.storeInterval
+    }
     collector, err := collectorFactory(
-        userIDer, flags.collectorURL, flags.s3URL, flags.storeInterval, flags.natsHostname,
+        userIDer, flags.collectorURL, flags.s3URL, tickInterval, flags.natsHostname,
         multitenant.MemcacheConfig{
             Host:    flags.memcachedHostname,
             Timeout: flags.memcachedTimeout,
@@ -166,6 +166,7 @@ type appFlags struct {
     collectorAddr     string // how to find collectors if deployed as microservices
     s3URL             string
     storeInterval     time.Duration
+    tickInterval      time.Duration
     controlRouterURL  string
     controlRPCTimeout time.Duration
     pipeRouterURL     string
@@ -376,10 +377,11 @@ func setupFlags(flags *flags) {
     flag.Var(&flags.containerLabelFilterFlags, "app.container-label-filter", "Add container label-based view filter, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter='Database Containers:role=db'")
     flag.Var(&flags.containerLabelFilterFlagsExclude, "app.container-label-filter-exclude", "Add container label-based view filter that excludes containers with the given label, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter-exclude='Database Containers:role=db'")

-    flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)")
+    flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, multitenant-live, dynamodb, or file/directory)")
     flag.StringVar(&flags.app.collectorAddr, "app.collector-addr", "", "Address to look up collectors when deployed as microservices")
     flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)")
-    flag.DurationVar(&flags.app.storeInterval, "app.collector.store-interval", 0, "How often to store merged incoming reports. If 0, reports are stored unmerged as they arrive.")
+    flag.DurationVar(&flags.app.storeInterval, "app.collector.store-interval", 0, "How often to store merged incoming reports.")
+    flag.DurationVar(&flags.app.tickInterval, "app.collector.tick-interval", 0, "How often to start a new group of recent reports, for the multitenant collector")
     flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)")
     flag.DurationVar(&flags.app.controlRPCTimeout, "app.control.rpctimeout", time.Minute, "Timeout for control RPC")
     flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")
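Based on the types added in this diff, a standalone live collector (no DynamoDB/S3 store) would be wired up roughly as below; the concrete values are illustrative, and in Scope itself they come from the --app.collector.* flags shown above.

package main

import (
	"log"
	"time"

	"github.com/weaveworks/scope/app/multitenant"
)

func main() {
	cfg := multitenant.LiveCollectorConfig{
		UserIDer:     multitenant.UserIDHeader("X-Scope-OrgID"), // header name is illustrative
		Window:       15 * time.Second,
		TickInterval: 3 * time.Second,
		MaxTopNodes:  10000,
		// NatsHost, MemcacheClient and CollectorAddr are left unset here;
		// per the diff, memcache is only required when a nats host is given.
	}
	collector, err := multitenant.NewLiveCollector(cfg)
	if err != nil {
		log.Fatalf("error creating live collector: %v", err)
	}
	log.Printf("live collector ready: %T", collector)
	// The returned value satisfies app.Collector, so it can be handed to the
	// Scope app router in place of the dynamodb-backed collector.
}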