mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 10:11:03 +00:00
Do a conntrack -L before -E to capture existing connections and NAT mappings.
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"bufio"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
@@ -59,21 +60,28 @@ type Flow struct {
|
||||
Original, Reply, Independent *Meta `xml:"-"`
|
||||
}
|
||||
|
||||
type conntrack struct {
|
||||
XMLName xml.Name `xml:"conntrack"`
|
||||
Flows []Flow `xml:"flow"`
|
||||
}
|
||||
|
||||
// Conntracker uses the conntrack command to track network connections
|
||||
type Conntracker struct {
|
||||
sync.Mutex
|
||||
cmd exec.Cmd
|
||||
activeFlows map[int64]Flow // active flows in state != TIME_WAIT
|
||||
bufferedFlows []Flow // flows coming out of activeFlows spend 1 walk cycle here
|
||||
existingConns bool
|
||||
}
|
||||
|
||||
// NewConntracker creates and starts a new Conntracter
|
||||
func NewConntracker(args ...string) (*Conntracker, error) {
|
||||
func NewConntracker(existingConns bool, args ...string) (*Conntracker, error) {
|
||||
if !ConntrackModulePresent() {
|
||||
return nil, fmt.Errorf("No conntrack module")
|
||||
}
|
||||
result := &Conntracker{
|
||||
activeFlows: map[int64]Flow{},
|
||||
activeFlows: map[int64]Flow{},
|
||||
existingConns: existingConns,
|
||||
}
|
||||
go result.run(args...)
|
||||
return result, nil
|
||||
@@ -105,6 +113,19 @@ var ConntrackModulePresent = func() bool {
|
||||
|
||||
// NB this is not re-entrant!
|
||||
func (c *Conntracker) run(args ...string) {
|
||||
if c.existingConns {
|
||||
// Fork another conntrack, just to capture existing connections
|
||||
// for which we don't get events
|
||||
existingFlows, err := c.existingConnections(args...)
|
||||
if err != nil {
|
||||
log.Printf("conntrack existingConnections error: %v", err)
|
||||
return
|
||||
}
|
||||
for _, flow := range existingFlows {
|
||||
c.handleFlow(flow, true)
|
||||
}
|
||||
}
|
||||
|
||||
args = append([]string{"-E", "-o", "xml", "-p", "tcp"}, args...)
|
||||
cmd := exec.Command("conntrack", args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
@@ -143,17 +164,47 @@ func (c *Conntracker) run(args ...string) {
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
log.Printf("contrack exiting")
|
||||
}()
|
||||
|
||||
// Now loop on the output stream
|
||||
decoder := xml.NewDecoder(reader)
|
||||
for {
|
||||
var f Flow
|
||||
if err := decoder.Decode(&f); err != nil {
|
||||
log.Printf("conntrack error: %v", err)
|
||||
return
|
||||
}
|
||||
c.handleFlow(f)
|
||||
c.handleFlow(f, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conntracker) existingConnections(args ...string) ([]Flow, error) {
|
||||
var conntrack conntrack
|
||||
args = append([]string{"-L", "-o", "xml", "-p", "tcp"}, args...)
|
||||
cmd := exec.Command("conntrack", args...)
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return conntrack.Flows, err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return conntrack.Flows, err
|
||||
}
|
||||
defer func() {
|
||||
if err := cmd.Wait(); err != nil {
|
||||
log.Printf("conntrack existingConnections exit error: %v", err)
|
||||
}
|
||||
}()
|
||||
if err := xml.NewDecoder(stdout).Decode(&conntrack); err != nil {
|
||||
if err == io.EOF {
|
||||
return conntrack.Flows, err
|
||||
}
|
||||
return conntrack.Flows, err
|
||||
}
|
||||
return conntrack.Flows, nil
|
||||
}
|
||||
|
||||
// Stop stop stop
|
||||
func (c *Conntracker) Stop() {
|
||||
c.Lock()
|
||||
@@ -167,7 +218,7 @@ func (c *Conntracker) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conntracker) handleFlow(f Flow) {
|
||||
func (c *Conntracker) handleFlow(f Flow, forceAdd bool) {
|
||||
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
|
||||
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
|
||||
// This code finds those metas, which are identified by a Direction
|
||||
@@ -194,15 +245,15 @@ func (c *Conntracker) handleFlow(f Flow) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
switch f.Type {
|
||||
case New, Update:
|
||||
switch {
|
||||
case forceAdd || f.Type == New || f.Type == Update:
|
||||
if f.Independent.State != TimeWait {
|
||||
c.activeFlows[f.Independent.ID] = f
|
||||
} else if _, ok := c.activeFlows[f.Independent.ID]; ok {
|
||||
delete(c.activeFlows, f.Independent.ID)
|
||||
c.bufferedFlows = append(c.bufferedFlows, f)
|
||||
}
|
||||
case Destroy:
|
||||
case f.Type == Destroy:
|
||||
if _, ok := c.activeFlows[f.Independent.ID]; ok {
|
||||
delete(c.activeFlows, f.Independent.ID)
|
||||
c.bufferedFlows = append(c.bufferedFlows, f)
|
||||
|
||||
@@ -76,7 +76,7 @@ func TestConntracker(t *testing.T) {
|
||||
return testExec.NewMockCmd(reader)
|
||||
}
|
||||
|
||||
conntracker, err := NewConntracker()
|
||||
conntracker, err := NewConntracker(false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ type natmapper struct {
|
||||
}
|
||||
|
||||
func newNATMapper() (*natmapper, error) {
|
||||
ct, err := NewConntracker("--any-nat")
|
||||
ct, err := NewConntracker(true, "--any-nat")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
|
||||
err error
|
||||
)
|
||||
if conntrackModulePresent && useConntrack {
|
||||
conntracker, err = NewConntracker()
|
||||
conntracker, err = NewConntracker(true)
|
||||
if err != nil {
|
||||
log.Printf("Failed to start conntracker: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user