package app

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/weaveworks/common/mtime"
	"github.com/weaveworks/scope/report"
)

// We merge all reports received within the specified interval, and
// discard the originals. Higher figures improve the performance of
// Report(), but at the expense of lower time resolution, since time
// effectively advances in quanta of this size.
//
// The current figure is identical to the default
// probe.publishInterval, which results in performance improvements
// as soon as there is more than one probe.
const reportQuantisationInterval = 3 * time.Second

// Reporter is something that can produce reports on demand. It's a convenient
// interface for parts of the app, and several experimental components.
//
// We call UnsafeRemovePartMergedNodes(), which modifies the data, so all
// implementations of Report() must ensure they return a new object, not one
// which is cached or shared across goroutines.
type Reporter interface {
	Report(context.Context, time.Time) (report.Report, error) // must return an object that is OK to modify
	HasReports(context.Context, time.Time) (bool, error)
	HasHistoricReports() bool
	AdminSummary(context.Context, time.Time) (string, error)
	WaitOn(context.Context, chan struct{})
	UnWait(context.Context, chan struct{})
}

// WebReporter is a reporter that creates reports whose data is eventually
// displayed on websites. It carries fields that will be forwarded to the
// detailed.RenderContext.
type WebReporter struct {
	Reporter
	MetricsGraphURL string
}

// Adder is something that can accept reports. It's a convenient interface for
// parts of the app, and several experimental components. It takes the following
// arguments:
// - context.Context: the request context
// - report.Report: the deserialised report
// - []byte: the serialised report (as gzip'd msgpack)
type Adder interface {
	Add(context.Context, report.Report, []byte) error
}

// A Collector is a Reporter and an Adder.
type Collector interface {
	Reporter
	Adder
	Close()
}

// collector receives published reports from multiple producers. It yields a
// single merged report, representing all collected reports.
type collector struct {
	mtx        sync.Mutex
	reports    []report.Report
	timestamps []time.Time
	window     time.Duration
	cached     *report.Report
	merger     Merger
	waitableCondition
}

// waitableCondition notifies a set of registered waiters whenever Broadcast
// is called; the collector uses it to signal the arrival of shortcut reports.
type waitableCondition struct {
	sync.Mutex
	waiters map[chan struct{}]struct{}
}

// WaitOn registers a waiter channel to be notified on the next Broadcast. It
// implements Reporter.
func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
	wc.Lock()
	wc.waiters[waiter] = struct{}{}
	wc.Unlock()
}

// UnWait deregisters a waiter channel. It implements Reporter.
func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
	wc.Lock()
	delete(wc.waiters, waiter)
	wc.Unlock()
}

// Broadcast performs a non-blocking notification of every registered waiter.
func (wc *waitableCondition) Broadcast() {
	wc.Lock()
	for waiter := range wc.waiters {
		// Non-blocking write to channel
		select {
		case waiter <- struct{}{}:
		default:
		}
	}
	wc.Unlock()
}
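
// waitForShortcutSketch is a usage sketch (not used by the app; the function
// name and the 1-slot buffer are illustrative assumptions): it shows how a
// caller blocks until the next shortcut report is broadcast, using the
// WaitOn/UnWait protocol above.
func waitForShortcutSketch(ctx context.Context, r Reporter) {
	waiter := make(chan struct{}, 1)
	r.WaitOn(ctx, waiter)
	defer r.UnWait(ctx, waiter)
	select {
	case <-waiter:
		// A new shortcut report arrived; the caller would typically fetch
		// a fresh Report() here.
	case <-ctx.Done():
		// The caller gave up waiting.
	}
}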

// NewCollector returns a collector ready for use.
func NewCollector(window time.Duration) Collector {
	return &collector{
		window: window,
		waitableCondition: waitableCondition{
			waiters: map[chan struct{}]struct{}{},
		},
		merger: NewFastMerger(),
	}
}
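
// collectAndReportSketch is a minimal usage sketch (not used by the app; the
// 15s window is an illustrative assumption): it creates a collector, feeds it
// one empty report, and reads back the merged view, which is a copy that is
// safe for the caller to modify.
func collectAndReportSketch() (report.Report, error) {
	c := NewCollector(15 * time.Second)
	defer c.Close()
	ctx := context.Background()
	if err := c.Add(ctx, report.MakeReport(), nil); err != nil {
		return report.Report{}, err
	}
	// Merge everything received within the window, as of "now".
	return c.Report(ctx, mtime.Now())
}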

// Close is a no-op for the regular collector.
func (c *collector) Close() {}

// Add adds a report to the collector's internal state. It implements Adder.
func (c *collector) Add(_ context.Context, rpt report.Report, _ []byte) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.reports = append(c.reports, rpt)
	c.timestamps = append(c.timestamps, mtime.Now())

	c.clean()
	c.cached = nil
	if rpt.Shortcut {
		c.Broadcast()
	}
	return nil
}

// Report returns a merged report over all added reports. It implements
// Reporter.
//
// Note we return a copy in case callers modify the data.
func (c *collector) Report(_ context.Context, timestamp time.Time) (report.Report, error) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// If the oldest report is still within range,
	// and there is a cached report, return that.
	if c.cached != nil && len(c.reports) > 0 {
		oldest := timestamp.Add(-c.window)
		if c.timestamps[0].After(oldest) {
			return c.cached.Copy(), nil
		}
	}

	c.clean()
	c.quantise()

	for i := range c.reports {
		c.reports[i] = c.reports[i].Upgrade()
	}

	rpt := c.merger.Merge(c.reports)
	c.cached = &rpt
	return rpt.Copy(), nil
}

// HasReports indicates whether the collector contains reports between
// timestamp-app.window and timestamp.
func (c *collector) HasReports(ctx context.Context, timestamp time.Time) (bool, error) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if len(c.timestamps) < 1 {
		return false, nil
	}

	return !c.timestamps[0].After(timestamp) && !c.timestamps[len(c.timestamps)-1].Before(timestamp.Add(-c.window)), nil
}

// HasHistoricReports indicates whether the collector contains reports
// older than now-app.window.
func (c *collector) HasHistoricReports() bool {
	return false
}

// AdminSummary returns a string with some internal information about
// the report, which may be useful to troubleshoot.
func (c *collector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	var b strings.Builder
	for i := range c.reports {
		fmt.Fprintf(&b, "%v: ", c.timestamps[i].Format(time.StampMilli))
		b.WriteString(c.reports[i].Summary())
		b.WriteByte('\n')
	}
	return b.String(), nil
}

// clean removes reports (and their timestamps) older than the app.window.
func (c *collector) clean() {
	var (
		cleanedReports    = make([]report.Report, 0, len(c.reports))
		cleanedTimestamps = make([]time.Time, 0, len(c.timestamps))
		oldest            = mtime.Now().Add(-c.window)
	)
	for i, r := range c.reports {
		if c.timestamps[i].After(oldest) {
			cleanedReports = append(cleanedReports, r)
			cleanedTimestamps = append(cleanedTimestamps, c.timestamps[i])
		}
	}
	c.reports = cleanedReports
	c.timestamps = cleanedTimestamps
}

// quantise merges reports received within the same reportQuantisationInterval.
//
// Quantisation is relative to the time of the first report in a given
// interval, rather than absolute time. So, for example, with a
// reportQuantisationInterval of 3s and reports with timestamps [0, 1,
// 2, 5, 6, 7], the result contains merged reports with
// timestamps/content of [0:{0,1,2}, 5:{5,6,7}].
func (c *collector) quantise() {
	if len(c.reports) == 0 {
		return
	}
	var (
		quantisedReports    = make([]report.Report, 0, len(c.reports))
		quantisedTimestamps = make([]time.Time, 0, len(c.timestamps))
	)
	quantumStartIdx := 0
	quantumStartTimestamp := c.timestamps[0]
	for i, t := range c.timestamps {
		if t.Sub(quantumStartTimestamp) < reportQuantisationInterval {
			continue
		}
		quantisedReports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:i]))
		quantisedTimestamps = append(quantisedTimestamps, quantumStartTimestamp)
		quantumStartIdx = i
		quantumStartTimestamp = t
	}
	c.reports = append(quantisedReports, c.merger.Merge(c.reports[quantumStartIdx:]))
	c.timestamps = append(quantisedTimestamps, c.timestamps[quantumStartIdx])
}
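
// quantisationBucketsSketch illustrates the relative bucketing used by
// quantise, on timestamps alone (a sketch, not used by the app): with a 3s
// interval, [0s, 1s, 2s, 5s, 6s, 7s] yields [[0s, 1s, 2s], [5s, 6s, 7s]].
func quantisationBucketsSketch(timestamps []time.Time) [][]time.Time {
	if len(timestamps) == 0 {
		return nil
	}
	var buckets [][]time.Time
	start := 0
	for i, t := range timestamps {
		if t.Sub(timestamps[start]) < reportQuantisationInterval {
			continue
		}
		buckets = append(buckets, timestamps[start:i])
		start = i
	}
	return append(buckets, timestamps[start:])
}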

// StaticCollector always returns the given report.
type StaticCollector report.Report

// Report returns a copy of the static report, ignoring any added reports. It
// implements Reporter.
func (c StaticCollector) Report(context.Context, time.Time) (report.Report, error) {
	return report.Report(c).Copy(), nil
}

// Close is a no-op for the static collector.
func (c StaticCollector) Close() {}

// HasReports indicates whether the collector contains reports between
// timestamp-app.window and timestamp. The static collector always does.
func (c StaticCollector) HasReports(context.Context, time.Time) (bool, error) {
	return true, nil
}

// HasHistoricReports indicates whether the collector contains reports
// older than now-app.window. The static collector never does.
func (c StaticCollector) HasHistoricReports() bool {
	return false
}

// AdminSummary implements Reporter.
func (c StaticCollector) AdminSummary(ctx context.Context, timestamp time.Time) (string, error) {
	return "not implemented", nil
}

// Add is a no-op for the static collector. It implements Adder.
func (c StaticCollector) Add(context.Context, report.Report, []byte) error { return nil }

// WaitOn lets other components wait on a new report being received. It
// implements Reporter.
func (c StaticCollector) WaitOn(context.Context, chan struct{}) {}

// UnWait lets other components stop waiting on a new report being received. It
// implements Reporter.
func (c StaticCollector) UnWait(context.Context, chan struct{}) {}

// NewFileCollector reads and parses the files at path (a file or
// directory) as reports. If there are multiple files, and they all
// have names representing "nanoseconds since epoch" timestamps,
// e.g. "1488557088545489008.msgpack.gz", then the collector will
// return merged reports resulting from replaying the file reports in
// a loop at a sequence and speed determined by the timestamps.
// Otherwise the collector always returns the merger of all reports.
func NewFileCollector(path string, window time.Duration) (Collector, error) {
	var (
		timestamps []time.Time
		reports    []report.Report
	)
	allTimestamped := true
	if err := filepath.Walk(path,
		func(p string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if info.IsDir() {
				return nil
			}
			t, err := timestampFromFilepath(p)
			if err != nil {
				allTimestamped = false
			}
			timestamps = append(timestamps, t)

			rpt, err := report.MakeFromFile(context.Background(), p)
			if err != nil {
				return err
			}
			reports = append(reports, *rpt)
			return nil
		}); err != nil {
		return nil, err
	}
	if len(reports) > 1 && allTimestamped {
		collector := NewCollector(window)
		go replay(collector, timestamps, reports)
		return collector, nil
	}
	return StaticCollector(NewFastMerger().Merge(reports).Upgrade()), nil
}
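
// fileCollectorSketch is a usage sketch (not used by the app; the directory
// path is a hypothetical example): it loads stored reports and reads back the
// merged, or replayed, view in the same way the app queries a live collector.
func fileCollectorSketch() (report.Report, error) {
	c, err := NewFileCollector("/tmp/scope-reports", 15*time.Second)
	if err != nil {
		return report.Report{}, err
	}
	defer c.Close()
	return c.Report(context.Background(), mtime.Now())
}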

// timestampFromFilepath extracts a "nanoseconds since epoch" timestamp from
// the base name of path, after stripping all extensions.
func timestampFromFilepath(path string) (time.Time, error) {
	name := filepath.Base(path)
	for {
		ext := filepath.Ext(name)
		if ext == "" {
			break
		}
		name = strings.TrimSuffix(name, ext)
	}
	nanosecondsSinceEpoch, err := strconv.ParseInt(name, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err)
	}
	return time.Unix(0, nanosecondsSinceEpoch), nil
}
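
// timestampedFilename is a sketch of the naming convention that
// timestampFromFilepath expects (not used by the app): nanoseconds since the
// epoch followed by any extensions, e.g. "1488557088545489008.msgpack.gz".
func timestampedFilename(t time.Time) string {
	return fmt.Sprintf("%d.msgpack.gz", t.UnixNano())
}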

// replay feeds the reports to the Adder in an endless loop, pacing them
// according to the gaps between their timestamps.
func replay(a Adder, timestamps []time.Time, reports []report.Report) {
	// calculate delays between report n and n+1
	l := len(timestamps)
	delays := make([]time.Duration, l)
	for i, t := range timestamps[0 : l-1] {
		delays[i] = timestamps[i+1].Sub(t)
		if delays[i] < 0 {
			panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
		}
	}
	// We don't know how long to wait before looping round, so make a
	// good guess.
	delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)

	due := time.Now()
	for {
		for i, r := range reports {
			a.Add(context.Background(), r, nil)
			due = due.Add(delays[i])
			delay := time.Until(due)
			if delay > 0 {
				time.Sleep(delay)
			}
		}
	}
}