From c1fa5bb665cb23fc6cde51ae8ad8009278fa9349 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 7 Mar 2017 13:29:02 +0000 Subject: [PATCH] Fix kubelet failure fallback and make port configurable --- probe/kubernetes/kubelet.go | 9 ++++----- probe/kubernetes/kubelet_test.go | 5 +---- probe/kubernetes/reporter.go | 9 ++++++--- prog/main.go | 34 +++++++++++++++++--------------- prog/probe.go | 4 ++-- 5 files changed, 31 insertions(+), 30 deletions(-) diff --git a/probe/kubernetes/kubelet.go b/probe/kubernetes/kubelet.go index 768b04d8c..74720056f 100644 --- a/probe/kubernetes/kubelet.go +++ b/probe/kubernetes/kubelet.go @@ -1,14 +1,12 @@ package kubernetes import ( + "fmt" "net/http" "github.com/ugorji/go/codec" ) -// KubeletURL is just exported for testing -var KubeletURL = "http://localhost:10255" - // Intentionally not using the full kubernetes library DS // to make parsing faster and more tolerant to schema changes type podList struct { @@ -20,8 +18,9 @@ type podList struct { } // GetLocalPodUIDs obtains the UID of the pods run locally (it's just exported for testing) -var GetLocalPodUIDs = func() (map[string]struct{}, error) { - resp, err := http.Get(KubeletURL + "/pods/") +var GetLocalPodUIDs = func(kubeletHost string) (map[string]struct{}, error) { + url := fmt.Sprintf("http://%s/pods/", kubeletHost) + resp, err := http.Get(url) if err != nil { return nil, err } diff --git a/probe/kubernetes/kubelet_test.go b/probe/kubernetes/kubelet_test.go index deefb8fb4..d29686ea6 100644 --- a/probe/kubernetes/kubelet_test.go +++ b/probe/kubernetes/kubelet_test.go @@ -30,11 +30,8 @@ func TestGetLocalPodUIDs(t *testing.T) { }, )) defer server.Close() - var savedKubeletURL string - savedKubeletURL, kubernetes.KubeletURL = kubernetes.KubeletURL, server.URL - defer func() { kubernetes.KubeletURL = savedKubeletURL }() - uids, err := kubernetes.GetLocalPodUIDs() + uids, err := kubernetes.GetLocalPodUIDs(server.URL.Host) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index 6daaa76f5..7c7765f05 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -1,6 +1,7 @@ package kubernetes import ( + "fmt" "strings" "k8s.io/kubernetes/pkg/labels" @@ -98,10 +99,11 @@ type Reporter struct { probe *probe.Probe hostID string handlerRegistry *controls.HandlerRegistry + kubeletPort uint } // NewReporter makes a new Reporter -func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry) *Reporter { +func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, kubeletPort uint) *Reporter { reporter := &Reporter{ client: client, pipes: pipes, @@ -109,6 +111,7 @@ func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostI probe: probe, hostID: hostID, handlerRegistry: handlerRegistry, + kubeletPort: kubeletPort, } reporter.registerControls() client.WatchPods(reporter.podEvent) @@ -377,13 +380,13 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet) (re // 1. reconstructing the NodeName requires cloud provider credentials // 2. inferring the NodeName out of the hostname or system uuid is unreliable // (uuids and hostnames can be duplicated across the cluster). - localPodUIDs, errUIDs := GetLocalPodUIDs() + localPodUIDs, errUIDs := GetLocalPodUIDs(fmt.Sprintf("localhost:%d", r.kubeletPort)) if errUIDs != nil { log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs) } err := r.client.WalkPods(func(p Pod) error { // filter out non-local pods - if errUIDs != nil { + if errUIDs == nil { if _, ok := localPodUIDs[p.UID()]; !ok { return nil } diff --git a/prog/main.go b/prog/main.go index 1bbafee25..e1b55aa4b 100644 --- a/prog/main.go +++ b/prog/main.go @@ -103,8 +103,9 @@ type probeFlags struct { dockerInterval time.Duration dockerBridge string - kubernetesEnabled bool - kubernetesConfig kubernetes.ClientConfig + kubernetesEnabled bool + kubernetesClientConfig kubernetes.ClientConfig + kubernetesKubeletPort uint ecsEnabled bool ecsCacheSize int @@ -286,20 +287,21 @@ func main() { // K8s flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node") - flag.DurationVar(&flags.probe.kubernetesConfig.Interval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data") - flag.StringVar(&flags.probe.kubernetesConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)") - flag.StringVar(&flags.probe.kubernetesConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority") - flag.StringVar(&flags.probe.kubernetesConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS") - flag.StringVar(&flags.probe.kubernetesConfig.ClientKey, "probe.kubernetes.client-key", "", "Path to a client key file for TLS") - flag.StringVar(&flags.probe.kubernetesConfig.Cluster, "probe.kubernetes.cluster", "", "The name of the kubeconfig cluster to use") - flag.StringVar(&flags.probe.kubernetesConfig.Context, "probe.kubernetes.context", "", "The name of the kubeconfig context to use") - flag.BoolVar(&flags.probe.kubernetesConfig.Insecure, "probe.kubernetes.insecure-skip-tls-verify", false, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure") - flag.StringVar(&flags.probe.kubernetesConfig.Kubeconfig, "probe.kubernetes.kubeconfig", "", "Path to the kubeconfig file to use") - flag.StringVar(&flags.probe.kubernetesConfig.Password, kubernetesPasswordFlag, "", "Password for basic authentication to the API server") - flag.StringVar(&flags.probe.kubernetesConfig.Server, "probe.kubernetes.server", "", "The address and port of the Kubernetes API server") - flag.StringVar(&flags.probe.kubernetesConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server") - flag.StringVar(&flags.probe.kubernetesConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use") - flag.StringVar(&flags.probe.kubernetesConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server") + flag.DurationVar(&flags.probe.kubernetesClientConfig.Interval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data") + flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)") + flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority") + flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS") + flag.StringVar(&flags.probe.kubernetesClientConfig.ClientKey, "probe.kubernetes.client-key", "", "Path to a client key file for TLS") + flag.StringVar(&flags.probe.kubernetesClientConfig.Cluster, "probe.kubernetes.cluster", "", "The name of the kubeconfig cluster to use") + flag.StringVar(&flags.probe.kubernetesClientConfig.Context, "probe.kubernetes.context", "", "The name of the kubeconfig context to use") + flag.BoolVar(&flags.probe.kubernetesClientConfig.Insecure, "probe.kubernetes.insecure-skip-tls-verify", false, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure") + flag.StringVar(&flags.probe.kubernetesClientConfig.Kubeconfig, "probe.kubernetes.kubeconfig", "", "Path to the kubeconfig file to use") + flag.StringVar(&flags.probe.kubernetesClientConfig.Password, kubernetesPasswordFlag, "", "Password for basic authentication to the API server") + flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.server", "", "The address and port of the Kubernetes API server") + flag.StringVar(&flags.probe.kubernetesClientConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server") + flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use") + flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server") + flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet") // AWS ECS flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node") diff --git a/prog/probe.go b/prog/probe.go index 50fee324d..1f1316bdf 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -207,9 +207,9 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } if flags.kubernetesEnabled { - if client, err := kubernetes.NewClient(flags.kubernetesConfig); err == nil { + if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil { defer client.Stop() - reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry) + reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesKubeletPort) defer reporter.Stop() p.AddReporter(reporter) p.AddTagger(reporter)