Merge pull request #83 from Random-Liu/add-npd-endpoint

Add NPD endpoints: /debug/pprof, /healthz, /conditions.
This commit is contained in:
Lantao Liu
2017-02-03 20:41:11 -08:00
committed by GitHub
6 changed files with 120 additions and 13 deletions

View File

@@ -80,7 +80,7 @@ version:
./bin/node-problem-detector: $(PKG_SOURCES)
CGO_ENABLED=$(CGO_ENABLED) GOOS=linux go build -o bin/node-problem-detector \
-ldflags '-w -X $(PKG)/pkg/version.version=$(VERSION)' \
-ldflags '-X $(PKG)/pkg/version.version=$(VERSION)' \
$(BUILD_TAGS) cmd/node_problem_detector.go
Dockerfile: Dockerfile.in

View File

@@ -18,9 +18,12 @@ package main
import (
"flag"
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"net/url"
"os"
"strconv"
"github.com/golang/glog"
@@ -35,6 +38,8 @@ var (
apiServerOverride = flag.String("apiserver-override", "", "Custom URI used to connect to Kubernetes ApiServer")
printVersion = flag.Bool("version", false, "Print version information and quit")
hostnameOverride = flag.String("hostname-override", "", "Custom node name used to override hostname")
serverPort = flag.Int("port", 10256, "The port to bind the node problem detector server. Use 0 to disable.")
serverAddress = flag.String("address", "127.0.0.1", "The address to bind the node problem detector server.")
)
func validateCmdParams() {
@@ -67,12 +72,27 @@ func getNodeNameOrDie() string {
// environments.
nodeName, err := os.Hostname()
if err != nil {
panic(fmt.Sprintf("Failed to get host name: %v", err))
glog.Fatalf("Failed to get host name: %v", err)
}
return nodeName
}
// startHTTPServer registers the /healthz handler and the problem detector's
// own HTTP handlers on the default net/http mux, then starts serving them on
// *serverAddress:*serverPort.
//
// The server is run in a background goroutine: http.ListenAndServe blocks for
// as long as the server is up, so calling it inline would keep the caller from
// ever reaching the code that follows (e.g. p.Run() in main). A failure to
// listen or serve is still fatal to the process.
func startHTTPServer(p problemdetector.ProblemDetector) {
	// Add healthz http request handler. Always return ok now, add more health check
	// logic in the future.
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
	// Add the http handlers in problem detector.
	p.RegisterHTTPHandlers()

	addr := net.JoinHostPort(*serverAddress, strconv.Itoa(*serverPort))
	go func() {
		// ListenAndServe only returns on error, so any return here is fatal.
		if err := http.ListenAndServe(addr, nil); err != nil {
			glog.Fatalf("Failed to start server: %v", err)
		}
	}()
}
func main() {
flag.Parse()
validateCmdParams()
@@ -86,6 +106,12 @@ func main() {
k := kernelmonitor.NewKernelMonitorOrDie(*kernelMonitorConfigPath)
p := problemdetector.NewProblemDetector(k, *apiServerOverride, nodeName)
// Start http server.
if *serverPort > 0 {
startHTTPServer(p)
}
if err := p.Run(); err != nil {
glog.Fatalf("Problem detector failed with error: %v", err)
}

View File

@@ -54,19 +54,27 @@ type ConditionManager interface {
Start()
// UpdateCondition updates a specific condition.
UpdateCondition(types.Condition)
// GetConditions returns all current conditions.
GetConditions() []types.Condition
}
// conditionManager holds the state used by the methods defined on it in this
// file (UpdateCondition, GetConditions, needUpdates, ...).
// NOTE(review): both `updatesLock` and the embedded `sync.RWMutex` are listed,
// and `updates`/`conditions` each appear twice; this looks like the removed and
// added sides of a diff rendered together — confirm against the real file.
type conditionManager struct {
// Only 2 fields will be accessed by more than one goroutines at the same time:
// * `updates`: updates will be written by random caller and the sync routine,
// so it needs to be protected by write lock in both `UpdateCondition` and
// `needUpdates`.
// * `conditions`: conditions will only be written in the sync routine, but
// it will be read by random caller and the sync routine. So it needs to be
// protected by write lock in `needUpdates` and read lock in `GetConditions`.
// No lock is needed in `sync`, because it is in the same goroutine with the
// write operation.
sync.RWMutex
clock clock.Clock
latestTry time.Time
resyncNeeded bool
client problemclient.Client
// updatesLock is the lock protecting updates. Only the field `updates`
// will be accessed by random caller and the sync routine, so only it
// needs to be protected.
updatesLock sync.Mutex
updates map[string]types.Condition
conditions map[string]types.Condition
updates map[string]types.Condition
conditions map[string]types.Condition
}
// NewConditionManager creates a condition manager.
@@ -84,13 +92,23 @@ func (c *conditionManager) Start() {
}
// UpdateCondition records condition as the pending update for its type. A
// later call with the same condition.Type replaces the earlier pending value.
// NOTE(review): two lock/unlock pairs (updatesLock and the embedded lock) are
// shown below; they look like the old and new sides of a diff — confirm which
// pair the real file keeps.
func (c *conditionManager) UpdateCondition(condition types.Condition) {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
c.Lock()
defer c.Unlock()
// New node condition will override the old condition, because we only need the newest
// condition for each condition type.
c.updates[condition.Type] = condition
}
// GetConditions returns a copy of all current conditions, taken under the
// read lock. The order of the returned slice is unspecified because it is
// built by ranging over a map.
//
// The result is never nil: with no conditions it is an empty slice, so it
// JSON-encodes as `[]` rather than `null`.
func (c *conditionManager) GetConditions() []types.Condition {
	c.RLock()
	defer c.RUnlock()
	// Size the slice up front; the final length is known from the map.
	conditions := make([]types.Condition, 0, len(c.conditions))
	for _, condition := range c.conditions {
		conditions = append(conditions, condition)
	}
	return conditions
}
func (c *conditionManager) syncLoop() {
updateCh := c.clock.Tick(updatePeriod)
for {
@@ -105,8 +123,8 @@ func (c *conditionManager) syncLoop() {
// needUpdates checks whether there are recent updates.
func (c *conditionManager) needUpdates() bool {
c.updatesLock.Lock()
defer c.updatesLock.Unlock()
c.Lock()
defer c.Unlock()
needUpdate := false
for t, update := range c.updates {
if !reflect.DeepEqual(c.conditions[t], update) {

View File

@@ -81,6 +81,18 @@ func TestNeedUpdates(t *testing.T) {
}
}
// TestGetConditions checks that a fresh manager reports no conditions, and
// that conditions submitted through UpdateCondition are returned by
// GetConditions once needUpdates has run.
func TestGetConditions(t *testing.T) {
	manager, _, _ := newTestManager()
	assert.Empty(t, manager.GetConditions())

	first := newTestCondition("TestCondition1")
	second := newTestCondition("TestCondition2")
	manager.UpdateCondition(first)
	manager.UpdateCondition(second)

	// needUpdates must report pending changes before the conditions are read back.
	assert.True(t, manager.needUpdates())

	current := manager.GetConditions()
	assert.Contains(t, current, first)
	assert.Contains(t, current, second)
}
func TestResync(t *testing.T) {
m, fakeClient, fakeClock := newTestManager()
condition := newTestCondition("TestCondition")

View File

@@ -17,6 +17,8 @@ limitations under the License.
package problemdetector
import (
"net/http"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/clock"
@@ -30,6 +32,7 @@ import (
// ProblemDetector collects statuses from all problem daemons, updates the node
// condition and sends node events.
type ProblemDetector interface {
// Run runs the problem detector and returns an error if it fails.
Run() error
// RegisterHTTPHandlers registers the http handlers of the problem detector.
RegisterHTTPHandlers()
}
type problemDetector struct {
@@ -70,3 +73,11 @@ func (p *problemDetector) Run() error {
}
}
}
// RegisterHTTPHandlers installs the problem detector's HTTP endpoints on the
// default net/http mux.
func (p *problemDetector) RegisterHTTPHandlers() {
	// /conditions replies with the condition manager's current conditions as JSON.
	serveConditions := func(w http.ResponseWriter, _ *http.Request) {
		current := p.conditionManager.GetConditions()
		util.ReturnHTTPJson(w, current)
	}
	http.HandleFunc("/conditions", serveConditions)
}

40
pkg/util/http.go Normal file
View File

@@ -0,0 +1,40 @@
/*
Copyright 2017 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"net/http"
)
// ReturnHTTPJson serializes object to JSON and writes it to w as a 200
// response with a JSON content type. If serialization fails, the error is
// reported through ReturnHTTPError instead and nothing else is written.
func ReturnHTTPJson(w http.ResponseWriter, object interface{}) {
	payload, marshalErr := json.Marshal(object)
	if marshalErr == nil {
		w.Header().Set("Content-type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write(payload)
		return
	}
	ReturnHTTPError(w, marshalErr)
}

// ReturnHTTPError writes the text of err to w as a 500 response.
func ReturnHTTPError(w http.ResponseWriter, err error) {
	w.WriteHeader(http.StatusInternalServerError)
	message := err.Error()
	w.Write([]byte(message))
}