Refactoring of xfer/collector.go

- Add interface to xfer/collector.go
- Add some more tests
- quit field changed to make it consistent with other Stoppable components in Scope
- Add peek, only used for tests
This commit is contained in:
Peter Bourgon
2015-06-04 08:40:04 +00:00
committed by Tom Wilkie
parent bba5222a2b
commit 06d805db3c
4 changed files with 129 additions and 89 deletions

View File

@@ -67,7 +67,7 @@ func main() {
c := xfer.NewCollector(*batch)
defer c.Stop()
r := NewResolver(probes, c.AddAddress)
r := NewResolver(probes, c.Add)
defer r.Stop()
lifo := NewReportLIFO(c, *window)

View File

@@ -17,51 +17,62 @@ const (
var (
// MaxBackoff is the maximum time between connect retries.
MaxBackoff = 2 * time.Minute // externally configurable.
// It's exported so it's externally configurable.
MaxBackoff = 2 * time.Minute
// This is extracted out for mocking.
tick = time.Tick
)
// Collector connects to probes over TCP and merges reports published by those
// probes into a single one.
type Collector struct {
in chan report.Report
out chan report.Report
add chan string
remove chan string
quit chan chan struct{}
// Collector describes anything that can have addresses added and removed, and
// which produces reports that represent aggregate reports from all collected
// addresses.
type Collector interface {
Add(string)
Remove(string)
Reports() <-chan report.Report
Stop()
}
// NewCollector starts the report collector.
func NewCollector(batchTime time.Duration) *Collector {
c := &Collector{
// realCollector connects to probes over TCP and merges reports published by those
// probes into a single one.
type realCollector struct {
in chan report.Report
out chan report.Report
peekc chan chan report.Report
add chan string
remove chan string
quit chan struct{}
}
// NewCollector produces and returns a report collector.
func NewCollector(batchTime time.Duration) Collector {
c := &realCollector{
in: make(chan report.Report),
out: make(chan report.Report),
peekc: make(chan chan report.Report),
add: make(chan string),
remove: make(chan string),
quit: make(chan chan struct{}),
quit: make(chan struct{}),
}
go c.loop(batchTime)
return c
}
func (c *Collector) loop(batchTime time.Duration) {
func (c *realCollector) loop(batchTime time.Duration) {
var (
tick = time.Tick(batchTime)
tick = tick(batchTime)
current = report.NewReport()
addrs = map[string]chan struct{}{}
wg = &sync.WaitGroup{} // individual collector goroutines
wg = &sync.WaitGroup{} // per-address goroutines
)
add := func(ip string) {
if _, ok := addrs[ip]; ok {
return
}
addrs[ip] = make(chan struct{})
wg.Add(1)
go func(quit chan struct{}) {
defer wg.Done()
reportCollector(ip, c.in, quit)
@@ -73,7 +84,6 @@ func (c *Collector) loop(batchTime time.Duration) {
if !ok {
return // hmm
}
close(q)
delete(addrs, ip)
}
@@ -84,6 +94,9 @@ func (c *Collector) loop(batchTime time.Duration) {
c.out <- current
current = report.NewReport()
case pc := <-c.peekc:
pc <- current
case r := <-c.in:
current.Merge(r)
@@ -93,49 +106,43 @@ func (c *Collector) loop(batchTime time.Duration) {
case ip := <-c.remove:
remove(ip)
case q := <-c.quit:
case <-c.quit:
for _, q := range addrs {
close(q)
}
wg.Wait()
close(q)
return
}
}
}
// Stop shuts down a collector and all connections to probes.
func (c *Collector) Stop() {
q := make(chan struct{})
c.quit <- q
<-q
// Add adds an address to be collected from.
func (c *realCollector) Add(addr string) {
c.add <- addr
}
// AddAddress adds the passed IP to the collector, and starts (trying to)
// collect reports from the remote Publisher.
func (c *Collector) AddAddress(ip string) {
c.add <- ip
// Remove removes a previously-added address.
func (c *realCollector) Remove(addr string) {
c.remove <- addr
}
// AddAddresses adds the passed IPs to the collector, and starts (trying to)
// collect reports from the remote Publisher.
func (c *Collector) AddAddresses(ips []string) {
for _, addr := range ips {
c.AddAddress(addr)
}
}
// RemoveAddress removes the passed IP from the collector, and stops
// collecting reports from the remote Publisher.
func (c *Collector) RemoveAddress(ip string) {
c.remove <- ip
}
// Reports returns the channel where aggregate reports are sent.
func (c *Collector) Reports() <-chan report.Report {
// Reports returns the report chan. It must be consumed by the client, or the
// collector will break.
func (c *realCollector) Reports() <-chan report.Report {
return c.out
}
func (c *realCollector) peek() report.Report {
pc := make(chan report.Report)
c.peekc <- pc
return <-pc
}
// Stop terminates the collector.
func (c *realCollector) Stop() {
close(c.quit)
}
// reportCollector is the loop to connect to a single Probe. It'll keep
// running until the quit channel is closed.
func reportCollector(ip string, col chan<- report.Report, quit <-chan struct{}) {
@@ -188,7 +195,7 @@ func reportCollector(ip string, col chan<- report.Report, quit <-chan struct{})
log.Printf("decode error: %v", err)
break
}
//log.Printf("collector: got a report from %v", ip)
log.Printf("collector: got a report from %v", ip)
select {
case col <- report:

View File

@@ -1,69 +1,102 @@
package xfer_test
package xfer
import (
"bytes"
"encoding/gob"
"io/ioutil"
"log"
"net"
"runtime"
"testing"
"time"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)
func TestCollector(t *testing.T) {
log.SetOutput(ioutil.Discard)
// Build the address
port := ":12345"
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1"+port)
// Swap out ticker
publish := make(chan time.Time)
oldTick := tick
tick = func(time.Duration) <-chan time.Time { return publish }
defer func() { tick = oldTick }()
// Build a collector
collector := NewCollector(time.Second)
defer collector.Stop()
concreteCollector, ok := collector.(*realCollector)
if !ok {
t.Fatal("type assertion failure")
}
// Build a test publisher
reports := make(chan interface{})
ln := testPublisher(t, reports)
defer ln.Close()
// Connect the collector to the test publisher
addr := ln.Addr().String()
collector.Add(addr)
collector.Add(addr) // test duplicate case
runtime.Gosched() // make sure it connects
// Push a report through everything
reports <- report.Report{Network: report.Topology{NodeMetadatas: report.NodeMetadatas{"a": report.NodeMetadata{}}}}
poll(t, time.Millisecond, func() bool { return len(concreteCollector.peek().Network.NodeMetadatas) == 1 }, "missed the report")
go func() { publish <- time.Now() }()
if want, have := 1, len((<-collector.Reports()).Network.NodeMetadatas); want != have {
t.Errorf("want %d, have %d", want, have)
}
collector.Remove(addr)
collector.Remove(addr) // test duplicate case
}
func TestCollectorQuitWithActiveConnections(t *testing.T) {
c := NewCollector(time.Second)
c.Add("1.2.3.4:56789")
c.Stop()
}
func testPublisher(t *testing.T, input <-chan interface{}) net.Listener {
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
// Start a raw publisher
ln, err := net.ListenTCP("tcp4", addr)
if err != nil {
t.Fatal(err)
}
defer ln.Close()
// Accept one connection, write one report
data := make(chan []byte)
go func() {
conn, err := ln.Accept()
if err != nil {
t.Error(err)
t.Log(err)
return
}
defer conn.Close()
if _, err := conn.Write(<-data); err != nil {
t.Error(err)
return
for {
enc := gob.NewEncoder(conn)
for v := range input {
if err := enc.Encode(v); err != nil {
t.Error(err)
return
}
}
}
}()
return ln
}
// Start a collector
batchTime := 10 * time.Millisecond
c := xfer.NewCollector(batchTime)
c.AddAddress("127.0.0.1" + port)
gate := make(chan struct{})
go func() { <-c.Reports(); c.Stop(); close(gate) }()
// Publish a message
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(report.Report{}); err != nil {
t.Fatal(err)
}
data <- buf.Bytes()
// Check it was collected and forwarded
select {
case <-gate:
case <-time.After(2 * batchTime):
t.Errorf("timeout waiting for report")
func poll(t *testing.T, d time.Duration, condition func() bool, msg string) {
deadline := time.Now().Add(d)
for {
if time.Now().After(deadline) {
t.Fatal(msg)
}
if condition() {
return
}
time.Sleep(d / 10)
}
}

View File

@@ -32,8 +32,8 @@ func TestMerge(t *testing.T) {
batchTime := 100 * time.Millisecond
c := xfer.NewCollector(batchTime)
c.AddAddress(p1Addr)
c.AddAddress(p2Addr)
c.Add(p1Addr)
c.Add(p2Addr)
defer c.Stop()
time.Sleep(batchTime / 10) // connect