mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-03 02:00:43 +00:00
Fix kubelet failure fallback and make port configurable
This commit is contained in:
@@ -1,14 +1,12 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
// KubeletURL is just exported for testing
|
||||
var KubeletURL = "http://localhost:10255"
|
||||
|
||||
// Intentionally not using the full kubernetes library DS
|
||||
// to make parsing faster and more tolerant to schema changes
|
||||
type podList struct {
|
||||
@@ -20,8 +18,9 @@ type podList struct {
|
||||
}
|
||||
|
||||
// GetLocalPodUIDs obtains the UID of the pods run locally (it's just exported for testing)
|
||||
var GetLocalPodUIDs = func() (map[string]struct{}, error) {
|
||||
resp, err := http.Get(KubeletURL + "/pods/")
|
||||
var GetLocalPodUIDs = func(kubeletHost string) (map[string]struct{}, error) {
|
||||
url := fmt.Sprintf("http://%s/pods/", kubeletHost)
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -30,11 +30,8 @@ func TestGetLocalPodUIDs(t *testing.T) {
|
||||
},
|
||||
))
|
||||
defer server.Close()
|
||||
var savedKubeletURL string
|
||||
savedKubeletURL, kubernetes.KubeletURL = kubernetes.KubeletURL, server.URL
|
||||
defer func() { kubernetes.KubeletURL = savedKubeletURL }()
|
||||
|
||||
uids, err := kubernetes.GetLocalPodUIDs()
|
||||
uids, err := kubernetes.GetLocalPodUIDs(server.URL.Host)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
@@ -98,10 +99,11 @@ type Reporter struct {
|
||||
probe *probe.Probe
|
||||
hostID string
|
||||
handlerRegistry *controls.HandlerRegistry
|
||||
kubeletPort uint
|
||||
}
|
||||
|
||||
// NewReporter makes a new Reporter
|
||||
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry) *Reporter {
|
||||
func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostID string, probe *probe.Probe, handlerRegistry *controls.HandlerRegistry, kubeletPort uint) *Reporter {
|
||||
reporter := &Reporter{
|
||||
client: client,
|
||||
pipes: pipes,
|
||||
@@ -109,6 +111,7 @@ func NewReporter(client Client, pipes controls.PipeClient, probeID string, hostI
|
||||
probe: probe,
|
||||
hostID: hostID,
|
||||
handlerRegistry: handlerRegistry,
|
||||
kubeletPort: kubeletPort,
|
||||
}
|
||||
reporter.registerControls()
|
||||
client.WatchPods(reporter.podEvent)
|
||||
@@ -377,13 +380,13 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet) (re
|
||||
// 1. reconstructing the NodeName requires cloud provider credentials
|
||||
// 2. inferring the NodeName out of the hostname or system uuid is unreliable
|
||||
// (uuids and hostnames can be duplicated across the cluster).
|
||||
localPodUIDs, errUIDs := GetLocalPodUIDs()
|
||||
localPodUIDs, errUIDs := GetLocalPodUIDs(fmt.Sprintf("localhost:%d", r.kubeletPort))
|
||||
if errUIDs != nil {
|
||||
log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs)
|
||||
}
|
||||
err := r.client.WalkPods(func(p Pod) error {
|
||||
// filter out non-local pods
|
||||
if errUIDs != nil {
|
||||
if errUIDs == nil {
|
||||
if _, ok := localPodUIDs[p.UID()]; !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
34
prog/main.go
34
prog/main.go
@@ -103,8 +103,9 @@ type probeFlags struct {
|
||||
dockerInterval time.Duration
|
||||
dockerBridge string
|
||||
|
||||
kubernetesEnabled bool
|
||||
kubernetesConfig kubernetes.ClientConfig
|
||||
kubernetesEnabled bool
|
||||
kubernetesClientConfig kubernetes.ClientConfig
|
||||
kubernetesKubeletPort uint
|
||||
|
||||
ecsEnabled bool
|
||||
ecsCacheSize int
|
||||
@@ -286,20 +287,21 @@ func main() {
|
||||
|
||||
// K8s
|
||||
flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers, should only be enabled on the master node")
|
||||
flag.DurationVar(&flags.probe.kubernetesConfig.Interval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.ClientKey, "probe.kubernetes.client-key", "", "Path to a client key file for TLS")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Cluster, "probe.kubernetes.cluster", "", "The name of the kubeconfig cluster to use")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Context, "probe.kubernetes.context", "", "The name of the kubeconfig context to use")
|
||||
flag.BoolVar(&flags.probe.kubernetesConfig.Insecure, "probe.kubernetes.insecure-skip-tls-verify", false, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Kubeconfig, "probe.kubernetes.kubeconfig", "", "Path to the kubeconfig file to use")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Password, kubernetesPasswordFlag, "", "Password for basic authentication to the API server")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Server, "probe.kubernetes.server", "", "The address and port of the Kubernetes API server")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
|
||||
flag.StringVar(&flags.probe.kubernetesConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
|
||||
flag.DurationVar(&flags.probe.kubernetesClientConfig.Interval, "probe.kubernetes.interval", 10*time.Second, "how often to do a full resync of the kubernetes data")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.ClientKey, "probe.kubernetes.client-key", "", "Path to a client key file for TLS")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Cluster, "probe.kubernetes.cluster", "", "The name of the kubeconfig cluster to use")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Context, "probe.kubernetes.context", "", "The name of the kubeconfig context to use")
|
||||
flag.BoolVar(&flags.probe.kubernetesClientConfig.Insecure, "probe.kubernetes.insecure-skip-tls-verify", false, "If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Kubeconfig, "probe.kubernetes.kubeconfig", "", "Path to the kubeconfig file to use")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Password, kubernetesPasswordFlag, "", "Password for basic authentication to the API server")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.server", "", "The address and port of the Kubernetes API server")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Token, kubernetesTokenFlag, "", "Bearer token for authentication to the API server")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
|
||||
flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
|
||||
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet")
|
||||
|
||||
// AWS ECS
|
||||
flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node")
|
||||
|
||||
@@ -207,9 +207,9 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
|
||||
}
|
||||
|
||||
if flags.kubernetesEnabled {
|
||||
if client, err := kubernetes.NewClient(flags.kubernetesConfig); err == nil {
|
||||
if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil {
|
||||
defer client.Stop()
|
||||
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry)
|
||||
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesKubeletPort)
|
||||
defer reporter.Stop()
|
||||
p.AddReporter(reporter)
|
||||
p.AddTagger(reporter)
|
||||
|
||||
Reference in New Issue
Block a user