Move probe main.go to prog/probe/, break out a probe struct with appropriate responsibilities.

Also adds test for probe 'engine'
This commit is contained in:
Paul Bellamy
2015-10-26 10:41:51 +00:00
committed by Tom Wilkie
parent 8dbc586866
commit 92ec7d9397
14 changed files with 337 additions and 257 deletions

1
.gitignore vendored
View File

@@ -38,6 +38,7 @@ app/app
app/scope-app
probe/probe
probe/scope-probe
prog/probe/scope-probe
docker/scope-app
docker/scope-probe
docker/docker*

View File

@@ -4,7 +4,7 @@
SUDO=sudo -E
DOCKERHUB_USER=weaveworks
APP_EXE=app/scope-app
PROBE_EXE=probe/scope-probe
PROBE_EXE=prog/probe/scope-probe
FIXPROBE_EXE=experimental/fixprobe/fixprobe
SCOPE_IMAGE=$(DOCKERHUB_USER)/scope
SCOPE_EXPORT=scope.tar

View File

@@ -45,9 +45,9 @@ test:
parallel: true
- cd $SRCDIR; make RM= static:
parallel: true
- cd $SRCDIR; rm -f app/scope-app probe/scope-probe; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm make RM= app/scope-app probe/scope-probe; else GOOS=darwin make RM= app/scope-app probe/scope-probe; fi:
- cd $SRCDIR; rm -f app/scope-app prog/probe/scope-probe; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm make RM= app/scope-app prog/probe/scope-probe; else GOOS=darwin make RM= app/scope-app prog/probe/scope-probe; fi:
parallel: true
- cd $SRCDIR; rm -f app/scope-app probe/scope-probe; make RM=:
- cd $SRCDIR; rm -f app/scope-app prog/probe/scope-probe; make RM=:
parallel: true
- cd $SRCDIR/experimental; ./build_on_circle.sh:
parallel: true

View File

@@ -1,8 +1,9 @@
package main
package probe
import "os"
func hostname() string {
// Hostname returns the hostname of this host.
func Hostname() string {
if hostname := os.Getenv("SCOPE_HOSTNAME"); hostname != "" {
return hostname
}

View File

@@ -1,27 +0,0 @@
package main
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/probe/endpoint"
)
var (
publishTicks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "scope",
Subsystem: "probe",
Name: "publish_ticks",
Help: "Number of publish ticks observed.",
},
[]string{},
)
)
func makePrometheusHandler() http.Handler {
prometheus.MustRegister(publishTicks)
prometheus.MustRegister(endpoint.SpyDuration)
return prometheus.Handler()
}

162
probe/probe.go Normal file
View File

@@ -0,0 +1,162 @@
package probe
import (
"log"
"sync"
"time"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher xfer.Publisher
tickers []Ticker
reporters []Reporter
taggers []Tagger
quit chan struct{}
done sync.WaitGroup
rpt syncReport
}
// Tagger tags nodes with value-add node metadata.
type Tagger interface {
Tag(r report.Report) (report.Report, error)
}
// Reporter generates Reports.
type Reporter interface {
Report() (report.Report, error)
}
// Ticker is something which will be invoked every spyDuration.
// It's useful for things that should be updated on that interval.
// For example, cached shared state between Taggers and Reporters.
type Ticker interface {
Tick() error
}
// New makes a new Probe.
func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *Probe {
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
quit: make(chan struct{}),
}
result.rpt.swap(report.MakeReport())
return result
}
// AddTagger adds a new Tagger to the Probe
func (p *Probe) AddTagger(ts ...Tagger) {
p.taggers = append(p.taggers, ts...)
}
// AddReporter adds a new Reported to the Probe
func (p *Probe) AddReporter(rs ...Reporter) {
p.reporters = append(p.reporters, rs...)
}
// AddTicker adds a new Ticker to the Probe
func (p *Probe) AddTicker(ts ...Ticker) {
p.tickers = append(p.tickers, ts...)
}
// Start starts the probe
func (p *Probe) Start() {
p.done.Add(2)
go p.spyLoop()
go p.publishLoop()
}
// Stop stops the probe
func (p *Probe) Stop() {
close(p.quit)
p.done.Wait()
}
func (p *Probe) spyLoop() {
defer p.done.Done()
spyTick := time.Tick(p.spyInterval)
for {
select {
case <-spyTick:
start := time.Now()
for _, ticker := range p.tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}
}
localReport := p.rpt.copy()
localReport = localReport.Merge(p.report())
localReport = p.tag(localReport)
p.rpt.swap(localReport)
if took := time.Since(start); took > p.spyInterval {
log.Printf("report generation took too long (%s)", took)
}
case <-p.quit:
return
}
}
}
func (p *Probe) report() report.Report {
reports := make(chan report.Report, len(p.reporters))
for _, rep := range p.reporters {
go func(rep Reporter) {
newReport, err := rep.Report()
if err != nil {
log.Printf("error generating report: %v", err)
newReport = report.MakeReport() // empty is OK to merge
}
reports <- newReport
}(rep)
}
result := report.MakeReport()
for i := 0; i < cap(reports); i++ {
result = result.Merge(<-reports)
}
return result
}
func (p *Probe) tag(r report.Report) report.Report {
var err error
for _, tagger := range p.taggers {
r, err = tagger.Tag(r)
if err != nil {
log.Printf("error applying tagger: %v", err)
}
}
return r
}
func (p *Probe) publishLoop() {
defer p.done.Done()
var (
pubTick = time.Tick(p.publishInterval)
rptPub = xfer.NewReportPublisher(p.publisher)
)
for {
select {
case <-pubTick:
localReport := p.rpt.swap(report.MakeReport())
if err := rptPub.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}
case <-p.quit:
return
}
}
}

View File

@@ -0,0 +1,86 @@
package probe
import (
"compress/gzip"
"encoding/gob"
"io"
"reflect"
"testing"
"time"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
func TestApply(t *testing.T) {
var (
endpointNodeID = "c"
addressNodeID = "d"
endpointNode = report.MakeNodeWith(map[string]string{"5": "6"})
addressNode = report.MakeNodeWith(map[string]string{"7": "8"})
)
p := New(0, 0, nil)
p.AddTagger(NewTopologyTagger())
r := report.MakeReport()
r.Endpoint.AddNode(endpointNodeID, endpointNode)
r.Address.AddNode(addressNodeID, addressNode)
r = p.tag(r)
for _, tuple := range []struct {
want report.Node
from report.Topology
via string
}{
{endpointNode.Merge(report.MakeNodeWith(map[string]string{"topology": "endpoint"})), r.Endpoint, endpointNodeID},
{addressNode.Merge(report.MakeNodeWith(map[string]string{"topology": "address"})), r.Address, addressNodeID},
} {
if want, have := tuple.want, tuple.from.Nodes[tuple.via]; !reflect.DeepEqual(want, have) {
t.Errorf("want %+v, have %+v", want, have)
}
}
}
type mockReporter struct {
r report.Report
}
func (m mockReporter) Report() (report.Report, error) {
return m.r.Copy(), nil
}
type mockPublisher struct {
have chan report.Report
}
func (m mockPublisher) Publish(in io.Reader) error {
var r report.Report
if reader, err := gzip.NewReader(in); err != nil {
return err
} else if err := gob.NewDecoder(reader).Decode(&r); err != nil {
return err
}
m.have <- r
return nil
}
func (m mockPublisher) Stop() {
close(m.have)
}
func TestProbe(t *testing.T) {
want := report.MakeReport()
node := report.MakeNodeWith(map[string]string{"b": "c"})
want.Endpoint.AddNode("a", node)
pub := mockPublisher{make(chan report.Report)}
p := New(10*time.Millisecond, 100*time.Millisecond, pub)
p.AddReporter(mockReporter{want})
p.Start()
defer p.Stop()
test.Poll(t, 300*time.Millisecond, want, func() interface{} {
return <-pub.have
})
}

26
probe/sync_report.go Normal file
View File

@@ -0,0 +1,26 @@
package probe
import (
"sync"
"github.com/weaveworks/scope/report"
)
type syncReport struct {
mtx sync.RWMutex
rpt report.Report
}
func (r *syncReport) swap(other report.Report) report.Report {
r.mtx.Lock()
defer r.mtx.Unlock()
old := r.rpt
r.rpt = other
return old
}
func (r *syncReport) copy() report.Report {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.rpt.Copy()
}

View File

@@ -1,45 +0,0 @@
package main
import (
"reflect"
"testing"
"github.com/weaveworks/scope/report"
)
func TestApply(t *testing.T) {
var (
endpointNodeID = "c"
addressNodeID = "d"
endpointNode = report.MakeNodeWith(map[string]string{"5": "6"})
addressNode = report.MakeNodeWith(map[string]string{"7": "8"})
)
r := report.MakeReport()
r.Endpoint.AddNode(endpointNodeID, endpointNode)
r.Address.AddNode(addressNodeID, addressNode)
r = Apply(r, []Tagger{newTopologyTagger()})
for _, tuple := range []struct {
want report.Node
from report.Topology
via string
}{
{endpointNode.Merge(report.MakeNodeWith(map[string]string{"topology": "endpoint"})), r.Endpoint, endpointNodeID},
{addressNode.Merge(report.MakeNodeWith(map[string]string{"topology": "address"})), r.Address, addressNodeID},
} {
if want, have := tuple.want, tuple.from.Nodes[tuple.via]; !reflect.DeepEqual(want, have) {
t.Errorf("want %+v, have %+v", want, have)
}
}
}
func TestTagMissingID(t *testing.T) {
const nodeID = "not-found"
r := report.MakeReport()
rpt, _ := newTopologyTagger().Tag(r)
_, ok := rpt.Endpoint.Nodes[nodeID]
if ok {
t.Error("TopologyTagger erroneously tagged a missing node ID")
}
}

View File

@@ -1,40 +1,9 @@
package main
package probe
import (
"log"
"github.com/weaveworks/scope/report"
)
// Tagger tags nodes with value-add node metadata.
type Tagger interface {
Tag(r report.Report) (report.Report, error)
}
// Reporter generates Reports.
type Reporter interface {
Report() (report.Report, error)
}
// Ticker is something which will be invoked every spyDuration.
// It's useful for things that should be updated on that interval.
// For example, cached shared state between Taggers and Reporters.
type Ticker interface {
Tick() error
}
// Apply tags the report with all the taggers.
func Apply(r report.Report, taggers []Tagger) report.Report {
var err error
for _, tagger := range taggers {
r, err = tagger.Tag(r)
if err != nil {
log.Printf("error applying tagger: %v", err)
}
}
return r
}
// Topology is the Node key for the origin topology.
const Topology = "topology"
@@ -42,7 +11,7 @@ type topologyTagger struct{}
// NewTopologyTagger tags each node with the topology that it comes from. It's
// kind of a proof-of-concept tagger, useful primarily for debugging.
func newTopologyTagger() Tagger {
func NewTopologyTagger() Tagger {
return &topologyTagger{}
}

View File

@@ -0,0 +1,17 @@
package probe
import (
"testing"
"github.com/weaveworks/scope/report"
)
func TestTagMissingID(t *testing.T) {
const nodeID = "not-found"
r := report.MakeReport()
rpt, _ := NewTopologyTagger().Tag(r)
_, ok := rpt.Endpoint.Nodes[nodeID]
if ok {
t.Error("TopologyTagger erroneously tagged a missing node ID")
}
}

View File

@@ -12,10 +12,10 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
@@ -36,7 +36,6 @@ func main() {
httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server")
publishInterval = flag.Duration("publish.interval", 3*time.Second, "publish (output) interval")
spyInterval = flag.Duration("spy.interval", time.Second, "spy (scan) interval")
prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
spyProcs = flag.Bool("processes", true, "report processes (needs root)")
dockerEnabled = flag.Bool("docker", false, "collect Docker-related attributes for processes")
dockerInterval = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
@@ -72,7 +71,7 @@ func main() {
rand.Seed(time.Now().UnixNano())
probeID := strconv.FormatInt(rand.Int63(), 16)
var (
hostName = hostname()
hostName = probe.Hostname()
hostID = hostName // TODO(pb): we should sanitize the hostname
)
log.Printf("probe starting, version %s, ID %s", version, probeID)
@@ -112,49 +111,39 @@ func main() {
}, xfer.ControlHandlerFunc(controls.HandleControlRequest), xfer.NewAppClient)
defer clients.Stop()
resolver := newStaticResolver(targets, publishers.Set, clients.Set)
resolver := xfer.NewStaticResolver(targets, publishers.Set, clients.Set)
defer resolver.Stop()
endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
defer endpointReporter.Stop()
processCache := process.NewCachingWalker(process.NewWalker(*procRoot))
var (
tickers = []Ticker{processCache}
reporters = []Reporter{endpointReporter, host.NewReporter(hostID, hostName, localNets), process.NewReporter(processCache, hostID)}
taggers = []Tagger{newTopologyTagger(), host.NewTagger(hostID, probeID)}
p := probe.New(*spyInterval, *publishInterval, publishers)
p.AddTicker(processCache)
p.AddReporter(
endpointReporter,
host.NewReporter(hostID, hostName, localNets),
process.NewReporter(processCache, hostID),
)
p.AddTagger(probe.NewTopologyTagger(), host.NewTagger(hostID, probeID))
dockerTagger, dockerReporter, dockerRegistry := func() (*docker.Tagger, *docker.Reporter, docker.Registry) {
if !*dockerEnabled {
return nil, nil, nil
}
if *dockerEnabled {
if err := report.AddLocalBridge(*dockerBridge); err != nil {
log.Printf("Docker: problem with bridge %s: %v", *dockerBridge, err)
return nil, nil, nil
}
registry, err := docker.NewRegistry(*dockerInterval)
if err != nil {
if registry, err := docker.NewRegistry(*dockerInterval); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
p.AddReporter(docker.NewReporter(registry, hostID))
} else {
log.Printf("Docker: failed to start registry: %v", err)
return nil, nil, nil
}
return docker.NewTagger(registry, processCache), docker.NewReporter(registry, hostID), registry
}()
if dockerTagger != nil {
taggers = append(taggers, dockerTagger)
}
if dockerReporter != nil {
reporters = append(reporters, dockerReporter)
}
if dockerRegistry != nil {
defer dockerRegistry.Stop()
}
if *kubernetesEnabled {
if client, err := kubernetes.NewClient(*kubernetesAPI, *kubernetesInterval); err == nil {
defer client.Stop()
reporters = append(reporters, kubernetes.NewReporter(client))
p.AddReporter(kubernetes.NewReporter(client))
} else {
log.Printf("Kubernetes: failed to start client: %v", err)
}
@@ -162,127 +151,27 @@ func main() {
if *weaveRouterAddr != "" {
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
tickers = append(tickers, weave)
taggers = append(taggers, weave)
reporters = append(reporters, weave)
p.AddTicker(weave)
p.AddTagger(weave)
p.AddReporter(weave)
}
if *httpListen != "" {
go func() {
log.Printf("Profiling data being exported to %s", *httpListen)
log.Printf("go tool pprof http://%s/debug/pprof/{profile,heap,block}", *httpListen)
if *prometheusEndpoint != "" {
log.Printf("exposing Prometheus endpoint at %s%s", *httpListen, *prometheusEndpoint)
http.Handle(*prometheusEndpoint, makePrometheusHandler())
}
log.Printf("Profiling endpoint %s terminated: %v", *httpListen, http.ListenAndServe(*httpListen, nil))
}()
}
quit, done := make(chan struct{}), sync.WaitGroup{}
done.Add(2)
defer func() { done.Wait() }() // second, wait for the main loops to be killed
defer close(quit) // first, kill the main loops
var rpt syncReport
rpt.swap(report.MakeReport())
go func() {
defer done.Done()
spyTick := time.Tick(*spyInterval)
for {
select {
case <-spyTick:
start := time.Now()
for _, ticker := range tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}
}
localReport := rpt.copy()
localReport = localReport.Merge(doReport(reporters))
localReport = Apply(localReport, taggers)
rpt.swap(localReport)
if took := time.Since(start); took > *spyInterval {
log.Printf("report generation took too long (%s)", took)
}
case <-quit:
return
}
}
}()
go func() {
defer done.Done()
var (
pubTick = time.Tick(*publishInterval)
p = xfer.NewReportPublisher(publishers)
)
for {
select {
case <-pubTick:
publishTicks.WithLabelValues().Add(1)
localReport := rpt.swap(report.MakeReport())
localReport.Window = *publishInterval
if err := p.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}
case <-quit:
return
}
}
}()
p.Start()
defer p.Stop()
log.Printf("%s", <-interrupt())
}
func doReport(reporters []Reporter) report.Report {
reports := make(chan report.Report, len(reporters))
for _, rep := range reporters {
go func(rep Reporter) {
newReport, err := rep.Report()
if err != nil {
log.Printf("error generating report: %v", err)
newReport = report.MakeReport() // empty is OK to merge
}
reports <- newReport
}(rep)
}
result := report.MakeReport()
for i := 0; i < cap(reports); i++ {
result = result.Merge(<-reports)
}
return result
}
func interrupt() <-chan os.Signal {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
return c
}
type syncReport struct {
mtx sync.RWMutex
rpt report.Report
}
func (r *syncReport) swap(other report.Report) report.Report {
r.mtx.Lock()
defer r.mtx.Unlock()
old := r.rpt
r.rpt = other
return old
}
func (r *syncReport) copy() report.Report {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.rpt.Copy()
}

View File

@@ -1,4 +1,4 @@
package main
package xfer
import (
"log"
@@ -6,8 +6,6 @@ import (
"strconv"
"strings"
"time"
"github.com/weaveworks/scope/xfer"
)
const (
@@ -21,6 +19,11 @@ var (
type setter func(string, []string)
// Resolver is a thing that can be stopped...
type Resolver interface {
Stop()
}
type staticResolver struct {
setters []setter
targets []target
@@ -31,10 +34,10 @@ type target struct{ host, port string }
func (t target) String() string { return net.JoinHostPort(t.host, t.port) }
// newStaticResolver periodically resolves the targets, and calls the set
// NewStaticResolver periodically resolves the targets, and calls the set
// function with all the resolved IPs. It explictiy supports targets which
// resolve to multiple IPs.
func newStaticResolver(targets []string, setters ...setter) staticResolver {
func NewStaticResolver(targets []string, setters ...setter) Resolver {
r := staticResolver{
targets: prepare(targets),
setters: setters,
@@ -73,7 +76,7 @@ func prepare(strs []string) []target {
continue
}
} else {
host, port = s, strconv.Itoa(xfer.AppPort)
host, port = s, strconv.Itoa(AppPort)
}
targets = append(targets, target{host, port})
}

View File

@@ -1,4 +1,4 @@
package main
package xfer
import (
"fmt"
@@ -7,8 +7,6 @@ import (
"sync"
"testing"
"time"
"github.com/weaveworks/scope/xfer"
)
func TestResolver(t *testing.T) {
@@ -46,7 +44,7 @@ func TestResolver(t *testing.T) {
}
}
r := newStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set)
r := NewStaticResolver([]string{"symbolic.name" + port, "namewithnoport", ip1 + port, ip2}, set)
assertAdd := func(want ...string) {
remaining := map[string]struct{}{}
@@ -70,22 +68,22 @@ func TestResolver(t *testing.T) {
}
// Initial resolve should just give us IPs
assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, AppPort))
// Trigger another resolve with a tick; again,
// just want ips.
c <- time.Now()
assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
assertAdd(ip1+port, fmt.Sprintf("%s:%d", ip2, AppPort))
ip3 := "1.2.3.4"
updateIPs("symbolic.name", makeIPs(ip3))
c <- time.Now() // trigger a resolve
assertAdd(ip3+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
assertAdd(ip3+port, ip1+port, fmt.Sprintf("%s:%d", ip2, AppPort))
ip4 := "10.10.10.10"
updateIPs("symbolic.name", makeIPs(ip3, ip4))
c <- time.Now() // trigger another resolve, this time with 2 adds
assertAdd(ip3+port, ip4+port, ip1+port, fmt.Sprintf("%s:%d", ip2, xfer.AppPort))
assertAdd(ip3+port, ip4+port, ip1+port, fmt.Sprintf("%s:%d", ip2, AppPort))
done := make(chan struct{})
go func() { r.Stop(); close(done) }()