New asynchronous, caching DNS resolver for reverse resolutions

Add nodes for the remote side of connections iff we have a DNS reverse resolution for the IP.
Unit test for the resolver
This commit is contained in:
Alvaro Saurin
2015-08-27 14:37:53 +02:00
parent 6e7a22ecae
commit 15e25edc40
6 changed files with 160 additions and 5 deletions

View File

@@ -26,6 +26,7 @@ type Reporter struct {
includeNAT bool
conntracker *Conntracker
natmapper *natmapper
revResolver *reverseResolver
}
// SpyDuration is an exported prometheus metric
@@ -64,12 +65,16 @@ func NewReporter(hostID, hostName string, includeProcesses bool, useConntrack bo
log.Printf("Failed to start natMapper: %v", err)
}
}
revRes := newReverseResolver()
return &Reporter{
hostID: hostID,
hostName: hostName,
includeProcesses: includeProcesses,
conntracker: conntracker,
natmapper: natmapper,
revResolver: revRes,
}
}
@@ -81,6 +86,7 @@ func (r *Reporter) Stop() {
if r.natmapper != nil {
r.natmapper.Stop()
}
r.revResolver.Stop()
}
// Report implements Reporter.
@@ -145,6 +151,13 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
})
)
// in case we have a reverse resolution for the IP, we can use it for the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr, false); err == nil {
remoteNode = remoteNode.AddMetadata(map[string]string{
"name": revRemoteName,
})
}
if localIsClient {
// New nodes are merged into the report so we don't need to do any counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteAddressNodeID, report.EdgeMetadata{
@@ -177,6 +190,13 @@ func (r *Reporter) addConnection(rpt *report.Report, localAddr, remoteAddr strin
})
)
// in case we have a reverse resolution for the IP, we can use it for the name...
if revRemoteName, err := r.revResolver.Get(remoteAddr, false); err == nil {
remoteNode = remoteNode.AddMetadata(map[string]string{
"name": revRemoteName,
})
}
if localIsClient {
// New nodes are merged into the report so we don't need to do any counting here; the merge does it for us.
localNode = localNode.WithEdge(remoteEndpointNodeID, report.EdgeMetadata{

View File

@@ -54,7 +54,7 @@ var (
LocalAddress: fixLocalAddress,
LocalPort: fixLocalPort,
RemoteAddress: fixRemoteAddress,
RemotePort: fixRemotePortB,
RemotePort: fixRemotePort,
Proc: procspy.Proc{
PID: fixProcessPID,
Name: fixProcessName,

View File

@@ -0,0 +1,85 @@
package endpoint
import (
"net"
"strings"
"time"
"github.com/bluele/gcache"
)
const (
rAddrCacheLen = 500 // Default cache length
rAddrBacklog = 1000
rAddrCacheExpiration = 30 * time.Minute
)
type revResFunc func(addr string) (names []string, err error)
type revResRequest struct {
address string
done chan struct{}
}
// ReverseResolver is a caching, reverse resolver
type reverseResolver struct {
addresses chan revResRequest
cache gcache.Cache
resolver revResFunc
}
// NewReverseResolver starts a new reverse resolver that
// performs reverse resolutions and caches the result.
func newReverseResolver() *reverseResolver {
r := reverseResolver{
addresses: make(chan revResRequest, rAddrBacklog),
cache: gcache.New(rAddrCacheLen).LRU().Expiration(rAddrCacheExpiration).Build(),
resolver: net.LookupAddr,
}
go r.loop()
return &r
}
// Get the reverse resolution for an IP address if already in the cache,
// a gcache.NotFoundKeyError error otherwise.
// Note: it returns one of the possible names that can be obtained for that IP.
func (r *reverseResolver) Get(address string, wait bool) (string, error) {
val, err := r.cache.Get(address)
if err == nil {
return val.(string), nil
}
if err == gcache.NotFoundKeyError {
request := revResRequest{address: address, done: make(chan struct{})}
// we trigger a asynchronous reverse resolution when not cached
select {
case r.addresses <- request:
if wait {
<-request.done
}
default:
}
}
return "", err
}
func (r *reverseResolver) loop() {
throttle := time.Tick(time.Second / 10)
for request := range r.addresses {
<-throttle // rate limit our DNS resolutions
// and check if the answer is already in the cache
if _, err := r.cache.Get(request.address); err == nil {
continue
}
names, err := r.resolver(request.address)
if err == nil && len(names) > 0 {
name := strings.TrimRight(names[0], ".")
r.cache.Set(request.address, name)
}
close(request.done)
}
}
// Stop the async reverse resolver
func (r *reverseResolver) Stop() {
close(r.addresses)
}

View File

@@ -0,0 +1,41 @@
package endpoint
import (
"errors"
"testing"
)
func TestReverseResolver(t *testing.T) {
tests := map[string]string{
"8.8.8.8": "google-public-dns-a.google.com",
"8.8.4.4": "google-public-dns-b.google.com",
}
revRes := newReverseResolver()
// use a mocked resolver function
revRes.resolver = func(addr string) (names []string, err error) {
if name, ok := tests[addr]; ok {
return []string{name}, nil
}
return []string{}, errors.New("invalid IP")
}
// first time: no names are returned for our reverse resolutions
for ip := range tests {
if have, err := revRes.Get(ip, true); have != "" || err == nil {
t.Errorf("we didn't get an error, or the cache was not empty, when trying to resolve '%q'", ip)
}
}
// so, if we check again these IPs, we should have the names now
for ip, want := range tests {
have, err := revRes.Get(ip, true)
if err != nil {
t.Errorf("%s: %v", ip, err)
}
if want != have {
t.Errorf("%s: want %q, have %q", ip, want, have)
}
}
}

View File

@@ -205,8 +205,11 @@ func OriginTable(r report.Report, originID string, addHostTags bool, addContaine
func connectionDetailsRows(topology report.Topology, originID string) []Row {
rows := []Row{}
labeler := func(nodeID string) (string, bool) {
labeler := func(nodeID string, meta map[string]string) (string, bool) {
if _, addr, port, ok := report.ParseEndpointNodeID(nodeID); ok {
if name, ok := meta["name"]; ok {
return fmt.Sprintf("%s:%s", name, port), true
}
return fmt.Sprintf("%s:%s", addr, port), true
}
if _, addr, ok := report.ParseAddressNodeID(nodeID); ok {
@@ -214,13 +217,13 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row {
}
return "", false
}
local, ok := labeler(originID)
local, ok := labeler(originID, topology.Nodes[originID].Metadata)
if !ok {
return rows
}
// Firstly, collection outgoing connections from this node.
for _, serverNodeID := range topology.Nodes[originID].Adjacency {
remote, ok := labeler(serverNodeID)
remote, ok := labeler(serverNodeID, topology.Nodes[serverNodeID].Metadata)
if !ok {
continue
}
@@ -239,7 +242,7 @@ func connectionDetailsRows(topology report.Topology, originID string) []Row {
if !serverNodeIDs.Contains(originID) {
continue
}
remote, ok := labeler(clientNodeID)
remote, ok := labeler(clientNodeID, clientNode.Metadata)
if !ok {
continue
}

View File

@@ -102,6 +102,12 @@ func (n Node) WithMetadata(m map[string]string) Node {
return result
}
// AddMetadata returns a fresh copy of n, with Metadata set to the merge of n and the metadata provided
func (n Node) AddMetadata(m map[string]string) Node {
additional := MakeNodeWith(m)
return n.Merge(additional)
}
// WithCounters returns a fresh copy of n, with Counters set to c
func (n Node) WithCounters(c map[string]int) Node {
result := n.Copy()