From 9fccb2126ab238f154f323902aaa28d5b8625878 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 3 Jun 2015 10:30:16 +0000 Subject: [PATCH] Gracefully shutdown goroutines in the probe. --- probe/docker_process_mapper.go | 41 +++++++++++++++++++++++++--------- probe/main.go | 10 ++++++++- probe/process_mapper.go | 16 +++++++++++-- 3 files changed, 54 insertions(+), 13 deletions(-) diff --git a/probe/docker_process_mapper.go b/probe/docker_process_mapper.go index faffbcb4d..668793232 100644 --- a/probe/docker_process_mapper.go +++ b/probe/docker_process_mapper.go @@ -17,6 +17,8 @@ const ( type dockerMapper struct { sync.RWMutex + quit chan struct{} + interval time.Duration containers map[string]*docker.Container containersByPID map[int]*docker.Container @@ -24,8 +26,6 @@ type dockerMapper struct { procRoot string pidTree *pidTree - - interval time.Duration } func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) { @@ -43,16 +43,33 @@ func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, er pidTree: pidTree, interval: interval, + quit: make(chan struct{}), } go m.loop() return &m, nil } +func (m *dockerMapper) Stop() { + close(m.quit) +} + func (m *dockerMapper) loop() { - m.update() - for range time.Tick(m.interval) { - m.update() + if !m.update() { + return + } + + ticker := time.Tick(m.interval) + for { + select { + case <-ticker: + if !m.update() { + return + } + + case <-m.quit: + return + } } } @@ -74,18 +91,19 @@ var ( newPIDTreeStub = newPIDTree ) -func (m *dockerMapper) update() { +// returns false when stopping. +func (m *dockerMapper) update() bool { endpoint := "unix:///var/run/docker.sock" client, err := newDockerClient(endpoint) if err != nil { log.Printf("docker mapper: %s", err) - return + return true } events := make(chan *docker.APIEvents) if err := client.AddEventListener(events); err != nil { log.Printf("docker mapper: %s", err) - return + return true } defer func() { if err := client.RemoveEventListener(events); err != nil { @@ -95,12 +113,12 @@ func (m *dockerMapper) update() { if err := m.updateContainers(client); err != nil { log.Printf("docker mapper: %s", err) - return + return true } if err := m.updateImages(client); err != nil { log.Printf("docker mapper: %s", err) - return + return true } otherUpdates := time.Tick(m.interval) @@ -119,6 +137,9 @@ func (m *dockerMapper) update() { log.Printf("docker mapper: %s", err) continue } + + case <-m.quit: + return false } } } diff --git a/probe/main.go b/probe/main.go index 79d1ee3d7..ce7fb2733 100644 --- a/probe/main.go +++ b/probe/main.go @@ -71,7 +71,9 @@ func main() { if *cgroupsRoot != "" { if fi, err := os.Stat(*cgroupsRoot); err == nil && fi.IsDir() { log.Printf("enriching -processes with cgroup names from %s", *cgroupsRoot) - pms = append(pms, newCgroupMapper(*cgroupsRoot, *cgroupsInterval)) + cgroupMapper := newCgroupMapper(*cgroupsRoot, *cgroupsInterval) + defer cgroupMapper.Stop() + pms = append(pms, cgroupMapper) } else { log.Printf("-cgroups.root=%s: %v", *cgroupsRoot, err) } @@ -82,6 +84,7 @@ func main() { if err != nil { log.Fatal(err) } + defer docker.Stop() pms = append(pms, docker.idMapper(), @@ -93,6 +96,8 @@ func main() { log.Printf("listening on %s", *listen) + quit := make(chan struct{}) + defer close(quit) go func() { var ( hostname = hostname() @@ -113,6 +118,9 @@ func main() { case <-spyTick: r.Merge(spy(hostname, hostname, *spyProcs, pms)) // log.Printf("merged report:\n%#v\n", r) + + case <-quit: + return } } }() diff --git a/probe/process_mapper.go b/probe/process_mapper.go index b42964487..cfb59307d 100644 --- a/probe/process_mapper.go +++ b/probe/process_mapper.go @@ -28,18 +28,24 @@ type cgroupMapper struct { sync.RWMutex root string d map[uint]string + quit chan struct{} } func newCgroupMapper(root string, interval time.Duration) *cgroupMapper { m := cgroupMapper{ root: root, d: map[uint]string{}, + quit: make(chan struct{}), } m.update() go m.loop(interval) return &m } +func (m *cgroupMapper) Stop() { + close(m.quit) +} + func (m *cgroupMapper) Key() string { return "cgroup" } // Map uses the cache to find the process name for pid. It is safe for @@ -57,8 +63,14 @@ func (m *cgroupMapper) Map(pid uint) (string, error) { } func (m *cgroupMapper) loop(d time.Duration) { - for range time.Tick(d) { - m.update() + ticker := time.Tick(d) + for { + select { + case <-ticker: + m.update() + case <-m.quit: + return + } } }