mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-05 19:21:46 +00:00
Use host -> (Conn, Encoder) mapping for active conns
This commit is contained in:
@@ -53,7 +53,7 @@ func (p *TCPPublisher) Publish(msg report.Report) {
|
||||
}
|
||||
|
||||
func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
|
||||
activeConns := map[net.Conn]*gob.Encoder{}
|
||||
activeConns := map[string]connEncoder{} // host: connEncoder
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -69,44 +69,36 @@ func (p *TCPPublisher) loop(incoming <-chan net.Conn) {
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
outer:
|
||||
for activeConn := range activeConns {
|
||||
activeHost, _, err := net.SplitHostPort(activeConn.RemoteAddr().String())
|
||||
if err != nil {
|
||||
log.Printf("active connection: %s: %v (strange)", activeConn.RemoteAddr(), err)
|
||||
continue
|
||||
}
|
||||
if host == activeHost {
|
||||
log.Printf("duplicate connection: %s (dropped)", conn.RemoteAddr())
|
||||
conn.Close()
|
||||
continue outer
|
||||
}
|
||||
if _, ok := activeConns[host]; ok {
|
||||
log.Printf("duplicate connection: %s (dropped)", conn.RemoteAddr())
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("connection initiated: %s", conn.RemoteAddr())
|
||||
activeConns[conn] = gob.NewEncoder(conn)
|
||||
activeConns[host] = connEncoder{conn, gob.NewEncoder(conn)}
|
||||
|
||||
case msg, ok := <-p.msg:
|
||||
if !ok {
|
||||
return // someone closed our msg chan, so we're done
|
||||
}
|
||||
|
||||
teminatedConns := []net.Conn{}
|
||||
for conn, encoder := range activeConns {
|
||||
if err := encoder.Encode(msg); err != nil {
|
||||
log.Printf("connection terminated: %s: %v", conn.RemoteAddr(), err)
|
||||
teminatedConns = append(teminatedConns, conn)
|
||||
conn.Close()
|
||||
for host, connEncoder := range activeConns {
|
||||
if err := connEncoder.Encoder.Encode(msg); err != nil {
|
||||
log.Printf("connection terminated: %s: %v", connEncoder.Conn.RemoteAddr(), err)
|
||||
connEncoder.Conn.Close()
|
||||
delete(activeConns, host)
|
||||
}
|
||||
}
|
||||
|
||||
for _, conn := range teminatedConns {
|
||||
delete(activeConns, conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type connEncoder struct {
|
||||
net.Conn
|
||||
*gob.Encoder
|
||||
}
|
||||
|
||||
func fwd(ln net.Listener) chan net.Conn {
|
||||
c := make(chan net.Conn)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user