Merge pull request #1833 from weaveworks/simplify-container-stats

use go-dockerclient's Client.Stats

Fixes #1799
This commit is contained in:
Matthias Radestock
2016-08-26 13:07:16 +01:00
committed by GitHub
4 changed files with 59 additions and 125 deletions

View File

@@ -1,13 +1,9 @@
package docker
import (
"bufio"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"sync"
@@ -15,7 +11,6 @@ import (
log "github.com/Sirupsen/logrus"
docker "github.com/fsouza/go-dockerclient"
"github.com/ugorji/go/codec"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/report"
@@ -77,20 +72,9 @@ var (
StateDeleted = "deleted"
)
// Exported for testing
var (
DialStub = net.Dial
NewClientConnStub = newClientConn
)
func newClientConn(c net.Conn, r *bufio.Reader) ClientConn {
return httputil.NewClientConn(c, r)
}
// ClientConn is exported for testing
type ClientConn interface {
Do(req *http.Request) (resp *http.Response, err error)
Close() error
// StatsGatherer gathers container stats
type StatsGatherer interface {
Stats(docker.StatsOptions) error
}
// Container represents a Docker container
@@ -106,7 +90,7 @@ type Container interface {
StateString() string
HasTTY() bool
Container() *docker.Container
StartGatheringStats() error
StartGatheringStats(StatsGatherer) error
StopGatheringStats()
NetworkMode() (string, bool)
NetworkInfo([]net.IP) report.Sets
@@ -115,7 +99,7 @@ type Container interface {
type container struct {
sync.RWMutex
container *docker.Container
statsConn ClientConn
stopStats chan<- bool
latestStats docker.Stats
pendingStats [60]docker.Stats
numPending int
@@ -176,83 +160,50 @@ func (c *container) Container() *docker.Container {
return c.container
}
func (c *container) StartGatheringStats() error {
func (c *container) StartGatheringStats(client StatsGatherer) error {
c.Lock()
defer c.Unlock()
if c.statsConn != nil {
if c.stopStats != nil {
return nil
}
done := make(chan bool)
c.stopStats = done
stats := make(chan *docker.Stats)
opts := docker.StatsOptions{
ID: c.container.ID,
Stats: stats,
Stream: true,
Done: done,
}
log.Debugf("docker container: collecting stats for %s", c.container.ID)
go func() {
log.Debugf("docker container: collecting stats for %s", c.container.ID)
req, err := http.NewRequest("GET", fmt.Sprintf("/containers/%s/stats", c.container.ID), nil)
if err != nil {
log.Errorf("docker container: %v", err)
return
if err := client.Stats(opts); err != nil && err != io.EOF && err != io.ErrClosedPipe {
log.Errorf("docker container: error collecting stats for %s: %v", c.container.ID, err)
}
req.Header.Set("User-Agent", "weavescope")
}()
url, err := url.Parse(endpoint)
if err != nil {
log.Errorf("docker container: %v", err)
return
}
dial, err := DialStub(url.Scheme, url.Path)
if err != nil {
log.Errorf("docker container: %v", err)
return
}
conn := NewClientConnStub(dial, nil)
resp, err := conn.Do(req)
if err != nil {
log.Errorf("docker container: %v", err)
return
}
defer resp.Body.Close()
c.Lock()
c.statsConn = conn
c.Unlock()
defer func() {
c.Lock()
defer c.Unlock()
log.Debugf("docker container: stopped collecting stats for %s", c.container.ID)
c.statsConn = nil
}()
// Use a buffer since the codec library doesn't implicitly do it
bufReader := bufio.NewReader(resp.Body)
decoder := codec.NewDecoder(bufReader, &codec.JsonHandle{})
for {
var stats docker.Stats
if err := decoder.Decode(&stats); err != nil {
if err == io.EOF {
break
}
// Unfortunately we typically get a different error
// than io.EOF. Yes, this is really the best we can do
// in go - https://github.com/golang/go/issues/4373
if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" {
break
}
log.Errorf("docker container: error reading event for %s: %v", c.container.ID, err)
break
}
go func() {
for s := range stats {
c.Lock()
if c.numPending >= len(c.pendingStats) {
log.Warnf("docker container: dropping stats for %s", c.container.ID)
} else {
c.latestStats = stats
c.pendingStats[c.numPending] = stats
c.latestStats = *s
c.pendingStats[c.numPending] = *s
c.numPending++
}
c.Unlock()
}
log.Debugf("docker container: stopped collecting stats for %s", c.container.ID)
c.Lock()
if c.stopStats == done {
c.stopStats = nil
}
c.Unlock()
}()
return nil
@@ -261,14 +212,10 @@ func (c *container) StartGatheringStats() error {
func (c *container) StopGatheringStats() {
c.Lock()
defer c.Unlock()
if c.statsConn == nil {
return
if c.stopStats != nil {
close(c.stopStats)
c.stopStats = nil
}
c.statsConn.Close()
c.statsConn = nil
return
}
func (c *container) ports(localAddrs []net.IP) report.StringSet {

View File

@@ -1,17 +1,11 @@
package docker_test
import (
"bufio"
"io"
"io/ioutil"
"net"
"net/http"
"testing"
"time"
log "github.com/Sirupsen/logrus"
client "github.com/fsouza/go-dockerclient"
"github.com/ugorji/go/codec"
"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/probe/docker"
@@ -20,44 +14,35 @@ import (
"github.com/weaveworks/scope/test/reflect"
)
type mockConnection struct {
reader *io.PipeReader
type mockStatsGatherer struct {
opts client.StatsOptions
ready chan bool
}
func (c *mockConnection) Do(req *http.Request) (resp *http.Response, err error) {
return &http.Response{
Body: c.reader,
}, nil
func NewMockStatsGatherer() *mockStatsGatherer {
return &mockStatsGatherer{ready: make(chan bool)}
}
func (c *mockConnection) Close() error {
return c.reader.Close()
func (s *mockStatsGatherer) Stats(opts client.StatsOptions) error {
s.opts = opts
close(s.ready)
return nil
}
func (s *mockStatsGatherer) Send(stats *client.Stats) {
<-s.ready
s.opts.Stats <- stats
}
func TestContainer(t *testing.T) {
log.SetOutput(ioutil.Discard)
oldDialStub, oldNewClientConnStub := docker.DialStub, docker.NewClientConnStub
defer func() { docker.DialStub, docker.NewClientConnStub = oldDialStub, oldNewClientConnStub }()
docker.DialStub = func(network, address string) (net.Conn, error) {
return nil, nil
}
reader, writer := io.Pipe()
connection := &mockConnection{reader}
docker.NewClientConnStub = func(c net.Conn, r *bufio.Reader) docker.ClientConn {
return connection
}
now := time.Unix(12345, 67890).UTC()
mtime.NowForce(now)
defer mtime.NowReset()
const hostID = "scope"
c := docker.NewContainer(container1, hostID)
err := c.StartGatheringStats()
s := NewMockStatsGatherer()
err := c.StartGatheringStats(s)
if err != nil {
t.Errorf("%v", err)
}
@@ -68,10 +53,7 @@ func TestContainer(t *testing.T) {
stats.Read = now
stats.MemoryStats.Usage = 12345
stats.MemoryStats.Limit = 45678
encoder := codec.NewEncoder(writer, &codec.JsonHandle{})
if err = encoder.Encode(&stats); err != nil {
t.Error(err)
}
s.Send(stats)
// Now see if we go them
{

View File

@@ -85,6 +85,7 @@ type Client interface {
AttachToContainerNonBlocking(docker_client.AttachToContainerOptions) (docker_client.CloseWaiter, error)
CreateExec(docker_client.CreateExecOptions) (*docker_client.Exec, error)
StartExecNonBlocking(string, docker_client.StartExecOptions) (docker_client.CloseWaiter, error)
Stats(docker_client.StatsOptions) error
}
func newDockerClient(endpoint string) (Client, error) {
@@ -361,7 +362,7 @@ func (r *registry) updateContainerState(containerID string, intendedState *strin
// And finally, ensure we gather stats for it
if r.collectStats {
if dockerContainer.State.Running {
if err := c.StartGatheringStats(); err != nil {
if err := c.StartGatheringStats(r.client); err != nil {
log.Errorf("Error gathering stats for container %s: %s", containerID, err)
return
}

View File

@@ -55,7 +55,7 @@ func (c *mockContainer) StateString() string {
return docker.StateRunning
}
func (c *mockContainer) StartGatheringStats() error {
func (c *mockContainer) StartGatheringStats(docker.StatsGatherer) error {
return nil
}
@@ -163,6 +163,10 @@ func (m *mockDockerClient) RemoveContainer(_ client.RemoveContainerOptions) erro
return fmt.Errorf("remove")
}
func (m *mockDockerClient) Stats(_ client.StatsOptions) error {
return fmt.Errorf("stats")
}
type mockCloseWaiter struct{}
func (mockCloseWaiter) Close() error { return nil }