Merge pull request #2309 from weaveworks/2258-fix-kubelet-access

Fix kubelet failure fallback and make port configurable
This commit is contained in:
Alfonso Acosta
2017-03-08 10:15:21 -08:00
committed by GitHub
6 changed files with 38 additions and 35 deletions

View File

@@ -1,14 +1,12 @@
package kubernetes
import (
"fmt"
"net/http"
"github.com/ugorji/go/codec"
)
// KubeletURL is just exported for testing
var KubeletURL = "http://localhost:10255"
// Intentionally not using the full kubernetes library DS
// to make parsing faster and more tolerant to schema changes
type podList struct {
@@ -20,8 +18,9 @@ type podList struct {
}
// GetLocalPodUIDs obtains the UID of the pods run locally (it's just exported for testing)
var GetLocalPodUIDs = func() (map[string]struct{}, error) {
resp, err := http.Get(KubeletURL + "/pods/")
var GetLocalPodUIDs = func(kubeletHost string) (map[string]struct{}, error) {
url := fmt.Sprintf("http://%s/pods/", kubeletHost)
resp, err := http.Get(url)
if err != nil {
return nil, err
}

View File

@@ -4,6 +4,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/weaveworks/scope/probe/kubernetes"
@@ -30,11 +31,9 @@ func TestGetLocalPodUIDs(t *testing.T) {
},
))
defer server.Close()
var savedKubeletURL string
savedKubeletURL, kubernetes.KubeletURL = kubernetes.KubeletURL, server.URL
defer func() { kubernetes.KubeletURL = savedKubeletURL }()
uids, err := kubernetes.GetLocalPodUIDs()
serverURL, _ := url.Parse(server.URL)
uids, err := kubernetes.GetLocalPodUIDs(serverURL.Host)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@@ -1,6 +1,7 @@
package kubernetes
import (
"fmt"
"strings"
"k8s.io/kubernetes/pkg/labels"
@@ -98,10 +99,11 @@ type Reporter struct {
probe *probe.Probe
hostID string
handlerRegistry *controls.HandlerRegistry
kubeletPort uint
}
// NewReporter makes a new Reporter
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry) *Reporter {
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, kubeletPort uint) *Reporter {
reporter := &Reporter{
client: client,
pipes: pipes,
@@ -109,6 +111,7 @@ func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostI
probe: probe,
hostID: hostID,
handlerRegistry: handlerRegistry,
kubeletPort: kubeletPort,
}
reporter.registerControls()
client.WatchPods(reporter.podEvent)
@@ -377,13 +380,13 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet) (re
// 1. reconstructing the NodeName requires cloud provider credentials
// 2. inferring the NodeName out of the hostname or system uuid is unreliable
// (uuids and hostnames can be duplicated across the cluster).
localPodUIDs, errUIDs := GetLocalPodUIDs()
localPodUIDs, errUIDs := GetLocalPodUIDs(fmt.Sprintf("localhost:%d", r.kubeletPort))
if errUIDs != nil {
log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs)
}
err := r.client.WalkPods(func(p Pod) error {
// filter out non-local pods
if errUIDs != nil {
if errUIDs == nil {
if _, ok := localPodUIDs[p.UID()]; !ok {
return nil
}

View File

@@ -182,7 +182,7 @@ func (c mockPipeClient) PipeClose(appID, id string) error {
func TestReporter(t *testing.T) {
oldGetNodeName := kubernetes.GetLocalPodUIDs
defer func() { kubernetes.GetLocalPodUIDs = oldGetNodeName }()
kubernetes.GetLocalPodUIDs = func() (map[string]struct{}, error) {
kubernetes.GetLocalPodUIDs = func(string) (map[string]struct{}, error) {
uids := map[string]struct{}{
pod1UID: {},
pod2UID: {},
@@ -194,7 +194,7 @@ func TestReporter(t *testing.T) {
pod2ID := report.MakePodNodeID(pod2UID)
serviceID := report.MakeServiceNodeID(serviceUID)
hr := controls.NewDefaultHandlerRegistry()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil, hr).Report()
rpt, _ := kubernetes.NewReporter(newMockClient(), nil, "", "foo", nil, hr, 0).Report()
// Reporter should have added the following pods
for _, pod := range []struct {
@@ -255,7 +255,7 @@ func TestTagger(t *testing.T) {
}))
hr := controls.NewDefaultHandlerRegistry()
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr).Tag(rpt)
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, 0).Tag(rpt)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
@@ -277,14 +277,14 @@ func (c *callbackReadCloser) Close() error { return c.close() }
func TestReporterGetLogs(t *testing.T) {
oldGetNodeName := kubernetes.GetLocalPodUIDs
defer func() { kubernetes.GetLocalPodUIDs = oldGetNodeName }()
kubernetes.GetLocalPodUIDs = func() (map[string]struct{}, error) {
kubernetes.GetLocalPodUIDs = func(string) (map[string]struct{}, error) {
return map[string]struct{}{}, nil
}
client := newMockClient()
pipes := mockPipeClient{}
hr := controls.NewDefaultHandlerRegistry()
reporter := kubernetes.NewReporter(client, pipes, "", "", nil, hr)
reporter := kubernetes.NewReporter(client, pipes, "", "", nil, hr, 0)
// Should error on invalid IDs
{

View File

@@ -105,8 +105,9 @@ type probeFlags struct {
dockerInterval time.Duration
dockerBridge string
kubernetesEnabled bool
kubernetesConfig kubernetes.ClientConfig
kubernetesEnabled bool
kubernetesClientConfig kubernetes.ClientConfig
kubernetesKubeletPort uint
ecsEnabled bool
ecsCacheSize int
@@ -290,20 +291,21 @@ func main() {
// K8s
flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node")
flag.DurationVar(&flags.probe.kubernetesConfig.Interval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data")
flag.StringVar(&flags.probe.kubernetesConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)")
flag.StringVar(&flags.probe.kubernetesConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority")
flag.StringVar(&flags.probe.kubernetesConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS")
flag.StringVar(&flags.probe.kubernetesConfig.ClientKey, "probe.kubernetes.client-key", "", "Path to a client key file for TLS")
flag.StringVar(&flags.probe.kubernetesConfig.Cluster, "probe.kubernetes.cluster", "", "The name of the kubeconfig cluster to use")
flag.StringVar(&flags.probe.kubernetesConfig.Context, "probe.kubernetes.context", "", "The name of the kubeconfig context to use")
flag.BoolVar(&flags.probe.kubernetesConfig.Insecure, "probe.kubernetes.insecure-skip-tls-verify", false, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure")
flag.StringVar(&flags.probe.kubernetesConfig.Kubeconfig, "probe.kubernetes.kubeconfig", "", "Path to the kubeconfig file to use")
flag.StringVar(&flags.probe.kubernetesConfig.Password, kubernetesPasswordFlag, "", "Password for basic authentication to the API server")
flag.StringVar(&flags.probe.kubernetesConfig.Server, "probe.kubernetes.server", "", "The address and port of the Kubernetes API server")
flag.StringVar(&flags.probe.kubernetesConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server")
flag.StringVar(&flags.probe.kubernetesConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
flag.StringVar(&flags.probe.kubernetesConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
flag.DurationVar(&flags.probe.kubernetesClientConfig.Interval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data")
flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)")
flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority")
flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS")
flag.StringVar(&flags.probe.kubernetesClientConfig.ClientKey, "probe.kubernetes.client-key", "", "Path to a client key file for TLS")
flag.StringVar(&flags.probe.kubernetesClientConfig.Cluster, "probe.kubernetes.cluster", "", "The name of the kubeconfig cluster to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Context, "probe.kubernetes.context", "", "The name of the kubeconfig context to use")
flag.BoolVar(&flags.probe.kubernetesClientConfig.Insecure, "probe.kubernetes.insecure-skip-tls-verify", false, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure")
flag.StringVar(&flags.probe.kubernetesClientConfig.Kubeconfig, "probe.kubernetes.kubeconfig", "", "Path to the kubeconfig file to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Password, kubernetesPasswordFlag, "", "Password for basic authentication to the API server")
flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.server", "", "The address and port of the Kubernetes API server")
flag.StringVar(&flags.probe.kubernetesClientConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server")
flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet")
// AWS ECS
flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node")

View File

@@ -217,9 +217,9 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}
if flags.kubernetesEnabled {
if client, err := kubernetes.NewClient(flags.kubernetesConfig); err == nil {
if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry)
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesKubeletPort)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)