Files
weave-scope/probe/probe.go
Bryan Boreham 951d7a1412 fix (probe): skip publishing empty reports
If the probe is configured with `spy.interval` greater than
`publish.interval`, the report to be published can be completely
blank.  In that case, skip publishing it so the next time we do have
some data the time-window is correct.
2020-03-27 09:59:09 +00:00

284 lines
6.7 KiB
Go

package probe
import (
"context"
"sync"
"time"
"github.com/armon/go-metrics"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/mtime"
"golang.org/x/time/rate"
"github.com/weaveworks/scope/report"
)
// Capacities of the buffered channels that carry reports to the publish loop.
const (
// spiedReportBufferSize is the capacity of the channel holding reports
// generated on each spy tick.
spiedReportBufferSize = 16
// shortcutReportBufferSize is the capacity of the channel holding reports
// queued through Probe.Publish.
shortcutReportBufferSize = 1024
)
// ReportPublisher publishes reports, probably to a remote collector.
//
// The probe's publish loop calls Publish for every report it sends. A non-nil
// error is logged; for a periodic report it additionally forces the next
// periodic report to be a full one.
type ReportPublisher interface {
// Publish sends r, returning a non-nil error if sending failed.
Publish(r report.Report) error
}
// Probe sits there, generating and publishing reports.
//
// Start launches two goroutines: a spy loop that builds a tagged report on
// every spyInterval tick, and a publish loop that merges the collected
// reports and publishes them on every publishInterval tick. Stop ends both.
type Probe struct {
// spyInterval is the period between report generations; publishInterval
// is the period between periodic publications.
spyInterval, publishInterval time.Duration
// publisher receives every report sent by the publish loop.
publisher ReportPublisher
// rateLimiter is waited on each time queued reports are about to be drained.
rateLimiter *rate.Limiter
// ticksPerFullReport is the modulus applied to the running count of
// successful periodic publishes: when the remainder is zero a full report
// is sent, otherwise the last full report is un-merged from it first.
ticksPerFullReport int
// noControls, when true, resets the Controls of every topology in a
// report before it is published.
noControls bool
// tickers, reporters and taggers hold what was registered through
// AddTicker, AddReporter and AddTagger respectively.
tickers []Ticker
reporters []Reporter
taggers []Tagger
// quit is closed by Stop to make both loops return.
quit chan struct{}
// done tracks the running loops so that Stop can wait for them.
done sync.WaitGroup
// spiedReports carries reports from the spy loop to the publish loop.
spiedReports chan report.Report
// shortcutReports carries reports queued via Publish to the publish loop.
shortcutReports chan report.Report
}
// Tagger tags nodes with value-add node metadata.
//
// The probe applies its registered Taggers in registration order, passing the
// report returned by one Tagger on to the next.
type Tagger interface {
// Name identifies the Tagger in log messages and timing metrics.
Name() string
// Tag returns the report to hand to the next stage. A non-nil error is
// logged by the probe, which still carries on with the returned report.
Tag(r report.Report) (report.Report, error)
}
// Reporter generates Reports.
//
// On every spy tick the probe runs each registered Reporter in its own
// goroutine and merges all results into a single report.
type Reporter interface {
// Name identifies the Reporter in log messages and timing metrics.
Name() string
// Report returns this Reporter's contribution. If the error is non-nil the
// probe logs it and merges an empty report in place of the returned one.
Report() (report.Report, error)
}
// ReporterFunc adapts a plain function into a Reporter. The returned Reporter
// answers Name with name and delegates Report to f.
func ReporterFunc(name string, f func() (report.Report, error)) Reporter {
	return reporterFunc{
		name: name,
		f:    f,
	}
}
// reporterFunc is the Reporter implementation handed out by ReporterFunc.
type reporterFunc struct {
// name is the value returned by Name.
name string
// f is invoked by Report to produce the report.
f func() (report.Report, error)
}
// Name returns the name this reporter was created with.
func (r reporterFunc) Name() string {
	return r.name
}
// Report calls the wrapped function and returns its result unchanged.
func (r reporterFunc) Report() (report.Report, error) {
	return r.f()
}
// Ticker is something which will be invoked once per spy interval, before the
// Reporters and Taggers run for that interval.
// It's useful for things that should be updated on that interval.
// For example, cached shared state between Taggers and Reporters.
type Ticker interface {
// Name identifies the Ticker in timing metrics.
Name() string
// Tick performs the periodic update. A non-nil error is logged by the
// probe and does not prevent the remaining Tickers from running.
Tick() error
}
// New makes a new Probe.
//
// spyInterval is the period between report generations and publishInterval
// the period between periodic publications to publisher. ticksPerFullReport
// selects how often a periodic report is sent in full, and noControls strips
// the controls from every published report. The returned Probe does nothing
// until Start is called.
func New(
	spyInterval, publishInterval time.Duration,
	publisher ReportPublisher,
	ticksPerFullReport int,
	noControls bool,
) *Probe {
	// One event per hundredth of the publish interval, with a burst of one.
	limiter := rate.NewLimiter(rate.Every(publishInterval/100), 1)

	return &Probe{
		spyInterval:        spyInterval,
		publishInterval:    publishInterval,
		publisher:          publisher,
		rateLimiter:        limiter,
		ticksPerFullReport: ticksPerFullReport,
		noControls:         noControls,
		quit:               make(chan struct{}),
		spiedReports:       make(chan report.Report, spiedReportBufferSize),
		shortcutReports:    make(chan report.Report, shortcutReportBufferSize),
	}
}
// AddTagger registers one or more Taggers with the Probe. They are appended
// after any Taggers registered earlier.
func (p *Probe) AddTagger(ts ...Tagger) {
	if len(ts) == 0 {
		return // nothing to register
	}
	p.taggers = append(p.taggers, ts...)
}
// AddReporter registers one or more Reporters with the Probe. They are
// appended after any Reporters registered earlier.
func (p *Probe) AddReporter(rs ...Reporter) {
	if len(rs) == 0 {
		return // nothing to register
	}
	p.reporters = append(p.reporters, rs...)
}
// AddTicker registers one or more Tickers with the Probe. They are appended
// after any Tickers registered earlier.
func (p *Probe) AddTicker(ts ...Ticker) {
	if len(ts) == 0 {
		return // nothing to register
	}
	p.tickers = append(p.tickers, ts...)
}
// Start starts the probe by launching the spy loop and the publish loop, each
// in its own goroutine. Both are tracked so that Stop can wait for them.
func (p *Probe) Start() {
	loops := []func(){p.spyLoop, p.publishLoop}

	// Account for every loop before any of them can call Done.
	p.done.Add(len(loops))
	for _, loop := range loops {
		go loop()
	}
}
// Stop stops the probe: it closes the quit channel to signal the loops and
// then blocks until every loop registered by Start has finished. It always
// returns nil.
//
// Stop must be called at most once; a second call would close the
// already-closed quit channel, which panics.
func (p *Probe) Stop() error {
// Signal first, then wait: the loops only return once quit is closed.
close(p.quit)
p.done.Wait()
return nil
}
// Publish will queue a report for immediate publication, bypassing the spy
// tick. The report is run through the registered Taggers before it is queued.
// The call blocks while the shortcut queue is full.
func (p *Probe) Publish(rpt report.Report) {
	p.shortcutReports <- p.tag(rpt)
}
// spyLoop runs until the quit channel is closed. On every spy tick it runs
// the Tickers, gathers a report from the Reporters, tags it, and queues it for
// the publish loop. The queueing send blocks while the spied-report buffer is
// full.
func (p *Probe) spyLoop() {
	defer p.done.Done()

	// time.Tick offers no way to stop its ticker, so it would outlive this
	// loop. Use an explicit ticker and release it on return. A non-positive
	// interval leaves the channel nil (never ready), which matches what
	// time.Tick returns; time.NewTicker would panic instead.
	var spyTick <-chan time.Time
	if p.spyInterval > 0 {
		ticker := time.NewTicker(p.spyInterval)
		defer ticker.Stop()
		spyTick = ticker.C
	}

	for {
		select {
		case <-spyTick:
			p.tick()
			rpt := p.report()
			rpt = p.tag(rpt)
			p.spiedReports <- rpt
		case <-p.quit:
			return
		}
	}
}
// tick invokes every registered Ticker in registration order, recording how
// long each one took. A failing Ticker is logged and the rest still run.
func (p *Probe) tick() {
	for _, tk := range p.tickers {
		started := time.Now()
		tickErr := tk.Tick()

		labels := []metrics.Label{
			{Name: "operation", Value: "ticker"},
			{Name: "module", Value: tk.Name()},
		}
		metrics.MeasureSinceWithLabels([]string{"duration", "seconds"}, started, labels)

		if tickErr != nil {
			log.Errorf("Error doing ticker: %v", tickErr)
		}
	}
}
// report runs every registered Reporter concurrently, one goroutine each, and
// merges their results into a single report stamped with the current time.
// A Reporter that fails is logged and contributes an empty report. A Reporter
// that takes longer than the spy interval triggers warnings, both when the
// interval elapses and again once it finally returns.
func (p *Probe) report() report.Report {
	// Buffered to the number of reporters so no goroutine blocks on its send.
	results := make(chan report.Report, len(p.reporters))

	for _, reporter := range p.reporters {
		go func(rep Reporter) {
			started := time.Now()
			overrun := time.AfterFunc(p.spyInterval, func() {
				log.Warningf("%v reporter took longer than %v", rep.Name(), p.spyInterval)
			})

			partial, err := rep.Report()

			// Stop reports false when the timer has already fired.
			if fired := !overrun.Stop(); fired {
				log.Warningf("%v reporter took %v (longer than %v)", rep.Name(), time.Since(started), p.spyInterval)
			}

			labels := []metrics.Label{
				{Name: "operation", Value: "reporter"},
				{Name: "module", Value: rep.Name()},
			}
			metrics.MeasureSinceWithLabels([]string{"duration", "seconds"}, started, labels)

			if err != nil {
				log.Errorf("Error generating %s report: %v", rep.Name(), err)
				partial = report.MakeReport() // empty is OK to merge
			}
			results <- partial
		}(reporter)
	}

	merged := report.MakeReport()
	merged.TS = mtime.Now()
	// Exactly one result arrives per reporter.
	for range p.reporters {
		merged.UnsafeMerge(<-results)
	}
	return merged
}
// tag passes r through every registered Tagger in registration order and
// returns the final report. Each Tagger's duration is recorded as a metric. A
// Tagger that takes longer than the spy interval triggers warnings, both when
// the interval elapses and again once it finally returns.
//
// A Tagger error is logged but does not stop the chain.
// NOTE(review): on error r is still replaced by whatever the Tagger returned,
// so a Tagger that returns an empty report alongside its error discards the
// data gathered so far — confirm that is intended.
func (p *Probe) tag(r report.Report) report.Report {
	var err error
	for _, tagger := range p.taggers {
		// Resolve the name once so the timer callback, which runs on another
		// goroutine, never reads the loop variable. Before Go 1.22 that
		// variable is shared across iterations, so a late callback could race
		// with the loop and name the wrong tagger.
		name := tagger.Name()

		start := time.Now()
		timer := time.AfterFunc(p.spyInterval, func() {
			log.Warningf("%v tagger took longer than %v", name, p.spyInterval)
		})
		r, err = tagger.Tag(r)
		// Stop reports false when the timer has already fired.
		if !timer.Stop() {
			log.Warningf("%v tagger took %v (longer than %v)", name, time.Since(start), p.spyInterval)
		}
		metrics.MeasureSinceWithLabels([]string{"duration", "seconds"}, start, []metrics.Label{
			{Name: "operation", Value: "tagger"},
			{Name: "module", Value: name},
		})
		if err != nil {
			log.Errorf("Error applying tagger: %v", err)
		}
	}
	return r
}
// drainAndSanitise waits on the probe's rate limiter, then merges every report
// currently queued on rs into a copy of rpt without blocking. If the probe was
// created with noControls, the Controls of every topology in the result are
// reset. It returns the merged report and the number of reports taken from rs.
func (p *Probe) drainAndSanitise(rpt report.Report, rs chan report.Report) (report.Report, int) {
	// The limiter's result is deliberately not inspected.
	_ = p.rateLimiter.Wait(context.Background())

	// Work on a copy so the caller's report is not merged into directly.
	rpt = rpt.Copy()

	drained := 0
	for empty := false; !empty; {
		select {
		case queued := <-rs:
			rpt.UnsafeMerge(queued)
			drained++
		default:
			// Nothing more is queued right now.
			empty = true
		}
	}

	if !p.noControls {
		return rpt, drained
	}
	rpt.WalkTopologies(func(t *report.Topology) {
		t.Controls = report.Controls{}
	})
	return rpt, drained
}
// publishLoop runs until the quit channel is closed, publishing reports from
// two sources:
//
//   - On every publish tick it merges all reports queued by the spy loop. If
//     none were queued, nothing is published and the time window keeps
//     growing. Otherwise it publishes either a full report or, between full
//     reports, the merged report with the last full report un-merged from it.
//     A failed publish forces the next periodic report to be a full one.
//   - Reports queued through Publish are merged with any others waiting on
//     the shortcut queue and published straight away.
//
// Publish errors are logged at Info level.
func (p *Probe) publishLoop() {
	defer p.done.Done()
	startTime := mtime.Now()

	// time.Tick offers no way to stop its ticker, so it would outlive this
	// loop. Use an explicit ticker and release it on return. A non-positive
	// interval leaves the channel nil (never ready), which matches what
	// time.Tick returns; time.NewTicker would panic instead.
	var pubTick <-chan time.Time
	if p.publishInterval > 0 {
		ticker := time.NewTicker(p.publishInterval)
		defer ticker.Stop()
		pubTick = ticker.C
	}

	publishCount := 0
	var lastFullReport report.Report
	for {
		var err error
		select {
		case <-pubTick:
			rpt, count := p.drainAndSanitise(report.MakeReport(), p.spiedReports)
			if count == 0 {
				continue // No data has been collected - don't bother publishing.
			}
			// A zero ticksPerFullReport would make the modulo panic; treat it
			// as "every report is a full report".
			fullReport := p.ticksPerFullReport == 0 || publishCount%p.ticksPerFullReport == 0
			if !fullReport {
				rpt.UnsafeUnMerge(lastFullReport)
			}
			// Read the clock once so consecutive windows are contiguous.
			now := mtime.Now()
			rpt.Window = now.Sub(startTime)
			startTime = now
			err = p.publisher.Publish(rpt)
			if err == nil {
				if fullReport {
					lastFullReport = rpt
				}
				publishCount++
			} else {
				// If we failed to send then drop back to full report next time
				publishCount = 0
			}
		case rpt := <-p.shortcutReports:
			rpt, _ = p.drainAndSanitise(rpt, p.shortcutReports)
			err = p.publisher.Publish(rpt)
		case <-p.quit:
			return
		}
		if err != nil {
			log.Infof("Publish: %v", err)
		}
	}
}