diff --git a/probe/docker/reporter.go b/probe/docker/reporter.go index afd8e565a..6d1e088da 100644 --- a/probe/docker/reporter.go +++ b/probe/docker/reporter.go @@ -25,11 +25,13 @@ type Reporter struct { // NewReporter makes a new Reporter func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter { - return &Reporter{ + reporter := &Reporter{ registry: registry, hostID: hostID, probe: probe, } + registry.WatchContainerUpdates(reporter.ContainerUpdated) + return reporter } // ContainerUpdated should be called whenever a container is updated. diff --git a/probe/probe.go b/probe/probe.go index 57edba645..a0700ba2f 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -16,7 +16,7 @@ const ( // Probe sits there, generating and publishing reports. type Probe struct { spyInterval, publishInterval time.Duration - publisher xfer.Publisher + publisher *xfer.ReportPublisher tickers []Ticker reporters []Reporter @@ -51,7 +51,7 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) * result := &Probe{ spyInterval: spyInterval, publishInterval: publishInterval, - publisher: publisher, + publisher: xfer.NewReportPublisher(publisher), quit: make(chan struct{}), spiedReports: make(chan report.Report, reportBufferSize), shortcutReports: make(chan report.Report, reportBufferSize), @@ -152,37 +152,33 @@ func (p *Probe) tag(r report.Report) report.Report { return r } -func condense(rpt report.Report, rs chan report.Report) report.Report { +func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) { +ForLoop: for { select { case r := <-rs: rpt = rpt.Merge(r) default: - return rpt + break ForLoop } } + + if err := p.publisher.Publish(rpt); err != nil { + log.Printf("publish: %v", err) + } } func (p *Probe) publishLoop() { defer p.done.Done() - var ( - pubTick = time.Tick(p.publishInterval) - rptPub = xfer.NewReportPublisher(p.publisher) - ) + pubTick := time.Tick(p.publishInterval) for { select { case <-pubTick: - rpt := condense(report.MakeReport(), p.spiedReports) - if err := rptPub.Publish(rpt); err != nil { - log.Printf("publish: %v", err) - } + p.drainAndPublish(report.MakeReport(), p.spiedReports) case rpt := <-p.shortcutReports: - rpt = condense(rpt, p.shortcutReports) - if err := rptPub.Publish(rpt); err != nil { - log.Printf("publish: %v", err) - } + p.drainAndPublish(rpt, p.shortcutReports) case <-p.quit: return diff --git a/prog/probe/main.go b/prog/probe/main.go index 536e0e572..6e2d24222 100644 --- a/prog/probe/main.go +++ b/prog/probe/main.go @@ -134,10 +134,7 @@ func main() { if registry, err := docker.NewRegistry(*dockerInterval); err == nil { defer registry.Stop() p.AddTagger(docker.NewTagger(registry, processCache)) - - reporter := docker.NewReporter(registry, hostID, p) - registry.WatchContainerUpdates(reporter.ContainerUpdated) - p.AddReporter(reporter) + p.AddReporter(docker.NewReporter(registry, hostID, p)) } else { log.Printf("Docker: failed to start registry: %v", err) }