Merge pull request #1511 from weaveworks/flag-disable-processes

Add flag to disable reporting of processes (and procspied endpoints)
This commit is contained in:
Paul Bellamy
2016-05-18 14:39:25 +01:00
8 changed files with 90 additions and 66 deletions

View File

@@ -61,18 +61,20 @@ func init() {
// be the verb to get to that state
topologyRegistry.add(
APITopologyDesc{
id: "processes",
renderer: render.FilterUnconnected(render.ProcessWithContainerNameRenderer),
Name: "Processes",
Rank: 1,
Options: unconnectedFilter,
id: "processes",
renderer: render.FilterUnconnected(render.ProcessWithContainerNameRenderer),
Name: "Processes",
Rank: 1,
Options: unconnectedFilter,
HideIfEmpty: true,
},
APITopologyDesc{
id: "processes-by-name",
parent: "processes",
renderer: render.FilterUnconnected(render.ProcessNameRenderer),
Name: "by name",
Options: unconnectedFilter,
id: "processes-by-name",
parent: "processes",
renderer: render.FilterUnconnected(render.ProcessNameRenderer),
Name: "by name",
Options: unconnectedFilter,
HideIfEmpty: true,
},
APITopologyDesc{
id: "containers",

View File

@@ -25,14 +25,14 @@ const (
// Reporter generates Reports containing the Endpoint topology.
type Reporter struct {
hostID string
hostName string
includeProcesses bool
includeNAT bool
flowWalker flowWalker // interface
scanner procspy.ConnectionScanner
natMapper natMapper
reverseResolver *reverseResolver
hostID string
hostName string
spyProcs bool
walkProc bool
flowWalker flowWalker // interface
scanner procspy.ConnectionScanner
natMapper natMapper
reverseResolver *reverseResolver
}
// SpyDuration is an exported prometheus metric
@@ -52,15 +52,16 @@ var SpyDuration = prometheus.NewSummaryVec(
// on the host machine, at the granularity of host and port. That information
// is stored in the Endpoint topology. It optionally enriches that topology
// with process (PID) information.
func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bool, scanner procspy.ConnectionScanner) *Reporter {
func NewReporter(hostID, hostName string, spyProcs, useConntrack, walkProc bool, scanner procspy.ConnectionScanner) *Reporter {
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
scanner: scanner,
hostID: hostID,
hostName: hostName,
spyProcs: spyProcs,
walkProc: walkProc,
flowWalker: newConntrackFlowWalker(useConntrack),
natMapper: makeNATMapper(newConntrackFlowWalker(useConntrack, "--any-nat")),
reverseResolver: newReverseResolver(),
scanner: scanner,
}
}
@@ -135,8 +136,8 @@ func (r *Reporter) Report() (report.Report, error) {
})
}
{
conns, err := r.scanner.Connections(r.includeProcesses)
if r.walkProc {
conns, err := r.scanner.Connections(r.spyProcs)
if err != nil {
return rpt, err
}
@@ -174,10 +175,6 @@ func (r *Reporter) Report() (report.Report, error) {
}
func (r *Reporter) addConnection(rpt *report.Report, t fourTuple, extraFromNode, extraToNode map[string]string) {
// Update endpoint topology
if !r.includeProcesses {
return
}
var (
fromEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.fromAddr, strconv.Itoa(int(t.fromPort)))
toEndpointNodeID = report.MakeEndpointNodeID(r.hostID, t.toAddr, strconv.Itoa(int(t.toPort)))

View File

@@ -69,12 +69,11 @@ func TestSpyNoProcesses(t *testing.T) {
)
scanner := procspy.FixedScanner(fixConnections)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, scanner)
reporter := endpoint.NewReporter(nodeID, nodeName, false, false, false, scanner)
r, _ := reporter.Report()
//buf, _ := json.MarshalIndent(r, "", " ")
//t.Logf("\n%s\n", buf)
// No process nodes, please
if want, have := 0, len(r.Endpoint.Nodes); want != have {
t.Fatalf("want %d, have %d", want, have)
}
@@ -87,7 +86,7 @@ func TestSpyWithProcesses(t *testing.T) {
)
scanner := procspy.FixedScanner(fixConnectionsWithProcesses)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, scanner)
reporter := endpoint.NewReporter(nodeID, nodeName, true, false, true, scanner)
r, _ := reporter.Report()
// buf, _ := json.MarshalIndent(r, "", " ") ; t.Logf("\n%s\n", buf)

View File

@@ -60,16 +60,18 @@ type probeFlags struct {
httpListen string
publishInterval time.Duration
spyInterval time.Duration
spyProcs bool
procRoot string
pluginsRoot string
useConntrack bool
insecure bool
logPrefix string
logLevel string
resolver string
noApp bool
useConntrack bool // Use conntrack for endpoint topo
spyProcs bool // Associate endpoints with processes (must be root)
procEnabled bool // Produce process topology & process nodes in endpoint
procRoot string
dockerEnabled bool
dockerInterval time.Duration
dockerBridge string
@@ -130,20 +132,30 @@ func main() {
flag.StringVar(&flags.probe.httpListen, "probe.http.listen", "", "listen address for HTTP profiling and instrumentation server")
flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval")
flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval")
flag.BoolVar(&flags.probe.spyProcs, "probe.processes", true, "report processes (needs root)")
flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem")
flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins")
flag.BoolVar(&flags.probe.useConntrack, "probe.conntrack", true, "also use conntrack to track connections")
flag.BoolVar(&flags.probe.insecure, "probe.insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers")
flag.StringVar(&flags.probe.resolver, "probe.resolver", "", "IP address & port of resolver to use. Default is to use system resolver.")
flag.StringVar(&flags.probe.logPrefix, "probe.log.prefix", "<probe>", "prefix for each log line")
flag.StringVar(&flags.probe.logLevel, "probe.log.level", "info", "logging threshold level: debug|info|warn|error|fatal|panic")
// Proc & endpoint
flag.BoolVar(&flags.probe.useConntrack, "probe.conntrack", true, "also use conntrack to track connections")
flag.BoolVar(&flags.probe.spyProcs, "probe.proc.spy", true, "associate endpoints with processes (needs root)")
flag.StringVar(&flags.probe.procRoot, "probe.proc.root", "/proc", "location of the proc filesystem")
flag.BoolVar(&flags.probe.procEnabled, "probe.processes", true, "produce process topology & include procspied connections")
// Docker
flag.BoolVar(&flags.probe.dockerEnabled, "probe.docker", false, "collect Docker-related attributes for processes")
flag.DurationVar(&flags.probe.dockerInterval, "probe.docker.interval", 10*time.Second, "how often to update Docker attributes")
flag.StringVar(&flags.probe.dockerBridge, "probe.docker.bridge", "docker0", "the docker bridge name")
// K8s
flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node")
flag.StringVar(&flags.probe.kubernetesAPI, "probe.kubernetes.api", "", "Address of kubernetes master api")
flag.DurationVar(&flags.probe.kubernetesInterval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data")
// Weave
flag.StringVar(&flags.probe.weaveAddr, "probe.weave.addr", "127.0.0.1:6784", "IP address & port of the Weave router")
flag.StringVar(&flags.probe.weaveHostname, "probe.weave.hostname", app.DefaultHostname, "Hostname to lookup in WeaveDNS")

View File

@@ -123,23 +123,26 @@ func probeMain(flags probeFlags) {
resolver := appclient.NewResolver(targets, dnsLookupFn, clients.Set)
defer resolver.Stop()
processCache := process.NewCachingWalker(process.NewWalker(flags.procRoot))
scanner := procspy.NewConnectionScanner(processCache)
endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, scanner)
defer endpointReporter.Stop()
p := probe.New(flags.spyInterval, flags.publishInterval, clients)
p.AddTicker(processCache)
hostReporter := host.NewReporter(hostID, hostName, probeID, version, clients)
defer hostReporter.Stop()
p.AddReporter(
endpointReporter,
hostReporter,
process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies),
)
p.AddReporter(hostReporter)
p.AddTagger(probe.NewTopologyTagger(), host.NewTagger(hostID))
var processCache *process.CachingWalker
var scanner procspy.ConnectionScanner
if flags.procEnabled {
processCache = process.NewCachingWalker(process.NewWalker(flags.procRoot))
scanner = procspy.NewConnectionScanner(processCache)
p.AddTicker(processCache)
p.AddReporter(process.NewReporter(processCache, hostID, process.GetDeltaTotalJiffies))
}
endpointReporter := endpoint.NewReporter(hostID, hostName, flags.spyProcs, flags.useConntrack, flags.procEnabled, scanner)
defer endpointReporter.Stop()
p.AddReporter(endpointReporter)
if flags.dockerEnabled {
// Don't add the bridge in Kubernetes since container IPs are global and
// shouldn't be scoped
@@ -150,7 +153,9 @@ func probeMain(flags probeFlags) {
}
if registry, err := docker.NewRegistry(flags.dockerInterval, clients, true, hostID); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
if flags.procEnabled {
p.AddTagger(docker.NewTagger(registry, processCache))
}
p.AddReporter(docker.NewReporter(registry, hostID, probeID, p))
} else {
log.Errorf("Docker: failed to start registry: %v", err)

View File

@@ -140,7 +140,8 @@ func MapEndpoint2IP(m report.Node, local report.Networks) report.Nodes {
return report.Nodes{}
}
if ip := net.ParseIP(addr); ip != nil && !local.Contains(ip) {
return report.Nodes{TheInternetID: theInternetNode(m)}
node := theInternetNode(m)
return report.Nodes{node.ID: node}
}
// We don't always know what port a container is listening on, and

View File

@@ -15,7 +15,7 @@ const (
)
func renderKubernetesTopologies(rpt report.Report) bool {
return len(rpt.Pod.Nodes)+len(rpt.Service.Nodes) > 1
return len(rpt.Pod.Nodes)+len(rpt.Service.Nodes)+len(rpt.Deployment.Nodes)+len(rpt.ReplicaSet.Nodes) >= 1
}
// PodRenderer is a Renderer which produces a renderable kubernetes

View File

@@ -23,18 +23,24 @@ const (
Pseudo = "pseudo"
)
func renderProcesses(rpt report.Report) bool {
return len(rpt.Process.Nodes) >= 1
}
// EndpointRenderer is a Renderer which produces a renderable endpoint graph.
var EndpointRenderer = FilterNonProcspied(SelectEndpoint)
// ProcessRenderer is a Renderer which produces a renderable process
// graph by merging the endpoint graph and the process topology.
var ProcessRenderer = ColorConnected(MakeReduce(
MakeMap(
MapEndpoint2Process,
EndpointRenderer,
),
SelectProcess,
))
var ProcessRenderer = ConditionalRenderer(renderProcesses,
ColorConnected(MakeReduce(
MakeMap(
MapEndpoint2Process,
EndpointRenderer,
),
SelectProcess,
)),
)
// processWithContainerNameRenderer is a Renderer which produces a process
// graph enriched with container names where appropriate
@@ -73,9 +79,11 @@ var ProcessWithContainerNameRenderer = processWithContainerNameRenderer{ProcessR
// ProcessNameRenderer is a Renderer which produces a renderable process
// name graph by munging the progess graph.
var ProcessNameRenderer = MakeMap(
MapProcess2Name,
ProcessRenderer,
var ProcessNameRenderer = ConditionalRenderer(renderProcesses,
MakeMap(
MapProcess2Name,
ProcessRenderer,
),
)
// MapEndpoint2Pseudo makes internet of host pesudo nodes from a endpoint node.