diff --git a/probe/proc/net.go b/probe/proc/net.go index 1d2f933f6..7e9ffbf4d 100644 --- a/probe/proc/net.go +++ b/probe/proc/net.go @@ -30,6 +30,7 @@ func (c Connection) Copy() Connection { c.LocalAddress = dupIP(c.LocalAddress) c.RemoteAddress = dupIP(c.RemoteAddress) + c.Process = c.Process.Copy() return c } diff --git a/probe/proc/reader.go b/probe/proc/reader.go index 6ef7c4a4b..5e43fd967 100644 --- a/probe/proc/reader.go +++ b/probe/proc/reader.go @@ -76,6 +76,14 @@ type Process struct { Inodes []uint64 } +// Copy returns a copy of a process +func (p Process) Copy() Process { + dup := make([]uint64, len(p.Inodes)) + copy(dup, p.Inodes) + p.Inodes = dup + return p +} + // Reader is something that reads the /proc directory and // returns some info like processes and connections type Reader interface { @@ -131,7 +139,7 @@ func (c *CachingProcReader) Close() error { return c.source.Close() } -// Update updates the cached copy of the processes and connections lists +// Tick updates the cached copy of the processes and connections lists func (c *CachingProcReader) Tick() error { newProcsCache := []Process{} newConnsCache := []Connection{} diff --git a/probe/proc/reader_darwin.go b/probe/proc/reader_darwin.go index 3e48f2730..c0546ebf7 100644 --- a/probe/proc/reader_darwin.go +++ b/probe/proc/reader_darwin.go @@ -45,7 +45,7 @@ func (reader) Processes(f func(Process)) error { return nil } -func (w *reader) Connections(withProcs bool, f func(Connection)) error { +func (r *reader) Connections(withProcs bool, f func(Connection)) error { out, err := exec.Command( netstatBinary, "-n", // no number resolving @@ -94,7 +94,7 @@ func (w *reader) Connections(withProcs bool, f func(Connection)) error { } // Close closes the Darwin "/proc" reader -func (w *reader) Close() error { +func (reader) Close() error { return nil } diff --git a/probe/proc/reader_linux.go b/probe/proc/reader_linux.go index 52dcba7bb..dcd322f60 100644 --- a/probe/proc/reader_linux.go +++ b/probe/proc/reader_linux.go @@ -23,26 +23,32 @@ var tcpFiles = []string{ } // A cache for files handles -// Note: not intended to be used from multiple goroutines type filesCache struct { handles gcache.Cache } +type filesCacheEntry struct { + sync.RWMutex + File +} + func newFilesCache(proc Dir) *filesCache { loadFunc := func(fileName interface{}) (interface{}, error) { - return proc.Open(fileName.(string)) + f, err := proc.Open(fileName.(string)) + if err != nil { + return nil, err + } + return filesCacheEntry{File: f}, nil } evictionFunc := func(key, value interface{}) { - value.(File).Close() + value.(filesCacheEntry).Close() } return &filesCache{ handles: gcache.New(filesCacheLen).LoaderFunc(loadFunc).EvictedFunc(evictionFunc).Expiration(filesCacheExpiration).ARC().Build(), } } -// Read a "/proc" file, identified as a subdir (eg "1134/comm"), into a buffer -// Note: this is not goroutine-safe: two goroutines getting and reading from -// the same handle can obtain some unexpected contents... +// Read a "/proc" file, identified as a file in a subdir (eg "1134/comm"), into a buffer func (fc *filesCache) ReadInto(filename string, buf *bytes.Buffer) error { // we could use a lock here, but this is only used from Processes()/Connections(), // and they are always invoked sequentially... @@ -50,7 +56,10 @@ func (fc *filesCache) ReadInto(filename string, buf *bytes.Buffer) error { if err != nil { return err } - return h.(File).ReadInto(buf) + handle := h.(filesCacheEntry) + handle.Lock() + defer handle.Unlock() + return handle.ReadInto(buf) } // Close closes all the handles in the cache @@ -76,27 +85,27 @@ func NewReader(proc Dir) Reader { } // Close closes the Linux "/proc" reader -func (w *reader) Close() error { - return w.handles.Close() +func (r *reader) Close() error { + return r.handles.Close() } // Processes walks the /proc directory and marshalls the files into // instances of Process, which it then passes one-by-one to the // supplied function. Processes() is only made public so that is // can be tested. -func (w *reader) Processes(f func(Process)) error { - dirEntries, err := w.proc.ReadDirNames(w.proc.Root()) +func (r *reader) Processes(f func(Process)) error { + dirEntries, err := r.proc.ReadDirNames(r.proc.Root()) if err != nil { return err } - buf := bytes.NewBuffer(make([]byte, 0, 5000)) + var fdStat syscall.Stat_t + buf := bytes.Buffer{} for _, subdir := range dirEntries { readIntoBuffer := func(filename string) error { buf.Reset() - res := w.handles.ReadInto(path.Join(subdir, filename), buf) - return res + return r.handles.ReadInto(path.Join(subdir, filename), &buf) } pid, err := strconv.Atoi(subdir) @@ -129,15 +138,14 @@ func (w *reader) Processes(f func(Process)) error { comm = strings.TrimSpace(buf.String()) } - fdBase := path.Join(w.proc.Root(), strconv.Itoa(pid), "fd") - fdNames, err := w.proc.ReadDirNames(fdBase) + fdBase := path.Join(r.proc.Root(), strconv.Itoa(pid), "fd") + fdNames, err := r.proc.ReadDirNames(fdBase) if err != nil { return err } inodes := []uint64{} for _, fdName := range fdNames { - var fdStat syscall.Stat_t // Direct use of syscall.Stat() to save garbage. fdPath := path.Join(fdBase, fdName) err = syscall.Stat(fdPath, &fdStat) @@ -159,30 +167,21 @@ func (w *reader) Processes(f func(Process)) error { return nil } -var bufPool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 5000)) - }, -} - // Connections walks through all the connections in the "/proc" -func (w *reader) Connections(withProcs bool, f func(Connection)) error { +func (r *reader) Connections(withProcs bool, f func(Connection)) error { // create a map of inode->Process procs := make(map[uint64]Process) if withProcs { - w.Processes(func(p Process) { + r.Processes(func(p Process) { for _, inode := range p.Inodes { procs[inode] = p } }) } - buf := bufPool.Get().(*bytes.Buffer) - buf.Reset() - defer bufPool.Put(buf) - + buf := bytes.Buffer{} for _, tcpFile := range tcpFiles { - err := w.handles.ReadInto(tcpFile, buf) + err := r.handles.ReadInto(tcpFile, &buf) if err != nil { return err }