Merge pull request #3384 from weaveworks/drop-big-topologies

In multitenant app, drop all nodes for big topologies
Bryan Boreham
2018-11-01 17:21:55 +00:00
committed by GitHub
4 changed files with 39 additions and 28 deletions
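The mechanics in brief: AWSCollectorConfig gains a MaxTopNodes field, and getReports empties any topology whose node count exceeds that limit before the report is cached and merged; the real implementation is the DropTopologiesOver method added in the last file of this diff. As an illustration only (not part of the diff), here is a minimal runnable Go sketch of that check, with Node, Topology and Report as stand-in types rather than the real report package:

package main

import "fmt"

// Stand-in types; the real report package has richer Topology and Nodes types.
type Node struct{}

type Topology struct {
	Nodes map[string]Node
}

type Report struct {
	Topologies map[string]*Topology
}

// dropTopologiesOver empties every topology holding more than limit nodes,
// mirroring the guard this commit adds on the collector's read path.
func (r Report) dropTopologiesOver(limit int) Report {
	for name, t := range r.Topologies {
		if t != nil && len(t.Nodes) > limit {
			fmt.Printf("dropping %q: %d nodes over limit %d\n", name, len(t.Nodes), limit)
			t.Nodes = map[string]Node{}
		}
	}
	return r
}

func main() {
	r := Report{Topologies: map[string]*Topology{
		"process": {Nodes: map[string]Node{"a": {}, "b": {}, "c": {}}},
		"host":    {Nodes: map[string]Node{"h1": {}}},
	}}
	r = r.dropTopologiesOver(2) // pretend MaxTopNodes is 2
	fmt.Println(len(r.Topologies["process"].Nodes), len(r.Topologies["host"].Nodes)) // prints: 0 1
}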

@@ -106,17 +106,14 @@ type AWSCollectorConfig struct {
NatsHost string
MemcacheClient *MemcacheClient
Window time.Duration
MaxTopNodes int
}
type awsCollector struct {
userIDer UserIDer
cfg AWSCollectorConfig
db *dynamodb.DynamoDB
s3 *S3Store
tableName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
window time.Duration
nats *nats.Conn
waitersLock sync.Mutex
@@ -152,14 +149,10 @@ func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
// (window * report rate) * number of hosts per user * number of users
reportCacheSize := (int(config.Window.Seconds()) / 3) * 10 * 5
return &awsCollector{
cfg: config,
db: dynamodb.New(session.New(config.DynamoDBConfig)),
s3: config.S3Store,
userIDer: config.UserIDer,
tableName: config.DynamoTable,
merger: app.NewFastMerger(),
inProcess: newInProcessStore(reportCacheSize, config.Window),
memcache: config.MemcacheClient,
window: config.Window,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
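For context (not part of the diff): with the default 15-second window set by -app.window in the third file below, reportCacheSize works out to (15 / 3) * 10 * 5 = 250 cached reports, reading the comment's factors as roughly one report every 3 seconds per host, an assumed 10 hosts per user, and 5 users sharing the in-process cache.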
@@ -175,13 +168,13 @@ func (c *awsCollector) CreateTables() error {
return err
}
for _, s := range resp.TableNames {
if *s == c.tableName {
if *s == c.cfg.DynamoTable {
return nil
}
}
params := &dynamodb.CreateTableInput{
TableName: aws.String(c.tableName),
TableName: aws.String(c.cfg.DynamoTable),
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String(hourField),
@@ -212,7 +205,7 @@ func (c *awsCollector) CreateTables() error {
WriteCapacityUnits: aws.Int64(5),
},
}
log.Infof("Creating table %s", c.tableName)
log.Infof("Creating table %s", c.cfg.DynamoTable)
_, err = c.db.CreateTable(params)
return err
}
@@ -224,7 +217,7 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.Query", dynamoRequestDuration, func(_ context.Context) error {
var err error
resp, err = c.db.Query(&dynamodb.QueryInput{
TableName: aws.String(c.tableName),
TableName: aws.String(c.cfg.DynamoTable),
KeyConditions: map[string]*dynamodb.Condition{
hourField: {
AttributeValueList: []*dynamodb.AttributeValue{
@@ -270,12 +263,12 @@ func (c *awsCollector) reportKeysInRange(ctx context.Context, userid string, row
func (c *awsCollector) getReportKeys(ctx context.Context, timestamp time.Time) ([]string, error) {
var (
end = timestamp
start = end.Add(-c.window)
start = end.Add(-c.cfg.Window)
rowStart = start.UnixNano() / time.Hour.Nanoseconds()
rowEnd = end.UnixNano() / time.Hour.Nanoseconds()
)
userid, err := c.userIDer(ctx)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
return nil, err
}
@@ -306,10 +299,10 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
missing := reportKeys
stores := []ReportStore{c.inProcess}
if c.memcache != nil {
stores = append(stores, c.memcache)
if c.cfg.MemcacheClient != nil {
stores = append(stores, c.cfg.MemcacheClient)
}
stores = append(stores, c.s3)
stores = append(stores, c.cfg.S3Store)
var reports []report.Report
for _, store := range stores {
@@ -322,6 +315,9 @@ func (c *awsCollector) getReports(ctx context.Context, reportKeys []string) ([]r
log.Warningf("Error fetching from cache: %v", err)
}
for key, report := range found {
if c.cfg.MaxTopNodes > 0 {
report = report.DropTopologiesOver(c.cfg.MaxTopNodes)
}
report = report.Upgrade()
c.inProcess.StoreReport(key, report)
reports = append(reports, report)
@@ -393,7 +389,7 @@ func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynam
)
for {
resp, err = c.db.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(c.tableName),
TableName: aws.String(c.cfg.DynamoTable),
Item: map[string]*dynamodb.AttributeValue{
hourField: {
S: aws.String(rowKey),
@@ -421,7 +417,7 @@ func (c *awsCollector) putItemInDynamo(rowKey, colKey, reportKey string) (*dynam
}
func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) error {
userid, err := c.userIDer(ctx)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
return err
}
@@ -433,15 +429,15 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e
return err
}
reportSize, err := c.s3.StoreReportBytes(ctx, reportKey, buf)
reportSize, err := c.cfg.S3Store.StoreReportBytes(ctx, reportKey, buf)
if err != nil {
return err
}
reportSizeHistogram.Observe(float64(reportSize))
// third, put it in memcache
if c.memcache != nil {
_, err = c.memcache.StoreReportBytes(ctx, reportKey, buf)
if c.cfg.MemcacheClient != nil {
_, err = c.cfg.MemcacheClient.StoreReportBytes(ctx, reportKey, buf)
if err != nil {
// NOTE: We don't abort here because failing to store in memcache
// doesn't actually break anything else -- it's just an
@@ -481,7 +477,7 @@ func (c *awsCollector) Add(ctx context.Context, rep report.Report, buf []byte) e
}
func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
userid, err := c.userIDer(ctx)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
log.Errorf("Error getting user id in WaitOn: %v", err)
return
@@ -522,7 +518,7 @@ func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) {
}
func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) {
userid, err := c.userIDer(ctx)
userid, err := c.cfg.UserIDer(ctx)
if err != nil {
log.Errorf("Error getting user id in WaitOn: %v", err)
return

@@ -82,7 +82,7 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter
}
func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname string,
memcacheConfig multitenant.MemcacheConfig, window time.Duration, createTables bool) (app.Collector, error) {
memcacheConfig multitenant.MemcacheConfig, window time.Duration, maxTopNodes int, createTables bool) (app.Collector, error) {
if collectorURL == "local" {
return app.NewCollector(window), nil
}
@@ -124,6 +124,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
NatsHost: natsHostname,
MemcacheClient: memcacheClient,
Window: window,
MaxTopNodes: maxTopNodes,
},
)
if err != nil {
@@ -232,7 +233,7 @@ func appMain(flags appFlags) {
Service: flags.memcachedService,
CompressionLevel: flags.memcachedCompressionLevel,
},
flags.window, flags.awsCreateTables)
flags.window, flags.maxTopNodes, flags.awsCreateTables)
if err != nil {
log.Fatalf("Error creating collector: %v", err)
return

@@ -141,6 +141,7 @@ type probeFlags struct {
type appFlags struct {
window time.Duration
maxTopNodes int
listen string
stopTimeout time.Duration
logLevel string
@@ -344,6 +345,7 @@ func setupFlags(flags *flags) {
// App flags
flag.DurationVar(&flags.app.window, "app.window", 15*time.Second, "window")
flag.IntVar(&flags.app.maxTopNodes, "app.max-topology-nodes", 10000, "drop topologies with more than this many nodes (0 to disable)")
flag.StringVar(&flags.app.listen, "app.http.address", ":"+strconv.Itoa(xfer.AppPort), "webserver listen address")
flag.DurationVar(&flags.app.stopTimeout, "app.stopTimeout", 5*time.Second, "How long to wait for http requests to finish when shutting down")
flag.StringVar(&flags.app.logLevel, "app.log.level", "info", "logging threshold level: debug|info|warn|error|fatal|panic")
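An aside on the new flag's contract (illustration, not part of the diff): a positive value caps every topology, while 0 disables the check entirely, since the guard in getReports only fires when c.cfg.MaxTopNodes > 0. A small stand-alone sketch of that behaviour, reusing the flag's name, default and help text but with an invented nodeCount:

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Same name, default and help text as the flag added in this commit.
	maxTopNodes := flag.Int("app.max-topology-nodes", 10000,
		"drop topologies with more than this many nodes (0 to disable)")
	flag.Parse()

	nodeCount := 25000 // hypothetical size of one incoming topology
	if *maxTopNodes > 0 && nodeCount > *maxTopNodes {
		fmt.Println("topology dropped: node count exceeds the cap")
	} else {
		fmt.Println("topology kept")
	}
}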

@@ -442,6 +442,18 @@ func (r Report) Validate() error {
return nil
}
// DropTopologiesOver - as a protection against overloading the app
// server, drop topologies that have really large node counts. In
// practice we only see this with runaway numbers of zombie processes.
func (r Report) DropTopologiesOver(limit int) Report {
r.WalkNamedTopologies(func(name string, topology *Topology) {
if topology != nil && len(topology.Nodes) > limit {
topology.Nodes = Nodes{}
}
})
return r
}
// Upgrade returns a new report based on a report received from the old probe.
//
func (r Report) Upgrade() Report {
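Two details of DropTopologiesOver worth noting: the comparison is strictly greater-than, so under the default flag value a topology with exactly 10000 nodes is kept while one with 10001 is emptied; and the method clears topology.Nodes rather than deleting the topology from the report, so downstream consumers still see the topology present, just with no nodes in it.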