Move calls to weave ps to a background goroutine.

This commit is contained in:
Tom Wilkie
2015-11-12 17:02:28 +00:00
parent 9142325d54
commit 3e1be5cbfe
3 changed files with 53 additions and 11 deletions

View File

@@ -9,6 +9,7 @@ import (
"regexp"
"strings"
"sync"
"time"
"github.com/weaveworks/scope/common/exec"
"github.com/weaveworks/scope/common/sanitize"
@@ -44,8 +45,11 @@ type Weave struct {
url string
hostID string
quit chan struct{}
done sync.WaitGroup
mtx sync.RWMutex
status weaveStatus
ps map[string]psEntry
}
type weaveStatus struct {
@@ -68,9 +72,51 @@ type weaveStatus struct {
// NewWeave returns a new Weave tagger based on the Weave router at
// address. The address should be an IP or FQDN, no port.
func NewWeave(hostID, weaveRouterAddress string) *Weave {
return &Weave{
w := &Weave{
url: sanitize.URL("http://", 6784, "/report")(weaveRouterAddress),
hostID: hostID,
quit: make(chan struct{}),
ps: map[string]psEntry{},
}
w.done.Add(1)
go w.loop()
return w
}
// Name of this reporter/tagger/ticker, for metrics gathering
func (*Weave) Name() string { return "Weave" }
// Stop gathering weave ps output.
func (w *Weave) Stop() {
close(w.quit)
w.done.Wait()
}
func (w *Weave) loop() {
defer w.done.Done()
tick := time.Tick(5 * time.Second)
for {
psEntries, err := w.getPSEntries()
if err != nil {
log.Printf("Error running weave ps: %v", err)
break
}
psEntriesByPrefix := map[string]psEntry{}
for _, entry := range psEntries {
psEntriesByPrefix[entry.containerIDPrefix] = entry
}
w.mtx.Lock()
w.ps = psEntriesByPrefix
w.mtx.Unlock()
select {
case <-w.quit:
return
case <-tick:
}
}
}
@@ -108,7 +154,7 @@ type psEntry struct {
ips []string
}
func (w *Weave) ps() ([]psEntry, error) {
func (w *Weave) getPSEntries() ([]psEntry, error) {
var result []psEntry
cmd := exec.Command("weave", "--local", "ps")
out, err := cmd.StdoutPipe()
@@ -160,17 +206,11 @@ func (w *Weave) Tag(r report.Report) (report.Report, error) {
}
// Put information from weave ps on the container nodes
psEntries, err := w.ps()
if err != nil {
return r, nil
}
psEntriesByPrefix := map[string]psEntry{}
for _, entry := range psEntries {
psEntriesByPrefix[entry.containerIDPrefix] = entry
}
w.mtx.RLock()
defer w.mtx.RUnlock()
for id, node := range r.Container.Nodes {
prefix := node.Metadata[docker.ContainerID][:12]
entry, ok := psEntriesByPrefix[prefix]
entry, ok := w.ps[prefix]
if !ok {
continue
}

View File

@@ -26,6 +26,7 @@ func TestWeaveTaggerOverlayTopology(t *testing.T) {
defer s.Close()
w := overlay.NewWeave(mockHostID, s.URL)
defer w.Stop()
w.Tick()
{

View File

@@ -159,6 +159,7 @@ func main() {
if *weaveRouterAddr != "" {
weave := overlay.NewWeave(hostID, *weaveRouterAddr)
defer weave.Stop()
p.AddTicker(weave)
p.AddTagger(weave)
p.AddReporter(weave)