Use Kubernetes node name to filter pods if possible

This commit is contained in:
Bryan Boreham
2017-06-01 17:40:47 +00:00
parent 387a68a3c7
commit 3e9eb83d12
3 changed files with 20 additions and 15 deletions

View File

@@ -111,11 +111,12 @@ type Reporter struct {
probe *probe.Probe
hostID string
handlerRegistry *controls.HandlerRegistry
nodeName string
kubeletPort uint
}
// NewReporter makes a new Reporter
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, kubeletPort uint) *Reporter {
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, nodeName string, kubeletPort uint) *Reporter {
reporter := &Reporter{
client: client,
pipes: pipes,
@@ -123,6 +124,7 @@ func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostI
probe: probe,
hostID: hostID,
handlerRegistry: handlerRegistry,
nodeName: nodeName,
kubeletPort: kubeletPort,
}
reporter.registerControls()
@@ -424,21 +426,22 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet, dae
))
}
// Obtain the local pods from kubelet since we only want to report those
// for performance reasons.
//
// In theory a simpler approach would be to obtain the current NodeName
// and filter local pods based on that. However that's hard since
// 1. reconstructing the NodeName requires cloud provider credentials
// 2. inferring the NodeName out of the hostname or system uuid is unreliable
// (uuids and hostnames can be duplicated across the cluster).
localPodUIDs, errUIDs := GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort))
if errUIDs != nil {
log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs)
var localPodUIDs map[string]struct{}
if r.nodeName == "" {
// We don't know the node name: fall back to obtaining the local pods from kubelet
var err error
localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort))
if err != nil {
log.Warnf("No node name and cannot obtain local pods, reporting all (which may impact performance): %v", err)
}
}
err := r.client.WalkPods(func(p Pod) error {
// filter out non-local pods
if errUIDs == nil {
// filter out non-local pods: we only want to report local ones for performance reasons.
if r.nodeName != "" {
if p.NodeName() != r.nodeName {
return nil
}
} else if localPodUIDs != nil {
if _, ok := localPodUIDs[p.UID()]; !ok {
return nil
}

View File

@@ -120,6 +120,7 @@ type probeFlags struct {
dockerBridge string
kubernetesEnabled bool
kubernetesNodeName string
kubernetesClientConfig kubernetes.ClientConfig
kubernetesKubeletPort uint
@@ -314,6 +315,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.kubernetesClientConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server")
flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet")
// AWS ECS

View File

@@ -215,7 +215,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
if flags.kubernetesEnabled {
if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesKubeletPort)
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)