Gracefully shutdown goroutines in the probe.

This commit is contained in:
Tom Wilkie
2015-06-03 10:30:16 +00:00
parent 57629dcac1
commit 9fccb2126a
3 changed files with 54 additions and 13 deletions

View File

@@ -17,6 +17,8 @@ const (
type dockerMapper struct {
sync.RWMutex
quit chan struct{}
interval time.Duration
containers map[string]*docker.Container
containersByPID map[int]*docker.Container
@@ -24,8 +26,6 @@ type dockerMapper struct {
procRoot string
pidTree *pidTree
interval time.Duration
}
func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, error) {
@@ -43,16 +43,33 @@ func newDockerMapper(procRoot string, interval time.Duration) (*dockerMapper, er
pidTree: pidTree,
interval: interval,
quit: make(chan struct{}),
}
go m.loop()
return &m, nil
}
func (m *dockerMapper) Stop() {
close(m.quit)
}
func (m *dockerMapper) loop() {
m.update()
for range time.Tick(m.interval) {
m.update()
if !m.update() {
return
}
ticker := time.Tick(m.interval)
for {
select {
case <-ticker:
if !m.update() {
return
}
case <-m.quit:
return
}
}
}
@@ -74,18 +91,19 @@ var (
newPIDTreeStub = newPIDTree
)
func (m *dockerMapper) update() {
// returns false when stopping.
func (m *dockerMapper) update() bool {
endpoint := "unix:///var/run/docker.sock"
client, err := newDockerClient(endpoint)
if err != nil {
log.Printf("docker mapper: %s", err)
return
return true
}
events := make(chan *docker.APIEvents)
if err := client.AddEventListener(events); err != nil {
log.Printf("docker mapper: %s", err)
return
return true
}
defer func() {
if err := client.RemoveEventListener(events); err != nil {
@@ -95,12 +113,12 @@ func (m *dockerMapper) update() {
if err := m.updateContainers(client); err != nil {
log.Printf("docker mapper: %s", err)
return
return true
}
if err := m.updateImages(client); err != nil {
log.Printf("docker mapper: %s", err)
return
return true
}
otherUpdates := time.Tick(m.interval)
@@ -119,6 +137,9 @@ func (m *dockerMapper) update() {
log.Printf("docker mapper: %s", err)
continue
}
case <-m.quit:
return false
}
}
}

View File

@@ -71,7 +71,9 @@ func main() {
if *cgroupsRoot != "" {
if fi, err := os.Stat(*cgroupsRoot); err == nil && fi.IsDir() {
log.Printf("enriching -processes with cgroup names from %s", *cgroupsRoot)
pms = append(pms, newCgroupMapper(*cgroupsRoot, *cgroupsInterval))
cgroupMapper := newCgroupMapper(*cgroupsRoot, *cgroupsInterval)
defer cgroupMapper.Stop()
pms = append(pms, cgroupMapper)
} else {
log.Printf("-cgroups.root=%s: %v", *cgroupsRoot, err)
}
@@ -82,6 +84,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer docker.Stop()
pms = append(pms,
docker.idMapper(),
@@ -93,6 +96,8 @@ func main() {
log.Printf("listening on %s", *listen)
quit := make(chan struct{})
defer close(quit)
go func() {
var (
hostname = hostname()
@@ -113,6 +118,9 @@ func main() {
case <-spyTick:
r.Merge(spy(hostname, hostname, *spyProcs, pms))
// log.Printf("merged report:\n%#v\n", r)
case <-quit:
return
}
}
}()

View File

@@ -28,18 +28,24 @@ type cgroupMapper struct {
sync.RWMutex
root string
d map[uint]string
quit chan struct{}
}
func newCgroupMapper(root string, interval time.Duration) *cgroupMapper {
m := cgroupMapper{
root: root,
d: map[uint]string{},
quit: make(chan struct{}),
}
m.update()
go m.loop(interval)
return &m
}
func (m *cgroupMapper) Stop() {
close(m.quit)
}
func (m *cgroupMapper) Key() string { return "cgroup" }
// Map uses the cache to find the process name for pid. It is safe for
@@ -57,8 +63,14 @@ func (m *cgroupMapper) Map(pid uint) (string, error) {
}
func (m *cgroupMapper) loop(d time.Duration) {
for range time.Tick(d) {
m.update()
ticker := time.Tick(d)
for {
select {
case <-ticker:
m.update()
case <-m.quit:
return
}
}
}