From 3e9eb83d124961bd0e3ef009f56a4a23087f5432 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 1 Jun 2017 17:40:47 +0000 Subject: [PATCH] Use Kubernetes node name to filter pods if possible --- probe/kubernetes/reporter.go | 31 +++++++++++++++++-------------- prog/main.go | 2 ++ prog/probe.go | 2 +- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index 45abeda2c..e93a6ccd1 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -111,11 +111,12 @@ type Reporter struct { probe *probe.Probe hostID string handlerRegistry *controls.HandlerRegistry + nodeName string kubeletPort uint } // NewReporter makes a new Reporter -func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, kubeletPort uint) *Reporter { +func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, nodeName string, kubeletPort uint) *Reporter { reporter := &Reporter{ client: client, pipes: pipes, @@ -123,6 +124,7 @@ func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostI probe: probe, hostID: hostID, handlerRegistry: handlerRegistry, + nodeName: nodeName, kubeletPort: kubeletPort, } reporter.registerControls() @@ -424,21 +426,22 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, dae )) } - // Obtain the local pods from kubelet since we only want to report those - // for performance reasons. - // - // In theory a simpler approach would be to obtain the current NodeName - // and filter local pods based on that. However that's hard since - // 1. reconstructing the NodeName requires cloud provider credentials - // 2. inferring the NodeName out of the hostname or system uuid is unreliable - // (uuids and hostnames can be duplicated across the cluster). - localPodUIDs, errUIDs := GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort)) - if errUIDs != nil { - log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs) + var localPodUIDs map[string]struct{} + if r.nodeName == "" { + // We don't know the node name: fall back to obtaining the local pods from kubelet + var err error + localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort)) + if err != nil { + log.Warnf("No node name and cannot obtain local pods, reporting all (which may impact performance): %v", err) + } } err := r.client.WalkPods(func(p Pod) error { - // filter out non-local pods - if errUIDs == nil { + // filter out non-local pods: we only want to report local ones for performance reasons. + if r.nodeName != "" { + if p.NodeName() != r.nodeName { + return nil + } + } else if localPodUIDs != nil { if _, ok := localPodUIDs[p.UID()]; !ok { return nil } diff --git a/prog/main.go b/prog/main.go index ecb9d3d30..52832223d 100644 --- a/prog/main.go +++ b/prog/main.go @@ -120,6 +120,7 @@ type probeFlags struct { dockerBridge string kubernetesEnabled bool + kubernetesNodeName string kubernetesClientConfig kubernetes.ClientConfig kubernetesKubeletPort uint @@ -314,6 +315,7 @@ func setupFlags(flags *flags) { flag.StringVar(&flags.probe.kubernetesClientConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server") flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use") flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server") + flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods") flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet") // AWS ECS diff --git a/prog/probe.go b/prog/probe.go index 412f01511..a3f9806d5 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -215,7 +215,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { if flags.kubernetesEnabled { if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil { defer client.Stop() - reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesKubeletPort) + reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort) defer reporter.Stop() p.AddReporter(reporter) p.AddTagger(reporter)