mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 18:20:27 +00:00
Merge pull request #176 from weaveworks/collector-interface
Add interface to xfer/collector.go, and some more tests
This commit is contained in:
@@ -67,7 +67,7 @@ func main() {
|
||||
c := xfer.NewCollector(*batch)
|
||||
defer c.Stop()
|
||||
|
||||
r := NewResolver(probes, c.AddAddress)
|
||||
r := NewResolver(probes, c.Add)
|
||||
defer r.Stop()
|
||||
|
||||
lifo := NewReportLIFO(c, *window)
|
||||
|
||||
@@ -17,51 +17,62 @@ const (
|
||||
|
||||
var (
|
||||
// MaxBackoff is the maximum time between connect retries.
|
||||
MaxBackoff = 2 * time.Minute // externally configurable.
|
||||
// It's exported so it's externally configurable.
|
||||
MaxBackoff = 2 * time.Minute
|
||||
|
||||
// This is extracted out for mocking.
|
||||
tick = time.Tick
|
||||
)
|
||||
|
||||
// Collector connects to probes over TCP and merges reports published by those
|
||||
// probes into a single one.
|
||||
type Collector struct {
|
||||
in chan report.Report
|
||||
out chan report.Report
|
||||
add chan string
|
||||
remove chan string
|
||||
quit chan chan struct{}
|
||||
// Collector describes anything that can have addresses added and removed, and
|
||||
// which produces reports that represent aggregate reports from all collected
|
||||
// addresses.
|
||||
type Collector interface {
|
||||
Add(string)
|
||||
Remove(string)
|
||||
Reports() <-chan report.Report
|
||||
Stop()
|
||||
}
|
||||
|
||||
// NewCollector starts the report collector.
|
||||
func NewCollector(batchTime time.Duration) *Collector {
|
||||
c := &Collector{
|
||||
// realCollector connects to probes over TCP and merges reports published by those
|
||||
// probes into a single one.
|
||||
type realCollector struct {
|
||||
in chan report.Report
|
||||
out chan report.Report
|
||||
peekc chan chan report.Report
|
||||
add chan string
|
||||
remove chan string
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewCollector produces and returns a report collector.
|
||||
func NewCollector(batchTime time.Duration) Collector {
|
||||
c := &realCollector{
|
||||
in: make(chan report.Report),
|
||||
out: make(chan report.Report),
|
||||
peekc: make(chan chan report.Report),
|
||||
add: make(chan string),
|
||||
remove: make(chan string),
|
||||
quit: make(chan chan struct{}),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
go c.loop(batchTime)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Collector) loop(batchTime time.Duration) {
|
||||
func (c *realCollector) loop(batchTime time.Duration) {
|
||||
var (
|
||||
tick = time.Tick(batchTime)
|
||||
tick = tick(batchTime)
|
||||
current = report.NewReport()
|
||||
addrs = map[string]chan struct{}{}
|
||||
wg = &sync.WaitGroup{} // individual collector goroutines
|
||||
wg = &sync.WaitGroup{} // per-address goroutines
|
||||
)
|
||||
|
||||
add := func(ip string) {
|
||||
if _, ok := addrs[ip]; ok {
|
||||
return
|
||||
}
|
||||
|
||||
addrs[ip] = make(chan struct{})
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go func(quit chan struct{}) {
|
||||
defer wg.Done()
|
||||
reportCollector(ip, c.in, quit)
|
||||
@@ -73,7 +84,6 @@ func (c *Collector) loop(batchTime time.Duration) {
|
||||
if !ok {
|
||||
return // hmm
|
||||
}
|
||||
|
||||
close(q)
|
||||
delete(addrs, ip)
|
||||
}
|
||||
@@ -84,6 +94,9 @@ func (c *Collector) loop(batchTime time.Duration) {
|
||||
c.out <- current
|
||||
current = report.NewReport()
|
||||
|
||||
case pc := <-c.peekc:
|
||||
pc <- current
|
||||
|
||||
case r := <-c.in:
|
||||
current.Merge(r)
|
||||
|
||||
@@ -93,49 +106,43 @@ func (c *Collector) loop(batchTime time.Duration) {
|
||||
case ip := <-c.remove:
|
||||
remove(ip)
|
||||
|
||||
case q := <-c.quit:
|
||||
case <-c.quit:
|
||||
for _, q := range addrs {
|
||||
close(q)
|
||||
}
|
||||
wg.Wait()
|
||||
close(q)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop shuts down a collector and all connections to probes.
|
||||
func (c *Collector) Stop() {
|
||||
q := make(chan struct{})
|
||||
c.quit <- q
|
||||
<-q
|
||||
// Add adds an address to be collected from.
|
||||
func (c *realCollector) Add(addr string) {
|
||||
c.add <- addr
|
||||
}
|
||||
|
||||
// AddAddress adds the passed IP to the collector, and starts (trying to)
|
||||
// collect reports from the remote Publisher.
|
||||
func (c *Collector) AddAddress(ip string) {
|
||||
c.add <- ip
|
||||
// Remove removes a previously-added address.
|
||||
func (c *realCollector) Remove(addr string) {
|
||||
c.remove <- addr
|
||||
}
|
||||
|
||||
// AddAddresses adds the passed IPs to the collector, and starts (trying to)
|
||||
// collect reports from the remote Publisher.
|
||||
func (c *Collector) AddAddresses(ips []string) {
|
||||
for _, addr := range ips {
|
||||
c.AddAddress(addr)
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveAddress removes the passed IP from the collector, and stops
|
||||
// collecting reports from the remote Publisher.
|
||||
func (c *Collector) RemoveAddress(ip string) {
|
||||
c.remove <- ip
|
||||
}
|
||||
|
||||
// Reports returns the channel where aggregate reports are sent.
|
||||
func (c *Collector) Reports() <-chan report.Report {
|
||||
// Reports returns the report chan. It must be consumed by the client, or the
|
||||
// collector will break.
|
||||
func (c *realCollector) Reports() <-chan report.Report {
|
||||
return c.out
|
||||
}
|
||||
|
||||
func (c *realCollector) peek() report.Report {
|
||||
pc := make(chan report.Report)
|
||||
c.peekc <- pc
|
||||
return <-pc
|
||||
}
|
||||
|
||||
// Stop terminates the collector.
|
||||
func (c *realCollector) Stop() {
|
||||
close(c.quit)
|
||||
}
|
||||
|
||||
// reportCollector is the loop to connect to a single Probe. It'll keep
|
||||
// running until the quit channel is closed.
|
||||
func reportCollector(ip string, col chan<- report.Report, quit <-chan struct{}) {
|
||||
@@ -188,7 +195,7 @@ func reportCollector(ip string, col chan<- report.Report, quit <-chan struct{})
|
||||
log.Printf("decode error: %v", err)
|
||||
break
|
||||
}
|
||||
//log.Printf("collector: got a report from %v", ip)
|
||||
log.Printf("collector: got a report from %v", ip)
|
||||
|
||||
select {
|
||||
case col <- report:
|
||||
|
||||
@@ -1,69 +1,102 @@
|
||||
package xfer_test
|
||||
package xfer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/weaveworks/scope/report"
|
||||
"github.com/weaveworks/scope/xfer"
|
||||
)
|
||||
|
||||
func TestCollector(t *testing.T) {
|
||||
log.SetOutput(ioutil.Discard)
|
||||
|
||||
// Build the address
|
||||
port := ":12345"
|
||||
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1"+port)
|
||||
// Swap out ticker
|
||||
publish := make(chan time.Time)
|
||||
oldTick := tick
|
||||
tick = func(time.Duration) <-chan time.Time { return publish }
|
||||
defer func() { tick = oldTick }()
|
||||
|
||||
// Build a collector
|
||||
collector := NewCollector(time.Second)
|
||||
defer collector.Stop()
|
||||
|
||||
concreteCollector, ok := collector.(*realCollector)
|
||||
if !ok {
|
||||
t.Fatal("type assertion failure")
|
||||
}
|
||||
|
||||
// Build a test publisher
|
||||
reports := make(chan interface{})
|
||||
ln := testPublisher(t, reports)
|
||||
defer ln.Close()
|
||||
|
||||
// Connect the collector to the test publisher
|
||||
addr := ln.Addr().String()
|
||||
collector.Add(addr)
|
||||
collector.Add(addr) // test duplicate case
|
||||
runtime.Gosched() // make sure it connects
|
||||
|
||||
// Push a report through everything
|
||||
reports <- report.Report{Network: report.Topology{NodeMetadatas: report.NodeMetadatas{"a": report.NodeMetadata{}}}}
|
||||
poll(t, time.Millisecond, func() bool { return len(concreteCollector.peek().Network.NodeMetadatas) == 1 }, "missed the report")
|
||||
go func() { publish <- time.Now() }()
|
||||
if want, have := 1, len((<-collector.Reports()).Network.NodeMetadatas); want != have {
|
||||
t.Errorf("want %d, have %d", want, have)
|
||||
}
|
||||
|
||||
collector.Remove(addr)
|
||||
collector.Remove(addr) // test duplicate case
|
||||
}
|
||||
|
||||
func TestCollectorQuitWithActiveConnections(t *testing.T) {
|
||||
c := NewCollector(time.Second)
|
||||
c.Add("1.2.3.4:56789")
|
||||
c.Stop()
|
||||
}
|
||||
|
||||
func testPublisher(t *testing.T, input <-chan interface{}) net.Listener {
|
||||
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Start a raw publisher
|
||||
ln, err := net.ListenTCP("tcp4", addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ln.Close()
|
||||
|
||||
// Accept one connection, write one report
|
||||
data := make(chan []byte)
|
||||
go func() {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
if _, err := conn.Write(<-data); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
for {
|
||||
enc := gob.NewEncoder(conn)
|
||||
for v := range input {
|
||||
if err := enc.Encode(v); err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ln
|
||||
}
|
||||
|
||||
// Start a collector
|
||||
batchTime := 10 * time.Millisecond
|
||||
c := xfer.NewCollector(batchTime)
|
||||
c.AddAddress("127.0.0.1" + port)
|
||||
gate := make(chan struct{})
|
||||
go func() { <-c.Reports(); c.Stop(); close(gate) }()
|
||||
|
||||
// Publish a message
|
||||
var buf bytes.Buffer
|
||||
if err := gob.NewEncoder(&buf).Encode(report.Report{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
data <- buf.Bytes()
|
||||
|
||||
// Check it was collected and forwarded
|
||||
select {
|
||||
case <-gate:
|
||||
case <-time.After(2 * batchTime):
|
||||
t.Errorf("timeout waiting for report")
|
||||
func poll(t *testing.T, d time.Duration, condition func() bool, msg string) {
|
||||
deadline := time.Now().Add(d)
|
||||
for {
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatal(msg)
|
||||
}
|
||||
if condition() {
|
||||
return
|
||||
}
|
||||
time.Sleep(d / 10)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,8 +32,8 @@ func TestMerge(t *testing.T) {
|
||||
|
||||
batchTime := 100 * time.Millisecond
|
||||
c := xfer.NewCollector(batchTime)
|
||||
c.AddAddress(p1Addr)
|
||||
c.AddAddress(p2Addr)
|
||||
c.Add(p1Addr)
|
||||
c.Add(p2Addr)
|
||||
defer c.Stop()
|
||||
time.Sleep(batchTime / 10) // connect
|
||||
|
||||
|
||||
Reference in New Issue
Block a user