Use a mutex in the /proc files cache

Some minor renames and cleanups.
This commit is contained in:
Alvaro Saurin
2015-09-14 10:31:29 +02:00
parent 026d8e9065
commit a7dc0ac206
4 changed files with 41 additions and 33 deletions

View File

@@ -30,6 +30,7 @@ func (c Connection) Copy() Connection {
c.LocalAddress = dupIP(c.LocalAddress)
c.RemoteAddress = dupIP(c.RemoteAddress)
c.Process = c.Process.Copy()
return c
}

View File

@@ -76,6 +76,14 @@ type Process struct {
Inodes []uint64
}
// Copy returns a copy of a process
func (p Process) Copy() Process {
dup := make([]uint64, len(p.Inodes))
copy(dup, p.Inodes)
p.Inodes = dup
return p
}
// Reader is something that reads the /proc directory and
// returns some info like processes and connections
type Reader interface {
@@ -131,7 +139,7 @@ func (c *CachingProcReader) Close() error {
return c.source.Close()
}
// Update updates the cached copy of the processes and connections lists
// Tick updates the cached copy of the processes and connections lists
func (c *CachingProcReader) Tick() error {
newProcsCache := []Process{}
newConnsCache := []Connection{}

View File

@@ -45,7 +45,7 @@ func (reader) Processes(f func(Process)) error {
return nil
}
func (w *reader) Connections(withProcs bool, f func(Connection)) error {
func (r *reader) Connections(withProcs bool, f func(Connection)) error {
out, err := exec.Command(
netstatBinary,
"-n", // no number resolving
@@ -94,7 +94,7 @@ func (w *reader) Connections(withProcs bool, f func(Connection)) error {
}
// Close closes the Darwin "/proc" reader
func (w *reader) Close() error {
func (reader) Close() error {
return nil
}

View File

@@ -23,26 +23,32 @@ var tcpFiles = []string{
}
// A cache for files handles
// Note: not intended to be used from multiple goroutines
type filesCache struct {
handles gcache.Cache
}
type filesCacheEntry struct {
sync.RWMutex
File
}
func newFilesCache(proc Dir) *filesCache {
loadFunc := func(fileName interface{}) (interface{}, error) {
return proc.Open(fileName.(string))
f, err := proc.Open(fileName.(string))
if err != nil {
return nil, err
}
return filesCacheEntry{File: f}, nil
}
evictionFunc := func(key, value interface{}) {
value.(File).Close()
value.(filesCacheEntry).Close()
}
return &filesCache{
handles: gcache.New(filesCacheLen).LoaderFunc(loadFunc).EvictedFunc(evictionFunc).Expiration(filesCacheExpiration).ARC().Build(),
}
}
// Read a "/proc" file, identified as a subdir (eg "1134/comm"), into a buffer
// Note: this is not goroutine-safe: two goroutines getting and reading from
// the same handle can obtain some unexpected contents...
// Read a "/proc" file, identified as a file in a subdir (eg "1134/comm"), into a buffer
func (fc *filesCache) ReadInto(filename string, buf *bytes.Buffer) error {
// we could use a lock here, but this is only used from Processes()/Connections(),
// and they are always invoked sequentially...
@@ -50,7 +56,10 @@ func (fc *filesCache) ReadInto(filename string, buf *bytes.Buffer) error {
if err != nil {
return err
}
return h.(File).ReadInto(buf)
handle := h.(filesCacheEntry)
handle.Lock()
defer handle.Unlock()
return handle.ReadInto(buf)
}
// Close closes all the handles in the cache
@@ -76,27 +85,27 @@ func NewReader(proc Dir) Reader {
}
// Close closes the Linux "/proc" reader
func (w *reader) Close() error {
return w.handles.Close()
func (r *reader) Close() error {
return r.handles.Close()
}
// Processes walks the /proc directory and marshalls the files into
// instances of Process, which it then passes one-by-one to the
// supplied function. Processes() is only made public so that is
// can be tested.
func (w *reader) Processes(f func(Process)) error {
dirEntries, err := w.proc.ReadDirNames(w.proc.Root())
func (r *reader) Processes(f func(Process)) error {
dirEntries, err := r.proc.ReadDirNames(r.proc.Root())
if err != nil {
return err
}
buf := bytes.NewBuffer(make([]byte, 0, 5000))
var fdStat syscall.Stat_t
buf := bytes.Buffer{}
for _, subdir := range dirEntries {
readIntoBuffer := func(filename string) error {
buf.Reset()
res := w.handles.ReadInto(path.Join(subdir, filename), buf)
return res
return r.handles.ReadInto(path.Join(subdir, filename), &buf)
}
pid, err := strconv.Atoi(subdir)
@@ -129,15 +138,14 @@ func (w *reader) Processes(f func(Process)) error {
comm = strings.TrimSpace(buf.String())
}
fdBase := path.Join(w.proc.Root(), strconv.Itoa(pid), "fd")
fdNames, err := w.proc.ReadDirNames(fdBase)
fdBase := path.Join(r.proc.Root(), strconv.Itoa(pid), "fd")
fdNames, err := r.proc.ReadDirNames(fdBase)
if err != nil {
return err
}
inodes := []uint64{}
for _, fdName := range fdNames {
var fdStat syscall.Stat_t
// Direct use of syscall.Stat() to save garbage.
fdPath := path.Join(fdBase, fdName)
err = syscall.Stat(fdPath, &fdStat)
@@ -159,30 +167,21 @@ func (w *reader) Processes(f func(Process)) error {
return nil
}
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 5000))
},
}
// Connections walks through all the connections in the "/proc"
func (w *reader) Connections(withProcs bool, f func(Connection)) error {
func (r *reader) Connections(withProcs bool, f func(Connection)) error {
// create a map of inode->Process
procs := make(map[uint64]Process)
if withProcs {
w.Processes(func(p Process) {
r.Processes(func(p Process) {
for _, inode := range p.Inodes {
procs[inode] = p
}
})
}
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufPool.Put(buf)
buf := bytes.Buffer{}
for _, tcpFile := range tcpFiles {
err := w.handles.ReadInto(tcpFile, buf)
err := r.handles.ReadInto(tcpFile, &buf)
if err != nil {
return err
}