Obtain local pods from kubelet

This commit is contained in:
Alfonso Acosta
2017-01-16 18:30:09 +00:00
parent 92b3cbe952
commit c6f7bdc78e
5 changed files with 3768 additions and 38 deletions

View File

@@ -0,0 +1,38 @@
package kubernetes
import (
"net/http"
"github.com/ugorji/go/codec"
)
// KubeletURL is just exported for testing
var KubeletURL = "http://localhost:10255"
// Intentionally not using the full kubernetes library DS
// to make parsing faster and more tolerant to schema changes
type podList struct {
Items []struct {
Metadata struct {
UID string `json:"uid"`
} `json:"metadata"`
} `json:"items"`
}
// GetLocalPodUIDs obtains the UID of the pods run locally (it's just exported for testing)
var GetLocalPodUIDs = func() (map[string]struct{}, error) {
resp, err := http.Get(KubeletURL + "/pods/")
if err != nil {
return nil, err
}
defer resp.Body.Close()
var localPods podList
if err := codec.NewDecoder(resp.Body, &codec.JsonHandle{}).Decode(&localPods); err != nil {
return nil, err
}
result := make(map[string]struct{}, len(localPods.Items))
for _, pod := range localPods.Items {
result[pod.Metadata.UID] = struct{}{}
}
return result, nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,73 @@
package kubernetes_test
import (
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/weaveworks/scope/probe/kubernetes"
)
const kubeletPodsJSONFile = "kubelet_pods.json"
// obtained with jq .items[].metadata.uid < kubelet_pods.json
var expectedPodUIDs = []string{
"af1b5325-d8cf-11e6-84fa-0800278a0c83",
"afeb8017-d8cf-11e6-84fa-0800278a0c83",
"b57a1ada-d8cf-11e6-84fa-0800278a0c83",
"00915ccb-dbd5-11e6-84fa-0800278a0c83",
"009a3d10-dbd5-11e6-84fa-0800278a0c83",
"fa8320c0-d8b0-11e6-828c-0800278a0c83",
"fa95ebae-d8b0-11e6-828c-0800278a0c83",
"af274a9e-d8cf-11e6-84fa-0800278a0c83",
"af628fd4-d8cf-11e6-84fa-0800278a0c83",
"af7f1d33-d8cf-11e6-84fa-0800278a0c83",
"b0104177-d8cf-11e6-84fa-0800278a0c83",
"b64d652b-d8cf-11e6-84fa-0800278a0c83",
"b687c058-d8cf-11e6-84fa-0800278a0c83",
"b0037e94-d8cf-11e6-84fa-0800278a0c83",
"b56298ba-d8cf-11e6-84fa-0800278a0c83",
"b613f0e9-d8cf-11e6-84fa-0800278a0c83",
"fa2eadd4-d8d1-11e6-84fa-0800278a0c83",
"af973584-d8cf-11e6-84fa-0800278a0c83",
"b079da0b-d8cf-11e6-84fa-0800278a0c83",
"b4e30a10-d8cf-11e6-84fa-0800278a0c83",
"b5025f49-d8cf-11e6-84fa-0800278a0c83",
"b52170e8-d8cf-11e6-84fa-0800278a0c83",
"b53e9d66-d8cf-11e6-84fa-0800278a0c83",
"b5c4c8c3-d8cf-11e6-84fa-0800278a0c83",
"014fb8f91f3d52450a942179a984bc15",
}
func TestGetLocalPodUIDs(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/pods/" {
t.Fatalf("unexpected path: %s", r.URL.Path)
}
b, err := ioutil.ReadFile(kubeletPodsJSONFile)
if err != nil {
t.Fatalf("unexpected error reading json file: %v", err)
}
w.Write(b)
},
))
defer server.Close()
var savedKubeletURL string
savedKubeletURL, kubernetes.KubeletURL = kubernetes.KubeletURL, server.URL
defer func() { kubernetes.KubeletURL = savedKubeletURL }()
uids, err := kubernetes.GetLocalPodUIDs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(expectedPodUIDs) != len(uids) {
t.Errorf("nnexpected length in pod UIDs (%d): expected %d", len(uids), len(expectedPodUIDs))
}
for _, expectedUID := range expectedPodUIDs {
if _, ok := uids[expectedUID]; !ok {
t.Errorf("uid not found: %s", expectedUID)
}
}
}

View File

@@ -1,13 +1,11 @@
package kubernetes
import (
"io/ioutil"
"os"
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
log "github.com/Sirupsen/logrus"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/scope/probe"
"github.com/weaveworks/scope/probe/controls"
@@ -314,27 +312,6 @@ func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment)
return result, replicaSets, err
}
// GetNodeName return the k8s node name for the current machine.
// It is exported for testing.
var GetNodeName = func(r *Reporter) (string, error) {
uuidBytes, err := ioutil.ReadFile("/sys/class/dmi/id/product_uuid")
if os.IsNotExist(err) {
uuidBytes, err = ioutil.ReadFile("/sys/hypervisor/uuid")
}
if err != nil {
return "", err
}
uuid := strings.Trim(string(uuidBytes), "\n")
nodeName := ""
err = r.client.WalkNodes(func(node *api.Node) error {
if node.Status.NodeInfo.SystemUUID == string(uuid) {
nodeName = node.ObjectMeta.Name
}
return nil
})
return nodeName, err
}
type labelledChild interface {
Labels() map[string]string
AddParent(string, string)
@@ -387,13 +364,24 @@ func (r *Reporter) podTopology(services []Service, replicaSets []ReplicaSet) (re
))
}
thisNodeName, err := GetNodeName(r)
if err != nil {
return pods, err
// Obtain the local pods from kubelet since we only want to report those
// for performance reasons.
//
// In theory a simpler approach would be to obtain the current NodeName
// and filter local pods based on that. However that's hard since
// 1. reconstructing the NodeName requires cloud provider credentials
// 2. inferring the NodeName out of the hostname or system uuid is unreliable
// (uuids and hostnames can be duplicated across the cluster).
localPodUIDs, errUIDs := GetLocalPodUIDs()
if errUIDs != nil {
log.Warnf("Cannot obtain local pods, reporting all (which may impact performance): %v", errUIDs)
}
err = r.client.WalkPods(func(p Pod) error {
if p.NodeName() != thisNodeName {
return nil
err := r.client.WalkPods(func(p Pod) error {
// filter out non-local pods
if errUIDs != nil {
if _, ok := localPodUIDs[p.UID()]; !ok {
return nil
}
}
for _, selector := range selectors {
selector(p)

View File

@@ -180,10 +180,14 @@ func (c mockPipeClient) PipeClose(appID, id string) error {
}
func TestReporter(t *testing.T) {
oldGetNodeName := kubernetes.GetNodeName
defer func() { kubernetes.GetNodeName = oldGetNodeName }()
kubernetes.GetNodeName = func(*kubernetes.Reporter) (string, error) {
return nodeName, nil
oldGetNodeName := kubernetes.GetLocalPodUIDs
defer func() { kubernetes.GetLocalPodUIDs = oldGetNodeName }()
kubernetes.GetLocalPodUIDs = func() (map[string]struct{}, error) {
uids := map[string]struct{}{
pod1UID: {},
pod2UID: {},
}
return uids, nil
}
pod1ID := report.MakePodNodeID(pod1UID)
@@ -271,10 +275,10 @@ type callbackReadCloser struct {
func (c *callbackReadCloser) Close() error { return c.close() }
func TestReporterGetLogs(t *testing.T) {
oldGetNodeName := kubernetes.GetNodeName
defer func() { kubernetes.GetNodeName = oldGetNodeName }()
kubernetes.GetNodeName = func(*kubernetes.Reporter) (string, error) {
return nodeName, nil
oldGetNodeName := kubernetes.GetLocalPodUIDs
defer func() { kubernetes.GetLocalPodUIDs = oldGetNodeName }()
kubernetes.GetLocalPodUIDs = func() (map[string]struct{}, error) {
return map[string]struct{}{}, nil
}
client := newMockClient()