From 0cd3de5c7ff65a65c6893e860d8a2f03b19450f0 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 20 Oct 2015 17:04:46 +0000 Subject: [PATCH] Retry conntrack, after a short delay, in a loop. --- common/exec/exec.go | 8 ++-- probe/endpoint/conntrack.go | 83 ++++++++++++++++++++++++++++++------- test/exec/exec.go | 31 ++++++++++++-- 3 files changed, 98 insertions(+), 24 deletions(-) diff --git a/common/exec/exec.go b/common/exec/exec.go index 00b9e68f0..651457862 100644 --- a/common/exec/exec.go +++ b/common/exec/exec.go @@ -2,16 +2,16 @@ package exec import ( "io" - "os" "os/exec" ) // Cmd is a hook for mocking type Cmd interface { StdoutPipe() (io.ReadCloser, error) + StderrPipe() (io.ReadCloser, error) Start() error Wait() error - Process() *os.Process + Kill() error } // Command is a hook for mocking @@ -23,6 +23,6 @@ type realCmd struct { *exec.Cmd } -func (c *realCmd) Process() *os.Process { - return c.Cmd.Process +func (c *realCmd) Kill() error { + return c.Cmd.Process.Kill() } diff --git a/probe/endpoint/conntrack.go b/probe/endpoint/conntrack.go index d3bc9e679..45c7a92e7 100644 --- a/probe/endpoint/conntrack.go +++ b/probe/endpoint/conntrack.go @@ -9,6 +9,7 @@ import ( "os" "strings" "sync" + "time" "github.com/weaveworks/scope/common/exec" ) @@ -78,6 +79,8 @@ type conntracker struct { activeFlows map[int64]Flow // active flows in state != TIME_WAIT bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here existingConns bool + args []string + quit chan struct{} } // NewConntracker creates and starts a new Conntracter @@ -88,8 +91,9 @@ func NewConntracker(existingConns bool, args ...string) (Conntracker, error) { result := &conntracker{ activeFlows: map[int64]Flow{}, existingConns: existingConns, + args: args, } - go result.run(args...) + go result.loop() return result, nil } @@ -117,12 +121,45 @@ var ConntrackModulePresent = func() bool { return false } -// NB this is not re-entrant! -func (c *conntracker) run(args ...string) { +func (c *conntracker) loop() { + for { + c.run() + c.clearFlows() + + select { + case <-time.After(time.Second): + case <-c.quit: + return + } + } +} + +func (c *conntracker) clearFlows() { + c.Lock() + defer c.Unlock() + + for _, f := range c.activeFlows { + c.bufferedFlows = append(c.bufferedFlows, f) + } + + c.activeFlows = map[int64]Flow{} +} + +func logPipe(prefix string, reader io.Reader) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + log.Println(prefix, scanner.Text()) + } + if err := scanner.Err(); err != nil { + log.Println(prefix, err) + } +} + +func (c *conntracker) run() { if c.existingConns { // Fork another conntrack, just to capture existing connections // for which we don't get events - existingFlows, err := c.existingConnections(args...) + existingFlows, err := c.existingConnections() if err != nil { log.Printf("conntrack existingConnections error: %v", err) return @@ -132,27 +169,44 @@ func (c *conntracker) run(args ...string) { } } - args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...) + args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { log.Printf("conntrack error: %v", err) return } + + stderr, err := cmd.StderrPipe() + if err != nil { + log.Printf("conntrack error: %v", err) + return + } + go logPipe("conntrack stderr:", stderr) + if err := cmd.Start(); err != nil { log.Printf("conntrack error: %v", err) return } - c.Lock() - c.cmd = cmd - c.Unlock() defer func() { if err := cmd.Wait(); err != nil { log.Printf("conntrack error: %v", err) } }() + c.Lock() + // We may have stopped in the mean time, + // so check to see if the channel is open + // under the lock. + select { + default: + case <-c.quit: + return + } + c.cmd = cmd + c.Unlock() + // Swallow the first two lines reader := bufio.NewReader(stdout) if line, err := reader.ReadString('\n'); err != nil { @@ -184,8 +238,8 @@ func (c *conntracker) run(args ...string) { } } -func (c *conntracker) existingConnections(args ...string) ([]Flow, error) { - args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...) +func (c *conntracker) existingConnections() ([]Flow, error) { + args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...) cmd := exec.Command("conntrack", args...) stdout, err := cmd.StdoutPipe() if err != nil { @@ -212,12 +266,9 @@ func (c *conntracker) existingConnections(args ...string) ([]Flow, error) { func (c *conntracker) Stop() { c.Lock() defer c.Unlock() - if c.cmd == nil { - return - } - - if p := c.cmd.Process(); p != nil { - p.Kill() + close(c.quit) + if c.cmd != nil { + c.cmd.Kill() } } diff --git a/test/exec/exec.go b/test/exec/exec.go index 61f47d36a..a1464bf13 100644 --- a/test/exec/exec.go +++ b/test/exec/exec.go @@ -4,31 +4,39 @@ import ( "bytes" "io" "io/ioutil" - "os" "github.com/weaveworks/scope/common/exec" ) type mockCmd struct { io.ReadCloser + quit chan struct{} +} + +type blockingReader struct { + quit chan struct{} } // NewMockCmdString creates a new mock Cmd which has s on its stdout pipe func NewMockCmdString(s string) exec.Cmd { return &mockCmd{ - struct { + ReadCloser: struct { io.Reader io.Closer }{ bytes.NewBufferString(s), ioutil.NopCloser(nil), }, + quit: make(chan struct{}), } } // NewMockCmd creates a new mock Cmd with rc as its stdout pipe func NewMockCmd(rc io.ReadCloser) exec.Cmd { - return &mockCmd{rc} + return &mockCmd{ + ReadCloser: rc, + quit: make(chan struct{}), + } } func (c *mockCmd) Start() error { @@ -43,6 +51,21 @@ func (c *mockCmd) StdoutPipe() (io.ReadCloser, error) { return c.ReadCloser, nil } -func (c *mockCmd) Process() *os.Process { +func (c *mockCmd) StderrPipe() (io.ReadCloser, error) { + return &blockingReader{c.quit}, nil +} + +func (c *mockCmd) Kill() error { + close(c.quit) + return nil +} + +func (b *blockingReader) Read(p []byte) (n int, err error) { + <-b.quit + return 0, nil +} + +func (b *blockingReader) Close() error { + <-b.quit return nil }