mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Merge pull request #2301 from weaveworks/report-playback
report playback
This commit is contained in:
@@ -22,7 +22,7 @@ func loadReport() (report.Report, error) {
|
||||
return fixture.Report, nil
|
||||
}
|
||||
|
||||
c, err := NewFileCollector(*benchReportFile)
|
||||
c, err := NewFileCollector(*benchReportFile, 0)
|
||||
if err != nil {
|
||||
return fixture.Report, err
|
||||
}
|
||||
|
||||
103
app/collector.go
103
app/collector.go
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -161,17 +162,74 @@ func (c StaticCollector) WaitOn(context.Context, chan struct{}) {}
|
||||
// implements Reporter.
|
||||
func (c StaticCollector) UnWait(context.Context, chan struct{}) {}
|
||||
|
||||
// NewFileCollector reads and parses the given path, returning a collector
|
||||
// which always returns that report.
|
||||
func NewFileCollector(path string) (Collector, error) {
|
||||
// NewFileCollector reads and parses the files at path (a file or
|
||||
// directory) as reports. If there are multiple files, and they all
|
||||
// have names representing "nanoseconds since epoch" timestamps,
|
||||
// e.g. "1488557088545489008.msgpack.gz", then the collector will
|
||||
// return merged reports resulting from replaying the file reports in
|
||||
// a loop at a sequence and speed determined by the timestamps.
|
||||
// Otherwise the collector always returns the merger of all reports.
|
||||
func NewFileCollector(path string, window time.Duration) (Collector, error) {
|
||||
var (
|
||||
timestamps []time.Time
|
||||
reports []report.Report
|
||||
)
|
||||
allTimestamped := true
|
||||
if err := filepath.Walk(path,
|
||||
func(p string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
t, err := timestampFromFilepath(p)
|
||||
if err != nil {
|
||||
allTimestamped = false
|
||||
}
|
||||
timestamps = append(timestamps, t)
|
||||
|
||||
rpt, err := readReport(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reports = append(reports, rpt)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(reports) > 1 && allTimestamped {
|
||||
collector := NewCollector(window)
|
||||
go replay(collector, timestamps, reports)
|
||||
return collector, nil
|
||||
}
|
||||
return StaticCollector(NewSmartMerger().Merge(reports).Upgrade()), nil
|
||||
}
|
||||
|
||||
func timestampFromFilepath(path string) (time.Time, error) {
|
||||
name := filepath.Base(path)
|
||||
for {
|
||||
ext := filepath.Ext(name)
|
||||
if ext == "" {
|
||||
break
|
||||
}
|
||||
name = strings.TrimSuffix(name, ext)
|
||||
}
|
||||
nanosecondsSinceEpoch, err := strconv.ParseInt(name, 10, 64)
|
||||
if err != nil {
|
||||
return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", name, err)
|
||||
}
|
||||
return time.Unix(0, nanosecondsSinceEpoch), nil
|
||||
}
|
||||
|
||||
func readReport(path string) (rpt report.Report, _ error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rpt, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var (
|
||||
rpt report.Report
|
||||
handle codec.Handle
|
||||
gzipped bool
|
||||
)
|
||||
@@ -186,12 +244,37 @@ func NewFileCollector(path string) (Collector, error) {
|
||||
case ".msgpack":
|
||||
handle = &codec.MsgpackHandle{}
|
||||
default:
|
||||
return nil, fmt.Errorf("Unsupported file extension: %v", fileType)
|
||||
return rpt, fmt.Errorf("Unsupported file extension: %v", fileType)
|
||||
}
|
||||
|
||||
if err := rpt.ReadBinary(f, gzipped, handle); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = rpt.ReadBinary(f, gzipped, handle)
|
||||
|
||||
return StaticCollector(rpt), nil
|
||||
return rpt, err
|
||||
}
|
||||
|
||||
func replay(a Adder, timestamps []time.Time, reports []report.Report) {
|
||||
// calculate delays between report n and n+1
|
||||
l := len(timestamps)
|
||||
delays := make([]time.Duration, l, l)
|
||||
for i, t := range timestamps[0 : l-1] {
|
||||
delays[i] = timestamps[i+1].Sub(t)
|
||||
if delays[i] < 0 {
|
||||
panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
|
||||
}
|
||||
}
|
||||
// We don't know how long to wait before looping round, so make a
|
||||
// good guess.
|
||||
delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)
|
||||
|
||||
due := time.Now()
|
||||
for {
|
||||
for i, r := range reports {
|
||||
a.Add(nil, r, nil)
|
||||
due = due.Add(delays[i])
|
||||
delay := due.Sub(time.Now())
|
||||
if delay > 0 {
|
||||
time.Sleep(delay)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
|
||||
|
||||
switch parsed.Scheme {
|
||||
case "file":
|
||||
return app.NewFileCollector(parsed.Path)
|
||||
return app.NewFileCollector(parsed.Path, window)
|
||||
case "dynamodb":
|
||||
s3, err := url.Parse(s3URL)
|
||||
if err != nil {
|
||||
|
||||
@@ -326,7 +326,7 @@ func main() {
|
||||
flag.Var(&containerLabelFilterFlags, "app.container-label-filter", "Add container label-based view filter, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter='Database Containers:role=db'")
|
||||
flag.Var(&containerLabelFilterFlagsExclude, "app.container-label-filter-exclude", "Add container label-based view filter that excludes containers with the given label, specified as title:label. Multiple flags are accepted. Example: --app.container-label-filter-exclude='Database Containers:role=db'")
|
||||
|
||||
flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file)")
|
||||
flag.StringVar(&flags.app.collectorURL, "app.collector", "local", "Collector to use (local, dynamodb, or file/directory)")
|
||||
flag.StringVar(&flags.app.s3URL, "app.collector.s3", "local", "S3 URL to use (when collector is dynamodb)")
|
||||
flag.StringVar(&flags.app.controlRouterURL, "app.control.router", "local", "Control router to use (local or sqs)")
|
||||
flag.StringVar(&flags.app.pipeRouterURL, "app.pipe.router", "local", "Pipe router to use (local)")
|
||||
|
||||
Reference in New Issue
Block a user