diff --git a/app/multitenant/aws_collector.go b/app/multitenant/aws_collector.go index c0abba89d..a3fe95b12 100644 --- a/app/multitenant/aws_collector.go +++ b/app/multitenant/aws_collector.go @@ -15,7 +15,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/bluele/gcache" - "github.com/nats-io/nats" opentracing "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" @@ -110,11 +109,6 @@ func registerAWSCollectorMetrics() { prometheus.MustRegister(dynamoValueSize) prometheus.MustRegister(inProcessCacheRequests) prometheus.MustRegister(inProcessCacheHits) - prometheus.MustRegister(reportSizeHistogram) - prometheus.MustRegister(reportsPerUser) - prometheus.MustRegister(reportSizePerUser) - prometheus.MustRegister(topologiesDropped) - prometheus.MustRegister(natsRequests) flushDuration.Register() } @@ -133,32 +127,16 @@ type ReportStore interface { // AWSCollectorConfig has everything we need to make an AWS collector. type AWSCollectorConfig struct { - UserIDer UserIDer DynamoDBConfig *aws.Config DynamoTable string S3Store *S3Store - StoreInterval time.Duration - NatsHost string - MemcacheClient *MemcacheClient - Window time.Duration - MaxTopNodes int - CollectorAddr string } type awsCollector struct { - cfg AWSCollectorConfig + liveCollector + awsCfg AWSCollectorConfig db *dynamodb.DynamoDB - merger app.Merger inProcess inProcessStore - pending sync.Map - ticker *time.Ticker - - nats *nats.Conn - waitersLock sync.Mutex - waiters map[watchKey]*nats.Subscription - - collectors []string - lastResolved time.Time } // Shortcut reports: @@ -177,45 +155,25 @@ type watchKey struct { // NewAWSCollector the elastic reaper of souls // https://github.com/aws/aws-sdk-go/wiki/common-examples -func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) { +func NewAWSCollector(liveConfig LiveCollectorConfig, config AWSCollectorConfig) (AWSCollector, error) { registerAWSCollectorMetricsOnce.Do(registerAWSCollectorMetrics) - var nc *nats.Conn - if config.NatsHost != "" { - if config.MemcacheClient == nil { - return nil, fmt.Errorf("Must supply memcache client when using nats") - } - var err error - nc, err = nats.Connect(config.NatsHost) - if err != nil { - return nil, err - } - } // (window * report rate) * number of hosts per user * number of users - reportCacheSize := (int(config.Window.Seconds()) / 3) * 10 * 5 + reportCacheSize := (int(liveConfig.Window.Seconds()) / 3) * 10 * 5 c := &awsCollector{ - cfg: config, - db: dynamodb.New(session.New(config.DynamoDBConfig)), - merger: app.NewFastMerger(), - inProcess: newInProcessStore(reportCacheSize, config.Window+reportQuantisationInterval), - nats: nc, - waiters: map[watchKey]*nats.Subscription{}, + liveCollector: liveCollector{cfg: liveConfig}, + awsCfg: config, + db: dynamodb.New(session.New(config.DynamoDBConfig)), + inProcess: newInProcessStore(reportCacheSize, liveConfig.Window+reportQuantisationInterval), } - - // If given a StoreInterval we will be storing periodically; if not we only answer queries - if c.isCollector() { - c.ticker = time.NewTicker(config.StoreInterval) - go c.flushLoop() + err := c.liveCollector.init() + if err != nil { + return nil, err } + c.tickCallbacks = append(c.tickCallbacks, c.flushPending) return c, nil } -func (c *awsCollector) flushLoop() { - for range c.ticker.C { - c.flushPending(context.Background()) - } -} - // Range over all users (instances) that have pending reports and send to store func (c *awsCollector) flushPending(ctx context.Context) { instrument.CollectedRequest(ctx, "FlushPending", flushDuration, nil, func(ctx context.Context) error { @@ -245,14 +203,7 @@ func (c *awsCollector) flushPending(ctx context.Context) { entry := value.(*pendingEntry) entry.Lock() - rpt := entry.report - entry.report = nil - if entry.older == nil { - entry.older = make([]*report.Report, c.cfg.Window/c.cfg.StoreInterval) - } else { - copy(entry.older[1:], entry.older) // move everything down one - } - entry.older[0] = rpt + rpt := entry.older[0] entry.Unlock() if rpt != nil { @@ -274,7 +225,7 @@ func (c *awsCollector) flushPending(ctx context.Context) { // Close will flush pending data func (c *awsCollector) Close() { - c.ticker.Stop() // note this doesn't close the chan; goroutine keeps running + c.liveCollector.Close() c.flushPending(context.Background()) } @@ -288,13 +239,13 @@ func (c *awsCollector) CreateTables() error { return err } for _, s := range resp.TableNames { - if *s == c.cfg.DynamoTable { + if *s == c.awsCfg.DynamoTable { return nil } } params := &dynamodb.CreateTableInput{ - TableName: aws.String(c.cfg.DynamoTable), + TableName: aws.String(c.awsCfg.DynamoTable), AttributeDefinitions: []*dynamodb.AttributeDefinition{ { AttributeName: aws.String(hourField), @@ -325,7 +276,7 @@ func (c *awsCollector) CreateTables() error { WriteCapacityUnits: aws.Int64(5), }, } - log.Infof("Creating table %s", c.cfg.DynamoTable) + log.Infof("Creating table %s", c.awsCfg.DynamoTable) _, err = c.db.CreateTable(params) return err } @@ -342,7 +293,7 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error { var err error resp, err = c.db.Query(&dynamodb.QueryInput{ - TableName: aws.String(c.cfg.DynamoTable), + TableName: aws.String(c.awsCfg.DynamoTable), KeyConditions: map[string]*dynamodb.Condition{ hourField: { AttributeValueList: []*dynamodb.AttributeValue{ @@ -423,7 +374,7 @@ func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys if c.cfg.MemcacheClient != nil { stores = append(stores, c.cfg.MemcacheClient) } - stores = append(stores, c.cfg.S3Store) + stores = append(stores, c.awsCfg.S3Store) var reports []report.Report for _, store := range stores { @@ -451,23 +402,6 @@ func (c *awsCollector) getReports(ctx context.Context, userid string, reportKeys return reports, nil } -// process a report from a probe which may be at an older version or overloaded -func (c *awsCollector) massageReport(userid string, report report.Report) report.Report { - if c.cfg.MaxTopNodes > 0 { - max := c.cfg.MaxTopNodes - if len(report.Host.Nodes) > 1 { - max = max * len(report.Host.Nodes) // higher limit for merged reports - } - var dropped []string - report, dropped = report.DropTopologiesOver(max) - for _, name := range dropped { - topologiesDropped.WithLabelValues(userid, name).Inc() - } - } - report = report.Upgrade() - return report -} - // If we are running as a Query service, fetch data and merge into a report // If we are running as a Collector and the request is for live data, merge in-memory data and return func (c *awsCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) { @@ -583,7 +517,7 @@ func (c *awsCollector) HasHistoricReports() bool { // AdminSummary returns a string with some internal information about // the report, which may be useful to troubleshoot. func (c *awsCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.Report") + span, ctx := opentracing.StartSpanFromContext(ctx, "awsCollector.AdminSummary") defer span.Finish() userid, err := c.cfg.UserIDer(ctx) if err != nil { @@ -625,7 +559,7 @@ func calculateReportKeys(userid string, now time.Time) (string, string, string) func (c *awsCollector) persistReport(ctx context.Context, userid, rowKey, colKey, reportKey string, buf []byte) error { // Put in S3 and cache before index, so it is fetchable before it is discoverable - reportSize, err := c.cfg.S3Store.StoreReportBytes(ctx, reportKey, buf) + reportSize, err := c.awsCfg.S3Store.StoreReportBytes(ctx, reportKey, buf) if err != nil { return err } @@ -674,7 +608,7 @@ func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynam ) for { resp, err = c.db.PutItem(&dynamodb.PutItemInput{ - TableName: aws.String(c.cfg.DynamoTable), + TableName: aws.String(c.awsCfg.DynamoTable), Item: map[string]*dynamodb.AttributeValue{ hourField: { S: aws.String(rowKey), @@ -701,106 +635,6 @@ func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynam return resp, err } -func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error { - userid, err := c.cfg.UserIDer(ctx) - if err != nil { - return err - } - if c.cfg.StoreInterval == 0 { - return fmt.Errorf("--app.collector.store-interval must be non-zero") - } - - // Shortcut reports are published to nats but not persisted - - // we'll get a full report from the same probe in a few seconds - if rep.Shortcut { - if c.nats != nil { - _, _, reportKey := calculateReportKeys(userid, time.Now()) - _, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf) - if err != nil { - log.Warningf("Could not store shortcut %v in memcache: %v", reportKey, err) - // No point publishing on nats if cache store failed - return nil - } - err := c.nats.Publish(userid, []byte(reportKey)) - natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1) - if err != nil { - log.Errorf("Error sending shortcut report: %v", err) - } - } - return nil - } - - rep = c.massageReport(userid, rep) - c.addToLive(ctx, userid, rep) - - return nil -} - -func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) { - userid, err := c.cfg.UserIDer(ctx) - if err != nil { - log.Errorf("Error getting user id in WaitOn: %v", err) - return - } - - if c.nats == nil { - return - } - - sub, err := c.nats.SubscribeSync(userid) - natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1) - if err != nil { - log.Errorf("Error subscribing for shortcuts: %v", err) - return - } - - c.waitersLock.Lock() - c.waiters[watchKey{userid, waiter}] = sub - c.waitersLock.Unlock() - - go func() { - for { - _, err := sub.NextMsg(natsTimeout) - if err == nats.ErrTimeout { - continue - } - natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1) - if err != nil { - log.Debugf("NextMsg error: %v", err) - return - } - select { - case waiter <- struct{}{}: - default: - } - } - }() -} - -func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) { - userid, err := c.cfg.UserIDer(ctx) - if err != nil { - log.Errorf("Error getting user id in WaitOn: %v", err) - return - } - - if c.nats == nil { - return - } - - c.waitersLock.Lock() - key := watchKey{userid, waiter} - sub := c.waiters[key] - delete(c.waiters, key) - c.waitersLock.Unlock() - - err = sub.Unsubscribe() - natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1) - if err != nil { - log.Errorf("Error on unsubscribe: %v", err) - } -} - type inProcessStore struct { cache gcache.Cache } diff --git a/app/multitenant/collector.go b/app/multitenant/collector.go index 3c23373d6..32b378fd1 100644 --- a/app/multitenant/collector.go +++ b/app/multitenant/collector.go @@ -14,14 +14,55 @@ import ( "context" + "github.com/nats-io/nats" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" + "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" + "github.com/weaveworks/scope/app" "github.com/weaveworks/scope/report" "golang.org/x/sync/errgroup" ) +func registerLiveCollectorMetrics() { + prometheus.MustRegister(reportSizeHistogram) + prometheus.MustRegister(reportsPerUser) + prometheus.MustRegister(reportSizePerUser) + prometheus.MustRegister(topologiesDropped) + prometheus.MustRegister(natsRequests) +} + +var registerLiveCollectorMetricsOnce sync.Once + +// LiveCollectorConfig has everything we need to make a collector for live multitenant data. +type LiveCollectorConfig struct { + UserIDer UserIDer + NatsHost string + MemcacheClient *MemcacheClient + Window time.Duration + TickInterval time.Duration + MaxTopNodes int + CollectorAddr string +} + +type liveCollector struct { + cfg LiveCollectorConfig + merger app.Merger + pending sync.Map + ticker *time.Ticker + tickCallbacks []func(context.Context) + + nats *nats.Conn + waitersLock sync.Mutex + waiters map[watchKey]*nats.Subscription + + collectors []string + lastResolved time.Time +} + // if StoreInterval is set, reports are merged into here and held until flushed to store type pendingEntry struct { sync.Mutex @@ -29,9 +70,140 @@ type pendingEntry struct { older []*report.Report } -// We are building up a report in memory; merge into that and it will be saved shortly +func NewLiveCollector(config LiveCollectorConfig) (app.Collector, error) { + registerLiveCollectorMetricsOnce.Do(registerLiveCollectorMetrics) + c := &liveCollector{ + cfg: config, + } + return c, c.init() +} + +func (c *liveCollector) init() error { + var nc *nats.Conn + if c.cfg.NatsHost != "" { + if c.cfg.MemcacheClient == nil { + return fmt.Errorf("Must supply memcache client when using nats") + } + var err error + nc, err = nats.Connect(c.cfg.NatsHost) + if err != nil { + return err + } + } + c.nats = nc + c.merger = app.NewFastMerger() + c.waiters = make(map[watchKey]*nats.Subscription) + if c.isCollector() { + if c.cfg.TickInterval == 0 { + return fmt.Errorf("--app.collector.tick-interval or --app.collector.store-interval must be non-zero for a collector") + } + c.ticker = time.NewTicker(c.cfg.TickInterval) + go c.tickLoop() + } + c.tickCallbacks = append(c.tickCallbacks, c.bumpPending) + return nil +} + +func (c *liveCollector) tickLoop() { + for range c.ticker.C { + for _, f := range c.tickCallbacks { + f(context.Background()) + } + } +} + +// Close will close things down +func (c *liveCollector) Close() { + c.ticker.Stop() // note this doesn't close the chan; goroutine keeps running +} + +// Range over all users (instances) that have pending reports and shift the data back in the array +func (c *liveCollector) bumpPending(ctx context.Context) { + c.pending.Range(func(key, value interface{}) bool { + entry := value.(*pendingEntry) + + entry.Lock() + rpt := entry.report + entry.report = nil + if entry.older == nil { + entry.older = make([]*report.Report, c.cfg.Window/c.cfg.TickInterval) + } else { + copy(entry.older[1:], entry.older) // move everything down one + } + entry.older[0] = rpt + entry.Unlock() + return true + }) +} + +func (c *liveCollector) HasHistoricReports() bool { + return false +} + +func (c *liveCollector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) { + userid, err := c.cfg.UserIDer(ctx) + if err != nil { + return false, err + } + if time.Since(timestamp) < c.cfg.Window { + has, err := c.hasReportsFromLive(ctx, userid) + return has, err + } + return false, nil +} + +func (c *liveCollector) Add(ctx context.Context, rep report.Report, buf []byte) error { + userid, err := c.cfg.UserIDer(ctx) + if err != nil { + return err + } + + // Shortcut reports are published to nats but not persisted - + // we'll get a full report from the same probe in a few seconds + if rep.Shortcut { + if c.nats != nil { + _, _, reportKey := calculateReportKeys(userid, time.Now()) + _, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf) + if err != nil { + log.Warningf("Could not store shortcut %v in memcache: %v", reportKey, err) + // No point publishing on nats if cache store failed + return nil + } + err := c.nats.Publish(userid, []byte(reportKey)) + natsRequests.WithLabelValues("Publish", instrument.ErrorCode(err)).Add(1) + if err != nil { + log.Errorf("Error sending shortcut report: %v", err) + } + } + return nil + } + + rep = c.massageReport(userid, rep) + c.addToLive(ctx, userid, rep) + + return nil +} + +// process a report from a probe which may be at an older version or overloaded +func (c *liveCollector) massageReport(userid string, report report.Report) report.Report { + if c.cfg.MaxTopNodes > 0 { + max := c.cfg.MaxTopNodes + if len(report.Host.Nodes) > 1 { + max = max * len(report.Host.Nodes) // higher limit for merged reports + } + var dropped []string + report, dropped = report.DropTopologiesOver(max) + for _, name := range dropped { + topologiesDropped.WithLabelValues(userid, name).Inc() + } + } + report = report.Upgrade() + return report +} + +// We are building up a report in memory; merge into that (for awsCollector it will be saved shortly) // NOTE: may retain a reference to rep; must not be used by caller after this. -func (c *awsCollector) addToLive(ctx context.Context, userid string, rep report.Report) { +func (c *liveCollector) addToLive(ctx context.Context, userid string, rep report.Report) { entry := &pendingEntry{} if e, found := c.pending.LoadOrStore(userid, entry); found { entry = e.(*pendingEntry) @@ -45,11 +217,11 @@ func (c *awsCollector) addToLive(ctx context.Context, userid string, rep report. entry.Unlock() } -func (c *awsCollector) isCollector() bool { - return c.cfg.StoreInterval != 0 +func (c *liveCollector) isCollector() bool { + return c.cfg.CollectorAddr == "" } -func (c *awsCollector) hasReportsFromLive(ctx context.Context, userid string) (bool, error) { +func (c *liveCollector) hasReportsFromLive(ctx context.Context, userid string) (bool, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "hasReportsFromLive") defer span.Finish() if c.isCollector() { @@ -91,7 +263,26 @@ func (c *awsCollector) hasReportsFromLive(ctx context.Context, userid string) (b return false, nil } -func (c *awsCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) { +func (c *liveCollector) Report(ctx context.Context, timestamp time.Time) (report.Report, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.Report") + defer span.Finish() + userid, err := c.cfg.UserIDer(ctx) + if err != nil { + return report.MakeReport(), err + } + span.SetTag("userid", userid) + var reports []report.Report + if time.Since(timestamp) < c.cfg.Window { + reports, err = c.reportsFromLive(ctx, userid) + } + if err != nil { + return report.MakeReport(), err + } + span.LogFields(otlog.Int("merging", len(reports))) + return c.merger.Merge(reports), nil +} + +func (c *liveCollector) reportsFromLive(ctx context.Context, userid string) ([]report.Report, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "reportsFromLive") defer span.Finish() if c.isCollector() { @@ -196,3 +387,82 @@ func oneCall(ctx context.Context, endpoint, path, userid string) (io.ReadCloser, return res.Body, nil } + +func (c *liveCollector) WaitOn(ctx context.Context, waiter chan struct{}) { + userid, err := c.cfg.UserIDer(ctx) + if err != nil { + log.Errorf("Error getting user id in WaitOn: %v", err) + return + } + + if c.nats == nil { + return + } + + sub, err := c.nats.SubscribeSync(userid) + natsRequests.WithLabelValues("SubscribeSync", instrument.ErrorCode(err)).Add(1) + if err != nil { + log.Errorf("Error subscribing for shortcuts: %v", err) + return + } + + c.waitersLock.Lock() + c.waiters[watchKey{userid, waiter}] = sub + c.waitersLock.Unlock() + + go func() { + for { + _, err := sub.NextMsg(natsTimeout) + if err == nats.ErrTimeout { + continue + } + natsRequests.WithLabelValues("NextMsg", instrument.ErrorCode(err)).Add(1) + if err != nil { + log.Debugf("NextMsg error: %v", err) + return + } + select { + case waiter <- struct{}{}: + default: + } + } + }() +} + +func (c *liveCollector) UnWait(ctx context.Context, waiter chan struct{}) { + userid, err := c.cfg.UserIDer(ctx) + if err != nil { + log.Errorf("Error getting user id in WaitOn: %v", err) + return + } + + if c.nats == nil { + return + } + + c.waitersLock.Lock() + key := watchKey{userid, waiter} + sub := c.waiters[key] + delete(c.waiters, key) + c.waitersLock.Unlock() + + err = sub.Unsubscribe() + natsRequests.WithLabelValues("Unsubscribe", instrument.ErrorCode(err)).Add(1) + if err != nil { + log.Errorf("Error on unsubscribe: %v", err) + } +} + +// AdminSummary returns a string with some internal information about +// the report, which may be useful to troubleshoot. +func (c *liveCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "liveCollector.AdminSummary") + defer span.Finish() + userid, err := c.cfg.UserIDer(ctx) + if err != nil { + return "", err + } + _ = userid + // TODO: finish implementation + return "TODO", nil +} diff --git a/prog/app.go b/prog/app.go index dff8a88da..2ed4afbca 100644 --- a/prog/app.go +++ b/prog/app.go @@ -89,12 +89,29 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter return middlewares.Wrap(router) } -func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL string, storeInterval time.Duration, natsHostname string, +func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL string, tickInterval time.Duration, natsHostname string, memcacheConfig multitenant.MemcacheConfig, window time.Duration, maxTopNodes int, createTables bool, collectorAddr string) (app.Collector, error) { if collectorURL == "local" { return app.NewCollector(window), nil } + var memcacheClient *multitenant.MemcacheClient + if memcacheConfig.Host != "" { + memcacheClient = multitenant.NewMemcacheClient(memcacheConfig) + } + liveConfig := multitenant.LiveCollectorConfig{ + UserIDer: userIDer, + NatsHost: natsHostname, + MemcacheClient: memcacheClient, + Window: window, + TickInterval: tickInterval, + MaxTopNodes: maxTopNodes, + CollectorAddr: collectorAddr, + } + if collectorURL == "multitenant-live" { + return multitenant.NewLiveCollector(liveConfig) + } + parsed, err := url.Parse(collectorURL) if err != nil { return nil, err @@ -119,22 +136,12 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL string, bucketName := strings.TrimPrefix(s3.Path, "/") tableName := strings.TrimPrefix(parsed.Path, "/") s3Store := multitenant.NewS3Client(s3Config, bucketName) - var memcacheClient *multitenant.MemcacheClient - if memcacheConfig.Host != "" { - memcacheClient = multitenant.NewMemcacheClient(memcacheConfig) - } awsCollector, err := multitenant.NewAWSCollector( + liveConfig, multitenant.AWSCollectorConfig{ - UserIDer: userIDer, DynamoDBConfig: dynamoDBConfig, DynamoTable: tableName, S3Store: &s3Store, - StoreInterval: storeInterval, - NatsHost: natsHostname, - MemcacheClient: memcacheClient, - Window: window, - MaxTopNodes: maxTopNodes, - CollectorAddr: collectorAddr, }, ) if err != nil { @@ -239,8 +246,12 @@ func appMain(flags appFlags) { userIDer = multitenant.UserIDHeader(flags.userIDHeader) } + tickInterval := flags.tickInterval + if tickInterval == 0 { + tickInterval = flags.storeInterval + } collector, err := collectorFactory( - userIDer, flags.collectorURL, flags.s3URL, flags.storeInterval, flags.natsHostname, + userIDer, flags.collectorURL, flags.s3URL, tickInterval, flags.natsHostname, multitenant.MemcacheConfig{ Host: flags.memcachedHostname, Timeout: flags.memcachedTimeout, diff --git a/prog/main.go b/prog/main.go index 66eb5c4c7..4012e3d2d 100644 --- a/prog/main.go +++ b/prog/main.go @@ -166,6 +166,7 @@ type appFlags struct { collectorAddr string // how to find collectors if deployed as microservices s3URL string storeInterval time.Duration + tickInterval time.Duration controlRouterURL string controlRPCTimeout time.Duration pipeRouterURL string @@ -376,10 +377,11 @@ func setupFlags(flags *flags) { flag.Var(&flags.containerLabelFilterFlags, "app.container-label-filter", "Add container label-based view filter, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter='Database Containers:role=db'") flag.Var(&flags.containerLabelFilterFlagsExclude, "app.container-label-filter-exclude", "Add container label-based view filter that excludes containers with the given label, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter-exclude='Database Containers:role=db'") - flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)") + flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, multitenant-live, dynamodb, or file/directory)") flag.StringVar(&flags.app.collectorAddr, "app.collector-addr", "", "Address to look up collectors when deployed as microservices") flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)") - flag.DurationVar(&flags.app.storeInterval, "app.collector.store-interval", 0, "How often to store merged incoming reports. If 0, reports are stored unmerged as they arrive.") + flag.DurationVar(&flags.app.storeInterval, "app.collector.store-interval", 0, "How often to store merged incoming reports.") + flag.DurationVar(&flags.app.tickInterval, "app.collector.tick-interval", 0, "How often to start a new group of recent reports, for the multitenant collector") flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)") flag.DurationVar(&flags.app.controlRPCTimeout, "app.control.rpctimeout", time.Minute, "Timeout for control RPC") flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")