diff --git a/probe/overlay/weave.go b/probe/overlay/weave.go index 05ff51b92..67ccdb505 100644 --- a/probe/overlay/weave.go +++ b/probe/overlay/weave.go @@ -9,6 +9,7 @@ import ( "regexp" "strings" "sync" + "time" "github.com/weaveworks/scope/common/exec" "github.com/weaveworks/scope/common/sanitize" @@ -44,8 +45,11 @@ type Weave struct { url string hostID string + quit chan struct{} + done sync.WaitGroup mtx sync.RWMutex status weaveStatus + ps map[string]psEntry } type weaveStatus struct { @@ -68,9 +72,51 @@ type weaveStatus struct { // NewWeave returns a new Weave tagger based on the Weave router at // address. The address should be an IP or FQDN, no port. func NewWeave(hostID, weaveRouterAddress string) *Weave { - return &Weave{ + w := &Weave{ url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress), hostID: hostID, + quit: make(chan struct{}), + ps: map[string]psEntry{}, + } + w.done.Add(1) + go w.loop() + return w +} + +// Name of this reporter/tagger/ticker, for metrics gathering +func (*Weave) Name() string { return "Weave" } + +// Stop gathering weave ps output. +func (w *Weave) Stop() { + close(w.quit) + w.done.Wait() +} + +func (w *Weave) loop() { + defer w.done.Done() + tick := time.Tick(5 * time.Second) + + for { + psEntries, err := w.getPSEntries() + if err != nil { + log.Printf("Error running weave ps: %v", err) + break + } + + psEntriesByPrefix := map[string]psEntry{} + for _, entry := range psEntries { + psEntriesByPrefix[entry.containerIDPrefix] = entry + } + + w.mtx.Lock() + w.ps = psEntriesByPrefix + w.mtx.Unlock() + + select { + case <-w.quit: + return + case <-tick: + } } } @@ -108,7 +154,7 @@ type psEntry struct { ips []string } -func (w *Weave) ps() ([]psEntry, error) { +func (w *Weave) getPSEntries() ([]psEntry, error) { var result []psEntry cmd := exec.Command("weave", "--local", "ps") out, err := cmd.StdoutPipe() @@ -160,17 +206,11 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) { } // Put information from weave ps on the container nodes - psEntries, err := w.ps() - if err != nil { - return r, nil - } - psEntriesByPrefix := map[string]psEntry{} - for _, entry := range psEntries { - psEntriesByPrefix[entry.containerIDPrefix] = entry - } + w.mtx.RLock() + defer w.mtx.RUnlock() for id, node := range r.Container.Nodes { prefix := node.Metadata[docker.ContainerID][:12] - entry, ok := psEntriesByPrefix[prefix] + entry, ok := w.ps[prefix] if !ok { continue } diff --git a/probe/overlay/weave_test.go b/probe/overlay/weave_test.go index cd73f4252..9d551f793 100644 --- a/probe/overlay/weave_test.go +++ b/probe/overlay/weave_test.go @@ -26,6 +26,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) { defer s.Close() w := overlay.NewWeave(mockHostID, s.URL) + defer w.Stop() w.Tick() { diff --git a/prog/probe/main.go b/prog/probe/main.go index 5eae454a2..03f28cdb3 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -159,6 +159,7 @@ func main() { if *weaveRouterAddr != "" { weave := overlay.NewWeave(hostID, *weaveRouterAddr) + defer weave.Stop() p.AddTicker(weave) p.AddTagger(weave) p.AddReporter(weave)