Review feedback.

This commit is contained in:
Tom Wilkie
2015-11-11 11:42:28 +00:00
parent 47ef1e02b3
commit c610bf0ea1
3 changed files with 16 additions and 21 deletions

View File

@@ -25,11 +25,13 @@ type Reporter struct {
// NewReporter makes a new Reporter
func NewReporter(registry Registry, hostID string, probe *probe.Probe) *Reporter {
return &Reporter{
reporter := &Reporter{
registry: registry,
hostID: hostID,
probe: probe,
}
registry.WatchContainerUpdates(reporter.ContainerUpdated)
return reporter
}
// ContainerUpdated should be called whenever a container is updated.

View File

@@ -16,7 +16,7 @@ const (
// Probe sits there, generating and publishing reports.
type Probe struct {
spyInterval, publishInterval time.Duration
publisher xfer.Publisher
publisher *xfer.ReportPublisher
tickers []Ticker
reporters []Reporter
@@ -51,7 +51,7 @@ func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: publisher,
publisher: xfer.NewReportPublisher(publisher),
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),
@@ -152,37 +152,33 @@ func (p *Probe) tag(r report.Report) report.Report {
return r
}
func condense(rpt report.Report, rs chan report.Report) report.Report {
func (p *Probe) drainAndPublish(rpt report.Report, rs chan report.Report) {
ForLoop:
for {
select {
case r := <-rs:
rpt = rpt.Merge(r)
default:
return rpt
break ForLoop
}
}
if err := p.publisher.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}
}
func (p *Probe) publishLoop() {
defer p.done.Done()
var (
pubTick = time.Tick(p.publishInterval)
rptPub = xfer.NewReportPublisher(p.publisher)
)
pubTick := time.Tick(p.publishInterval)
for {
select {
case <-pubTick:
rpt := condense(report.MakeReport(), p.spiedReports)
if err := rptPub.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}
p.drainAndPublish(report.MakeReport(), p.spiedReports)
case rpt := <-p.shortcutReports:
rpt = condense(rpt, p.shortcutReports)
if err := rptPub.Publish(rpt); err != nil {
log.Printf("publish: %v", err)
}
p.drainAndPublish(rpt, p.shortcutReports)
case <-p.quit:
return

View File

@@ -134,10 +134,7 @@ func main() {
if registry, err := docker.NewRegistry(*dockerInterval); err == nil {
defer registry.Stop()
p.AddTagger(docker.NewTagger(registry, processCache))
reporter := docker.NewReporter(registry, hostID, p)
registry.WatchContainerUpdates(reporter.ContainerUpdated)
p.AddReporter(reporter)
p.AddReporter(docker.NewReporter(registry, hostID, p))
} else {
log.Printf("Docker: failed to start registry: %v", err)
}