From 20ffe37cea722f48039b899186be2752a6a5240f Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 24 Jan 2017 16:13:10 -0800 Subject: [PATCH] Add NPD endpoints: /debug/pprof, /healthz, /conditions. --- Makefile | 2 +- cmd/node_problem_detector.go | 30 +++++++++++++++++-- pkg/condition/manager.go | 38 ++++++++++++++++------- pkg/condition/manager_test.go | 12 ++++++++ pkg/problemdetector/problem_detector.go | 11 +++++++ pkg/util/http.go | 40 +++++++++++++++++++++++++ 6 files changed, 120 insertions(+), 13 deletions(-) create mode 100644 pkg/util/http.go diff --git a/Makefile b/Makefile index 8c8cf110..97bbc3e4 100644 --- a/Makefile +++ b/Makefile @@ -80,7 +80,7 @@ version: ./bin/node-problem-detector: $(PKG_SOURCES) CGO_ENABLED=$(CGO_ENABLED) GOOS=linux go build -o bin/node-problem-detector \ - -ldflags '-w -X $(PKG)/pkg/version.version=$(VERSION)' \ + -ldflags '-X $(PKG)/pkg/version.version=$(VERSION)' \ $(BUILD_TAGS) cmd/node_problem_detector.go Dockerfile: Dockerfile.in diff --git a/cmd/node_problem_detector.go b/cmd/node_problem_detector.go index ec0110e8..54323f08 100644 --- a/cmd/node_problem_detector.go +++ b/cmd/node_problem_detector.go @@ -18,9 +18,12 @@ package main import ( "flag" - "fmt" + "net" + "net/http" + _ "net/http/pprof" "net/url" "os" + "strconv" "github.com/golang/glog" @@ -35,6 +38,8 @@ var ( apiServerOverride = flag.String("apiserver-override", "", "Custom URI used to connect to Kubernetes ApiServer") printVersion = flag.Bool("version", false, "Print version information and quit") hostnameOverride = flag.String("hostname-override", "", "Custom node name used to override hostname") + serverPort = flag.Int("port", 10256, "The port to bind the node problem detector server. Use 0 to disable.") + serverAddress = flag.String("address", "127.0.0.1", "The address to bind the node problem detector server.") ) func validateCmdParams() { @@ -67,12 +72,27 @@ func getNodeNameOrDie() string { // environments. nodeName, err := os.Hostname() if err != nil { - panic(fmt.Sprintf("Failed to get host name: %v", err)) + glog.Fatalf("Failed to get host name: %v", err) } return nodeName } +func startHTTPServer(p problemdetector.ProblemDetector) { + // Add healthz http request handler. Always return ok now, add more health check + // logic in the future. + http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + }) + // Add the http handlers in problem detector. + p.RegisterHTTPHandlers() + err := http.ListenAndServe(net.JoinHostPort(*serverAddress, strconv.Itoa(*serverPort)), nil) + if err != nil { + glog.Fatalf("Failed to start server: %v", err) + } +} + func main() { flag.Parse() validateCmdParams() @@ -86,6 +106,12 @@ func main() { k := kernelmonitor.NewKernelMonitorOrDie(*kernelMonitorConfigPath) p := problemdetector.NewProblemDetector(k, *apiServerOverride, nodeName) + + // Start http server. + if *serverPort > 0 { + startHTTPServer(p) + } + if err := p.Run(); err != nil { glog.Fatalf("Problem detector failed with error: %v", err) } diff --git a/pkg/condition/manager.go b/pkg/condition/manager.go index 523be3e3..a6f2f91e 100644 --- a/pkg/condition/manager.go +++ b/pkg/condition/manager.go @@ -54,19 +54,27 @@ type ConditionManager interface { Start() // UpdateCondition updates a specific condition. UpdateCondition(types.Condition) + // GetConditions returns all current conditions. + GetConditions() []types.Condition } type conditionManager struct { + // Only 2 fields will be accessed by more than one goroutines at the same time: + // * `updates`: updates will be written by random caller and the sync routine, + // so it needs to be protected by write lock in both `UpdateCondition` and + // `needUpdates`. + // * `conditions`: conditions will only be written in the sync routine, but + // it will be read by random caller and the sync routine. So it needs to be + // protected by write lock in `needUpdates` and read lock in `GetConditions`. + // No lock is needed in `sync`, because it is in the same goroutine with the + // write operation. + sync.RWMutex clock clock.Clock latestTry time.Time resyncNeeded bool client problemclient.Client - // updatesLock is the lock protecting updates. Only the field `updates` - // will be accessed by random caller and the sync routine, so only it - // needs to be protected. - updatesLock sync.Mutex - updates map[string]types.Condition - conditions map[string]types.Condition + updates map[string]types.Condition + conditions map[string]types.Condition } // NewConditionManager creates a condition manager. @@ -84,13 +92,23 @@ func (c *conditionManager) Start() { } func (c *conditionManager) UpdateCondition(condition types.Condition) { - c.updatesLock.Lock() - defer c.updatesLock.Unlock() + c.Lock() + defer c.Unlock() // New node condition will override the old condition, because we only need the newest // condition for each condition type. c.updates[condition.Type] = condition } +func (c *conditionManager) GetConditions() []types.Condition { + c.RLock() + defer c.RUnlock() + var conditions []types.Condition + for _, condition := range c.conditions { + conditions = append(conditions, condition) + } + return conditions +} + func (c *conditionManager) syncLoop() { updateCh := c.clock.Tick(updatePeriod) for { @@ -105,8 +123,8 @@ func (c *conditionManager) syncLoop() { // needUpdates checks whether there are recent updates. func (c *conditionManager) needUpdates() bool { - c.updatesLock.Lock() - defer c.updatesLock.Unlock() + c.Lock() + defer c.Unlock() needUpdate := false for t, update := range c.updates { if !reflect.DeepEqual(c.conditions[t], update) { diff --git a/pkg/condition/manager_test.go b/pkg/condition/manager_test.go index 1b565a8b..0468b61b 100644 --- a/pkg/condition/manager_test.go +++ b/pkg/condition/manager_test.go @@ -81,6 +81,18 @@ func TestNeedUpdates(t *testing.T) { } } +func TestGetConditions(t *testing.T) { + m, _, _ := newTestManager() + assert.Empty(t, m.GetConditions()) + testCondition1 := newTestCondition("TestCondition1") + testCondition2 := newTestCondition("TestCondition2") + m.UpdateCondition(testCondition1) + m.UpdateCondition(testCondition2) + assert.True(t, m.needUpdates()) + assert.Contains(t, m.GetConditions(), testCondition1) + assert.Contains(t, m.GetConditions(), testCondition2) +} + func TestResync(t *testing.T) { m, fakeClient, fakeClock := newTestManager() condition := newTestCondition("TestCondition") diff --git a/pkg/problemdetector/problem_detector.go b/pkg/problemdetector/problem_detector.go index 6d8a709e..e918cc37 100644 --- a/pkg/problemdetector/problem_detector.go +++ b/pkg/problemdetector/problem_detector.go @@ -17,6 +17,8 @@ limitations under the License. package problemdetector import ( + "net/http" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/util/clock" @@ -30,6 +32,7 @@ import ( // ProblemDetector collects statuses from all problem daemons and update the node condition and send node event. type ProblemDetector interface { Run() error + RegisterHTTPHandlers() } type problemDetector struct { @@ -70,3 +73,11 @@ func (p *problemDetector) Run() error { } } } + +// RegisterHTTPHandlers registers http handlers of node problem detector. +func (p *problemDetector) RegisterHTTPHandlers() { + // Add the handler to serve condition http request. + http.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) { + util.ReturnHTTPJson(w, p.conditionManager.GetConditions()) + }) +} diff --git a/pkg/util/http.go b/pkg/util/http.go new file mode 100644 index 00000000..1300be7e --- /dev/null +++ b/pkg/util/http.go @@ -0,0 +1,40 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "encoding/json" + "net/http" +) + +// ReturnHTTPJson generates json http response. +func ReturnHTTPJson(w http.ResponseWriter, object interface{}) { + data, err := json.Marshal(object) + if err != nil { + ReturnHTTPError(w, err) + return + } + w.Header().Set("Content-type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(data) +} + +// ReturnHTTPError generates error http response. +func ReturnHTTPError(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) +}