mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
No more nil flow workers
This commit is contained in:
@@ -3,7 +3,6 @@ package endpoint
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
@@ -68,6 +67,11 @@ type flowWalker interface {
|
||||
stop()
|
||||
}
|
||||
|
||||
type nilFlowWalker struct{}
|
||||
|
||||
func (n *nilFlowWalker) stop() {}
|
||||
func (n *nilFlowWalker) walkFlows(f func(flow)) {}
|
||||
|
||||
// conntrackWalker uses the conntrack command to track network connections and
|
||||
// implement flowWalker.
|
||||
type conntrackWalker struct {
|
||||
@@ -81,9 +85,12 @@ type conntrackWalker struct {
|
||||
}
|
||||
|
||||
// newConntracker creates and starts a new conntracker.
|
||||
func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, error) {
|
||||
func newConntrackFlowWalker(useConntrack, existingConns bool, args ...string) flowWalker {
|
||||
if !ConntrackModulePresent() {
|
||||
return nil, fmt.Errorf("No conntrack module")
|
||||
log.Printf("Not using conntrack: module not present")
|
||||
return &nilFlowWalker{}
|
||||
} else if !useConntrack {
|
||||
return &nilFlowWalker{}
|
||||
}
|
||||
result := &conntrackWalker{
|
||||
activeFlows: map[int64]flow{},
|
||||
@@ -91,7 +98,7 @@ func newConntrackFlowWalker(existingConns bool, args ...string) (flowWalker, err
|
||||
args: args,
|
||||
}
|
||||
go result.loop()
|
||||
return result, nil
|
||||
return result
|
||||
}
|
||||
|
||||
// ConntrackModulePresent returns true if the kernel has the conntrack module
|
||||
|
||||
@@ -83,11 +83,7 @@ func TestConntracker(t *testing.T) {
|
||||
return testexec.NewMockCmd(reader)
|
||||
}
|
||||
|
||||
flowWalker, err := newConntrackFlowWalker(false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
flowWalker := newConntrackFlowWalker(true, false)
|
||||
bw := bufio.NewWriter(writer)
|
||||
if _, err := bw.WriteString(xmlHeader); err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
@@ -49,9 +49,6 @@ func toMapping(f flow) *endpointMapping {
|
||||
// applyNAT duplicates Nodes in the endpoint topology of a report, based on
|
||||
// the NAT table.
|
||||
func (n natMapper) applyNAT(rpt report.Report, scope string) {
|
||||
if n.flowWalker == nil { // TODO(pb)
|
||||
return
|
||||
}
|
||||
n.flowWalker.walkFlows(func(f flow) {
|
||||
var (
|
||||
mapping = toMapping(f)
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -27,7 +26,7 @@ type Reporter struct {
|
||||
includeProcesses bool
|
||||
includeNAT bool
|
||||
flowWalker flowWalker // interface
|
||||
natMapper *natMapper
|
||||
natMapper natMapper
|
||||
reverseResolver *reverseResolver
|
||||
}
|
||||
|
||||
@@ -49,42 +48,20 @@ var SpyDuration = prometheus.NewSummaryVec(
|
||||
// is stored in the Endpoint topology. It optionally enriches that topology
|
||||
// with process (PID) information.
|
||||
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool) *Reporter {
|
||||
var (
|
||||
flowWalker flowWalker
|
||||
natMapper *natMapper
|
||||
)
|
||||
if ConntrackModulePresent() { // TODO(pb)
|
||||
if useConntrack {
|
||||
var err error
|
||||
if flowWalker, err = newConntrackFlowWalker(true); err != nil {
|
||||
log.Printf("Failed to start conntracker for endpoint reporter: %v", err)
|
||||
}
|
||||
}
|
||||
if natmapperFlowWalker, err := newConntrackFlowWalker(true, "--any-nat"); err == nil {
|
||||
m := makeNATMapper(natmapperFlowWalker)
|
||||
natMapper = &m // TODO(pb): if we only ever use this as a pointer, newNATMapper
|
||||
} else {
|
||||
log.Printf("Failed to start conntracker for NAT mapper: %v", err)
|
||||
}
|
||||
}
|
||||
return &Reporter{
|
||||
hostID: hostID,
|
||||
hostName: hostName,
|
||||
includeProcesses: includeProcesses,
|
||||
flowWalker: flowWalker,
|
||||
natMapper: natMapper,
|
||||
flowWalker: newConntrackFlowWalker(useConntrack, true),
|
||||
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, true, "--any-nat")),
|
||||
reverseResolver: newReverseResolver(),
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stop stop
|
||||
func (r *Reporter) Stop() {
|
||||
if r.flowWalker != nil { // TODO(pb): this should never be nil (implies interface)
|
||||
r.flowWalker.stop()
|
||||
}
|
||||
if r.natMapper != nil { // TODO(pb): this should never be nil (implies interface)
|
||||
r.natMapper.stop()
|
||||
}
|
||||
r.flowWalker.stop()
|
||||
r.natMapper.stop()
|
||||
r.reverseResolver.stop()
|
||||
}
|
||||
|
||||
@@ -123,7 +100,8 @@ func (r *Reporter) Report() (report.Report, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if r.flowWalker != nil {
|
||||
// Consult the flowWalker for short-live connections
|
||||
{
|
||||
extraNodeInfo := report.MakeNode().WithMetadata(report.Metadata{
|
||||
Conntracked: "true",
|
||||
})
|
||||
@@ -138,10 +116,7 @@ func (r *Reporter) Report() (report.Report, error) {
|
||||
})
|
||||
}
|
||||
|
||||
if r.natMapper != nil { // TODO(pb): should never be nil
|
||||
r.natMapper.applyNAT(rpt, r.hostID)
|
||||
}
|
||||
|
||||
r.natMapper.applyNAT(rpt, r.hostID)
|
||||
return rpt, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user