Files
weave-scope/probe/main.go
Peter Bourgon 36b743fe1f probe, app: reliable shutdown
- Restructure main funcs for clean defer-stack-unwinds
- Fix Docker container to handle signals properly
- Introduce runsvinit for container init process
- Integration test
2015-09-29 11:45:12 +02:00

224 lines
6.8 KiB
Go

package main
import (
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/endpoint"
"github.com/weaveworks/scope/probe/host"
"github.com/weaveworks/scope/probe/overlay"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
// version is the probe's version string. It is printed by the -version flag
// and included in the startup log line.
var version = "dev" // set at build time
// main runs the probe. It parses flags, wires up the publishers, reporters,
// taggers and tickers, starts the spy/publish loop in a goroutine, and then
// blocks until interrupt() delivers a signal (SIGINT or SIGTERM).
//
// Shutdown is expressed as a stack of defers: once the signal has been
// logged and main returns, everything registered below is stopped in reverse
// order of registration.
func main() {
	var (
		targets            = []string{fmt.Sprintf("localhost:%d", xfer.AppPort), fmt.Sprintf("scope.weave.local:%d", xfer.AppPort)}
		token              = flag.String("token", "default-token", "probe token")
		httpListen         = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server")
		publishInterval    = flag.Duration("publish.interval", 3*time.Second, "publish (output) interval")
		spyInterval        = flag.Duration("spy.interval", time.Second, "spy (scan) interval")
		prometheusEndpoint = flag.String("prometheus.endpoint", "/metrics", "Prometheus metrics exposition endpoint (requires -http.listen)")
		spyProcs           = flag.Bool("processes", true, "report processes (needs root)")
		dockerEnabled      = flag.Bool("docker", false, "collect Docker-related attributes for processes")
		dockerInterval     = flag.Duration("docker.interval", 10*time.Second, "how often to update Docker attributes")
		dockerBridge       = flag.String("docker.bridge", "docker0", "the docker bridge name")
		weaveRouterAddr    = flag.String("weave.router.addr", "", "IP address or FQDN of the Weave router")
		procRoot           = flag.String("proc.root", "/proc", "location of the proc filesystem")
		printVersion       = flag.Bool("version", false, "print version number and exit")
		useConntrack       = flag.Bool("conntrack", true, "also use conntrack to track connections")
		logPrefix          = flag.String("log.prefix", "<probe>", "prefix for each log line")
	)
	flag.Parse()

	if *printVersion {
		fmt.Println(version)
		return
	}

	// Make sure the prefix is separated from the message by a space.
	if !strings.HasSuffix(*logPrefix, " ") {
		*logPrefix += " "
	}
	log.SetPrefix(*logPrefix)

	// Registered first, so it runs last: logged after all cleanup is done.
	defer log.Print("probe exiting")

	// Being root is a property of the effective user ID, not the group ID.
	if *spyProcs && os.Geteuid() != 0 {
		log.Printf("warning: -processes=true, but that requires root to find everything")
	}

	var (
		hostName = hostname()
		hostID   = hostName // TODO(pb): we should sanitize the hostname
		probeID  = hostName // TODO(pb): does this need to be a random string instead?
	)
	log.Printf("probe starting, version %s, ID %s", version, probeID)

	addrs, err := net.InterfaceAddrs()
	if err != nil {
		// log.Fatal exits without running deferred calls.
		log.Fatal(err)
	}
	localNets := report.Networks{}
	for _, addr := range addrs {
		// Not all addrs are IPNets.
		if ipNet, ok := addr.(*net.IPNet); ok {
			localNets = append(localNets, ipNet)
		}
	}

	// Positional arguments, if any, replace the default publish targets.
	if len(flag.Args()) > 0 {
		targets = flag.Args()
	}
	log.Printf("publishing to: %s", strings.Join(targets, ", "))

	// factory builds a background-wrapped HTTP publisher for one endpoint.
	factory := func(endpoint string) (string, xfer.Publisher, error) {
		id, publisher, err := xfer.NewHTTPPublisher(endpoint, *token, probeID)
		if err != nil {
			return "", nil, err
		}
		return id, xfer.NewBackgroundPublisher(publisher), nil
	}
	publishers := xfer.NewMultiPublisher(factory)
	defer publishers.Stop()

	resolver := newStaticResolver(targets, publishers.Set)
	defer resolver.Stop()

	endpointReporter := endpoint.NewReporter(hostID, hostName, *spyProcs, *useConntrack)
	defer endpointReporter.Stop()

	processCache := process.NewCachingWalker(process.NewWalker(*procRoot))

	var (
		tickers   = []Ticker{processCache}
		reporters = []Reporter{endpointReporter, host.NewReporter(hostID, hostName, localNets), process.NewReporter(processCache, hostID)}
		taggers   = []Tagger{newTopologyTagger(), host.NewTagger(hostID)}
	)

	// Docker support is optional. Any setup failure is logged and the probe
	// carries on without Docker data (all three results are nil).
	dockerTagger, dockerReporter, dockerRegistry := func() (*docker.Tagger, *docker.Reporter, docker.Registry) {
		if !*dockerEnabled {
			return nil, nil, nil
		}
		if err := report.AddLocalBridge(*dockerBridge); err != nil {
			log.Printf("Docker: problem with bridge %s: %v", *dockerBridge, err)
			return nil, nil, nil
		}
		registry, err := docker.NewRegistry(*dockerInterval)
		if err != nil {
			log.Printf("Docker: failed to start registry: %v", err)
			return nil, nil, nil
		}
		return docker.NewTagger(registry, processCache), docker.NewReporter(registry, hostID), registry
	}()
	if dockerTagger != nil {
		taggers = append(taggers, dockerTagger)
	}
	if dockerReporter != nil {
		reporters = append(reporters, dockerReporter)
	}
	if dockerRegistry != nil {
		defer dockerRegistry.Stop()
	}

	// The Weave overlay value is registered as ticker, tagger and reporter.
	if *weaveRouterAddr != "" {
		weave := overlay.NewWeave(hostID, *weaveRouterAddr)
		tickers = append(tickers, weave)
		taggers = append(taggers, weave)
		reporters = append(reporters, weave)
	}

	// Optional HTTP server on the default mux: pprof (via the blank import)
	// plus, when configured, the Prometheus handler. It is never shut down
	// explicitly; it ends with the process.
	if *httpListen != "" {
		go func() {
			log.Printf("Profiling data being exported to %s", *httpListen)
			log.Printf("go tool pprof http://%s/debug/pprof/{profile,heap,block}", *httpListen)
			if *prometheusEndpoint != "" {
				log.Printf("exposing Prometheus endpoint at %s%s", *httpListen, *prometheusEndpoint)
				http.Handle(*prometheusEndpoint, makePrometheusHandler())
			}
			log.Printf("Profiling endpoint %s terminated: %v", *httpListen, http.ListenAndServe(*httpListen, nil))
		}()
	}

	quit, done := make(chan struct{}), make(chan struct{})
	defer func() { <-done }() // second, wait for the main loop to be killed
	defer close(quit)         // first, kill the main loop

	go func() {
		defer close(done)
		var (
			pubTick = time.Tick(*publishInterval)
			spyTick = time.Tick(*spyInterval)
			r       = report.MakeReport()
			p       = xfer.NewReportPublisher(publishers)
		)
		for {
			select {
			case <-pubTick:
				// Publish what has accumulated, then start a fresh report.
				publishTicks.WithLabelValues().Add(1)
				r.Window = *publishInterval
				if err := p.Publish(r); err != nil {
					log.Printf("publish: %v", err)
				}
				r = report.MakeReport()

			case <-spyTick:
				// Run the tickers, then merge a new tagged report into r.
				start := time.Now()
				for _, ticker := range tickers {
					if err := ticker.Tick(); err != nil {
						log.Printf("error doing ticker: %v", err)
					}
				}
				r = r.Merge(doReport(reporters))
				r = Apply(r, taggers)
				if took := time.Since(start); took > *spyInterval {
					log.Printf("report generation took too long (%s)", took)
				}

			case <-quit:
				return
			}
		}
	}()

	// Block until a termination signal arrives, log it, then unwind defers.
	log.Printf("%s", <-interrupt())
}
// doReport asks every reporter for its report concurrently and merges the
// results into a single report. A reporter that returns an error is logged
// and contributes an empty report instead.
func doReport(reporters []Reporter) report.Report {
	// Buffered to len(reporters) so no worker blocks on its send.
	results := make(chan report.Report, len(reporters))

	for _, reporter := range reporters {
		go func(reporter Reporter) {
			partial, err := reporter.Report()
			if err == nil {
				results <- partial
				return
			}
			log.Printf("error generating report: %v", err)
			results <- report.MakeReport() // empty is OK to merge
		}(reporter)
	}

	// Each worker sends exactly once, so collect one result per reporter.
	merged := report.MakeReport()
	for pending := len(reporters); pending > 0; pending-- {
		merged = merged.Merge(<-results)
	}
	return merged
}
// interrupt returns a channel on which SIGINT and SIGTERM are delivered.
//
// The channel has a buffer of one because signal.Notify never blocks when
// sending. With an unbuffered channel, a signal that arrives while no
// goroutine is waiting on the receive would be dropped.
func interrupt() <-chan os.Signal {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	return c
}