mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Merge pull request #56 from weaveworks/one-connection-per-app
One connection per app
This commit is contained in:
@@ -2,7 +2,6 @@ package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -22,7 +21,6 @@ import (
|
||||
func main() {
|
||||
var (
|
||||
httpListen = flag.String("http.listen", "", "listen address for HTTP profiling and instrumentation server")
|
||||
version = flag.Bool("version", false, "print version number and exit")
|
||||
publishInterval = flag.Duration("publish.interval", 1*time.Second, "publish (output) interval")
|
||||
spyInterval = flag.Duration("spy.interval", 100*time.Millisecond, "spy (scan) interval")
|
||||
listen = flag.String("listen", ":"+strconv.Itoa(xfer.ProbePort), "listen address")
|
||||
@@ -39,12 +37,6 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// -version flag:
|
||||
if *version {
|
||||
fmt.Printf("unstable\n")
|
||||
return
|
||||
}
|
||||
|
||||
procspy.SetProcRoot(*procRoot)
|
||||
|
||||
if *httpListen != "" {
|
||||
|
||||
@@ -53,7 +53,12 @@ func (p *TCPPublisher) Publish(msg report.Report) {
|
||||
}
|
||||
|
||||
func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
|
||||
var activeConns = make(map[net.Conn]*gob.Encoder)
|
||||
type connEncoder struct {
|
||||
net.Conn
|
||||
*gob.Encoder
|
||||
}
|
||||
|
||||
activeConns := map[string]connEncoder{} // host: connEncoder
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -62,26 +67,34 @@ func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
|
||||
return // someone closed our connection chan -- weird?
|
||||
}
|
||||
|
||||
// Don't allow multiple connections from the same remote host.
|
||||
host, _, err := net.SplitHostPort(conn.RemoteAddr().String())
|
||||
if err != nil {
|
||||
log.Printf("incoming connection: %s: %v (dropped)", conn.RemoteAddr(), err)
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
if _, ok := activeConns[host]; ok {
|
||||
log.Printf("duplicate connection: %s (dropped)", conn.RemoteAddr())
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("connection initiated: %s", conn.RemoteAddr())
|
||||
activeConns[conn] = gob.NewEncoder(conn)
|
||||
activeConns[host] = connEncoder{conn, gob.NewEncoder(conn)}
|
||||
|
||||
case msg, ok := <-p.msg:
|
||||
if !ok {
|
||||
return // someone closed our msg chan, so we're done
|
||||
}
|
||||
|
||||
var teminatedConns []net.Conn
|
||||
for conn, encoder := range activeConns {
|
||||
if err := encoder.Encode(msg); err != nil {
|
||||
log.Printf("connection terminated: %v", err)
|
||||
teminatedConns = append(teminatedConns, conn)
|
||||
conn.Close()
|
||||
for host, connEncoder := range activeConns {
|
||||
if err := connEncoder.Encoder.Encode(msg); err != nil {
|
||||
log.Printf("connection terminated: %s: %v", connEncoder.Conn.RemoteAddr(), err)
|
||||
connEncoder.Conn.Close()
|
||||
delete(activeConns, host)
|
||||
}
|
||||
}
|
||||
|
||||
for _, conn := range teminatedConns {
|
||||
delete(activeConns, conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package xfer_test
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
@@ -15,9 +16,8 @@ import (
|
||||
func TestTCPPublisher(t *testing.T) {
|
||||
log.SetOutput(ioutil.Discard)
|
||||
|
||||
// Build the address
|
||||
port := ":12345"
|
||||
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1"+port)
|
||||
// Choose a port
|
||||
port, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -30,7 +30,7 @@ func TestTCPPublisher(t *testing.T) {
|
||||
defer p.Close()
|
||||
|
||||
// Start a raw listener
|
||||
conn, err := net.DialTCP("tcp4", nil, addr)
|
||||
conn, err := net.Dial("tcp4", "127.0.0.1"+port)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -46,3 +46,64 @@ func TestTCPPublisher(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublisherClosesDuplicateConnections(t *testing.T) {
|
||||
log.SetOutput(ioutil.Discard)
|
||||
|
||||
// Choose a port
|
||||
port, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Start a publisher
|
||||
p, err := xfer.NewTCPPublisher(port)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
// Connect a listener
|
||||
conn, err := net.Dial("tcp4", "127.0.0.1"+port)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
// Try to connect the same listener
|
||||
dupconn, err := net.Dial("tcp4", "127.0.0.1"+port)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer dupconn.Close()
|
||||
|
||||
// Publish a message
|
||||
p.Publish(report.Report{})
|
||||
|
||||
// The first listener should receive it
|
||||
var r report.Report
|
||||
if err := gob.NewDecoder(conn).Decode(&r); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The duplicate listener should have an error
|
||||
if err := gob.NewDecoder(dupconn).Decode(&r); err == nil {
|
||||
t.Errorf("expected error, got none")
|
||||
} else {
|
||||
t.Logf("dupconn got expected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func getFreePort() (string, error) {
|
||||
ln, err := net.Listen("tcp4", ":0")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Listen: %v", err)
|
||||
}
|
||||
defer ln.Close()
|
||||
_, port, err := net.SplitHostPort(ln.Addr().String())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("SplitHostPort(%s): %v", ln.Addr().String(), err)
|
||||
}
|
||||
return ":" + port, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user