Retry conntrack, after a short delay, in a loop.

This commit is contained in:
Tom Wilkie
2015-10-20 17:04:46 +00:00
parent 1e22629448
commit 0cd3de5c7f
3 changed files with 98 additions and 24 deletions

View File

@@ -2,16 +2,16 @@ package exec
import (
"io"
"os"
"os/exec"
)
// Cmd is a hook for mocking
type Cmd interface {
StdoutPipe() (io.ReadCloser, error)
StderrPipe() (io.ReadCloser, error)
Start() error
Wait() error
Process() *os.Process
Kill() error
}
// Command is a hook for mocking
@@ -23,6 +23,6 @@ type realCmd struct {
*exec.Cmd
}
func (c *realCmd) Process() *os.Process {
return c.Cmd.Process
func (c *realCmd) Kill() error {
return c.Cmd.Process.Kill()
}

View File

@@ -9,6 +9,7 @@ import (
"os"
"strings"
"sync"
"time"
"github.com/weaveworks/scope/common/exec"
)
@@ -78,6 +79,8 @@ type conntracker struct {
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here
existingConns bool
args []string
quit chan struct{}
}
// NewConntracker creates and starts a new Conntracter
@@ -88,8 +91,9 @@ func NewConntracker(existingConns bool, args ...string) (Conntracker, error) {
result := &conntracker{
activeFlows: map[int64]Flow{},
existingConns: existingConns,
args: args,
}
go result.run(args...)
go result.loop()
return result, nil
}
@@ -117,12 +121,45 @@ var ConntrackModulePresent = func() bool {
return false
}
// NB this is not re-entrant!
func (c *conntracker) run(args ...string) {
func (c *conntracker) loop() {
for {
c.run()
c.clearFlows()
select {
case <-time.After(time.Second):
case <-c.quit:
return
}
}
}
func (c *conntracker) clearFlows() {
c.Lock()
defer c.Unlock()
for _, f := range c.activeFlows {
c.bufferedFlows = append(c.bufferedFlows, f)
}
c.activeFlows = map[int64]Flow{}
}
func logPipe(prefix string, reader io.Reader) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
log.Println(prefix, scanner.Text())
}
if err := scanner.Err(); err != nil {
log.Println(prefix, err)
}
}
func (c *conntracker) run() {
if c.existingConns {
// Fork another conntrack, just to capture existing connections
// for which we don't get events
existingFlows, err := c.existingConnections(args...)
existingFlows, err := c.existingConnections()
if err != nil {
log.Printf("conntrack existingConnections error: %v", err)
return
@@ -132,27 +169,44 @@ func (c *conntracker) run(args ...string) {
}
}
args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...)
args := append([]string{"-E", "-o", "xml", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Printf("conntrack error: %v", err)
return
}
stderr, err := cmd.StderrPipe()
if err != nil {
log.Printf("conntrack error: %v", err)
return
}
go logPipe("conntrack stderr:", stderr)
if err := cmd.Start(); err != nil {
log.Printf("conntrack error: %v", err)
return
}
c.Lock()
c.cmd = cmd
c.Unlock()
defer func() {
if err := cmd.Wait(); err != nil {
log.Printf("conntrack error: %v", err)
}
}()
c.Lock()
// We may have stopped in the mean time,
// so check to see if the channel is open
// under the lock.
select {
default:
case <-c.quit:
return
}
c.cmd = cmd
c.Unlock()
// Swallow the first two lines
reader := bufio.NewReader(stdout)
if line, err := reader.ReadString('\n'); err != nil {
@@ -184,8 +238,8 @@ func (c *conntracker) run(args ...string) {
}
}
func (c *conntracker) existingConnections(args ...string) ([]Flow, error) {
args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...)
func (c *conntracker) existingConnections() ([]Flow, error) {
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
cmd := exec.Command("conntrack", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
@@ -212,12 +266,9 @@ func (c *conntracker) existingConnections(args ...string) ([]Flow, error) {
func (c *conntracker) Stop() {
c.Lock()
defer c.Unlock()
if c.cmd == nil {
return
}
if p := c.cmd.Process(); p != nil {
p.Kill()
close(c.quit)
if c.cmd != nil {
c.cmd.Kill()
}
}

View File

@@ -4,31 +4,39 @@ import (
"bytes"
"io"
"io/ioutil"
"os"
"github.com/weaveworks/scope/common/exec"
)
type mockCmd struct {
io.ReadCloser
quit chan struct{}
}
type blockingReader struct {
quit chan struct{}
}
// NewMockCmdString creates a new mock Cmd which has s on its stdout pipe
func NewMockCmdString(s string) exec.Cmd {
return &mockCmd{
struct {
ReadCloser: struct {
io.Reader
io.Closer
}{
bytes.NewBufferString(s),
ioutil.NopCloser(nil),
},
quit: make(chan struct{}),
}
}
// NewMockCmd creates a new mock Cmd with rc as its stdout pipe
func NewMockCmd(rc io.ReadCloser) exec.Cmd {
return &mockCmd{rc}
return &mockCmd{
ReadCloser: rc,
quit: make(chan struct{}),
}
}
func (c *mockCmd) Start() error {
@@ -43,6 +51,21 @@ func (c *mockCmd) StdoutPipe() (io.ReadCloser, error) {
return c.ReadCloser, nil
}
func (c *mockCmd) Process() *os.Process {
func (c *mockCmd) StderrPipe() (io.ReadCloser, error) {
return &blockingReader{c.quit}, nil
}
func (c *mockCmd) Kill() error {
close(c.quit)
return nil
}
func (b *blockingReader) Read(p []byte) (n int, err error) {
<-b.quit
return 0, nil
}
func (b *blockingReader) Close() error {
<-b.quit
return nil
}