Publisher refuses connections from the same host

This commit is contained in:
Peter Bourgon
2015-05-19 12:21:11 +02:00
parent 84d724f645
commit ad45ae2c96
2 changed files with 89 additions and 7 deletions

View File

@@ -53,7 +53,7 @@ func (p *TCPPublisher) Publish(msg report.Report) {
}
func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
var activeConns = make(map[net.Conn]*gob.Encoder)
activeConns := map[net.Conn]*gob.Encoder{}
for {
select {
@@ -62,6 +62,27 @@ func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
return // someone closed our connection chan -- weird?
}
// Don't allow multiple connections from the same remote host.
host, _, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
log.Printf("incoming connection: %s: %v (dropped)", conn.RemoteAddr(), err)
conn.Close()
continue
}
outer:
for activeConn := range activeConns {
activeHost, _, err := net.SplitHostPort(activeConn.RemoteAddr().String())
if err != nil {
log.Printf("active connection: %s: %v (strange)", activeConn.RemoteAddr(), err)
continue
}
if host == activeHost {
log.Printf("duplicate connection: %s (dropped)", conn.RemoteAddr())
conn.Close()
continue outer
}
}
log.Printf("connection initiated: %s", conn.RemoteAddr())
activeConns[conn] = gob.NewEncoder(conn)
@@ -70,10 +91,10 @@ func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
return // someone closed our msg chan, so we're done
}
var teminatedConns []net.Conn
teminatedConns := []net.Conn{}
for conn, encoder := range activeConns {
if err := encoder.Encode(msg); err != nil {
log.Printf("connection terminated: %v", err)
log.Printf("connection terminated: %s: %v", conn.RemoteAddr(), err)
teminatedConns = append(teminatedConns, conn)
conn.Close()
}

View File

@@ -2,6 +2,7 @@ package xfer_test
import (
"encoding/gob"
"fmt"
"io/ioutil"
"log"
"net"
@@ -15,9 +16,8 @@ import (
func TestTCPPublisher(t *testing.T) {
log.SetOutput(ioutil.Discard)
// Build the address
port := ":12345"
addr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1"+port)
// Choose a port
port, err := getFreePort()
if err != nil {
t.Fatal(err)
}
@@ -30,7 +30,7 @@ func TestTCPPublisher(t *testing.T) {
defer p.Close()
// Start a raw listener
conn, err := net.DialTCP("tcp4", nil, addr)
conn, err := net.Dial("tcp4", "127.0.0.1"+port)
if err != nil {
t.Fatal(err)
}
@@ -46,3 +46,64 @@ func TestTCPPublisher(t *testing.T) {
t.Fatal(err)
}
}
func TestPublisherClosesDuplicateConnections(t *testing.T) {
log.SetOutput(ioutil.Discard)
// Choose a port
port, err := getFreePort()
if err != nil {
t.Fatal(err)
}
// Start a publisher
p, err := xfer.NewTCPPublisher(port)
if err != nil {
t.Fatal(err)
}
defer p.Close()
// Connect a listener
conn, err := net.Dial("tcp4", "127.0.0.1"+port)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
time.Sleep(time.Millisecond)
// Try to connect the same listener
dupconn, err := net.Dial("tcp4", "127.0.0.1"+port)
if err != nil {
t.Fatal(err)
}
defer dupconn.Close()
// Publish a message
p.Publish(report.Report{})
// The first listener should receive it
var r report.Report
if err := gob.NewDecoder(conn).Decode(&r); err != nil {
t.Fatal(err)
}
// The duplicate listener should have an error
if err := gob.NewDecoder(dupconn).Decode(&r); err == nil {
t.Errorf("expected error, got none")
} else {
t.Logf("dupconn got expected error: %v", err)
}
}
func getFreePort() (string, error) {
ln, err := net.Listen("tcp4", ":0")
if err != nil {
return "", fmt.Errorf("Listen: %v", err)
}
defer ln.Close()
_, port, err := net.SplitHostPort(ln.Addr().String())
if err != nil {
return "", fmt.Errorf("SplitHostPort(%s): %v", ln.Addr().String(), err)
}
return ":" + port, nil
}