Move apiserver-reporting logic into k8s_exporter.

Added CLI option "enable-k8s-exporter" (default to true). Users can use
this option to enable/disable exporting to Kubernetes control plane.

This commit also removes all the apiserver-specific logic from package
problemdetector.

Future exporters (e.g. to local journald, Prometheus, other control
planes) should implement types.Exporter interface.
This commit is contained in:
Xuewei Zhang
2019-05-20 15:00:18 -07:00
parent df2bc3df22
commit 5814195ad5
10 changed files with 130 additions and 63 deletions

View File

@@ -17,43 +17,20 @@ limitations under the License.
package main
import (
"net"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"github.com/golang/glog"
"github.com/spf13/pflag"
"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/custompluginmonitor"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter"
"k8s.io/node-problem-detector/pkg/problemdetector"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/version"
)
func startHTTPServer(p problemdetector.ProblemDetector, npdo *options.NodeProblemDetectorOptions) {
// Add healthz http request handler. Always return ok now, add more health check
// logic in the future.
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
// Add the http handlers in problem detector.
p.RegisterHTTPHandlers()
addr := net.JoinHostPort(npdo.ServerAddress, strconv.Itoa(npdo.ServerPort))
go func() {
err := http.ListenAndServe(addr, nil)
if err != nil {
glog.Fatalf("Failed to start server: %v", err)
}
}()
}
func main() {
npdo := options.NewNodeProblemDetectorOptions()
npdo.AddFlags(pflag.CommandLine)
@@ -87,14 +64,19 @@ func main() {
}
monitors[config] = custompluginmonitor.NewCustomPluginMonitorOrDie(config)
}
c := problemclient.NewClientOrDie(npdo)
p := problemdetector.NewProblemDetector(monitors, c)
// Start http server.
if npdo.ServerPort > 0 {
startHTTPServer(p, npdo)
// Initialize exporters.
exporters := []types.Exporter{}
if ke := k8sexporter.NewExporterOrDie(npdo); ke != nil {
exporters = append(exporters, ke)
glog.Info("K8s exporter started.")
}
if len(exporters) == 0 {
glog.Fatalf("No exporter is successfully setup")
}
// Initialize NPD core.
p := problemdetector.NewProblemDetector(monitors, exporters)
if err := p.Run(); err != nil {
glog.Fatalf("Problem detector failed with error: %v", err)
}

View File

@@ -36,8 +36,6 @@ type NodeProblemDetectorOptions struct {
// CustomPluginMonitorConfigPaths specifies the list of paths to custom plugin monitor configuration
// files.
CustomPluginMonitorConfigPaths []string
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
ApiServerOverride string
// PrintVersion is the flag determining whether version information is printed.
PrintVersion bool
// HostnameOverride specifies custom node name used to override hostname.
@@ -47,6 +45,14 @@ type NodeProblemDetectorOptions struct {
// ServerAddress is the address to bind the node problem detector server.
ServerAddress string
// exporter options
// k8sExporter options
// EnableK8sExporter is the flag determining whether to report to Kubernetes.
EnableK8sExporter bool
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
ApiServerOverride string
// application options
// NodeName is the node name used to communicate with Kubernetes ApiServer.
@@ -63,8 +69,9 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
[]string{}, "List of paths to system log monitor config files, comma separated.")
fs.StringSliceVar(&npdo.CustomPluginMonitorConfigPaths, "custom-plugin-monitors",
[]string{}, "List of paths to custom plugin monitor config files, comma separated.")
fs.BoolVar(&npdo.EnableK8sExporter, "enable-k8s-exporter", true, "Enables reporting to Kubernetes API server.")
fs.StringVar(&npdo.ApiServerOverride, "apiserver-override",
"", "Custom URI used to connect to Kubernetes ApiServer")
"", "Custom URI used to connect to Kubernetes ApiServer. This is ignored if --enable-k8s-exporter is false.")
fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit")
fs.StringVar(&npdo.HostnameOverride, "hostname-override",
"", "Custom node name used to override hostname")
@@ -76,7 +83,7 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
// ValidOrDie validates node problem detector command line options.
func (npdo *NodeProblemDetectorOptions) ValidOrDie() {
if _, err := url.Parse(npdo.ApiServerOverride); err != nil {
if _, err := url.Parse(npdo.ApiServerOverride); npdo.EnableK8sExporter && err != nil {
panic(fmt.Sprintf("apiserver-override %q is not a valid HTTP URI: %v",
npdo.ApiServerOverride, err))
}

View File

@@ -21,7 +21,7 @@ import (
"sync"
"time"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
"k8s.io/node-problem-detector/pkg/types"
problemutil "k8s.io/node-problem-detector/pkg/util"

View File

@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
"k8s.io/node-problem-detector/pkg/types"
problemutil "k8s.io/node-problem-detector/pkg/util"

View File

@@ -0,0 +1,93 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package k8sexporter
import (
"net"
"net/http"
_ "net/http/pprof"
"strconv"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/condition"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)
type k8sExporter struct {
client problemclient.Client
conditionManager condition.ConditionManager
}
// NewExporterOrDie creates a exporter for Kubernetes apiserver exporting, panics if error occurs.
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter {
if !npdo.EnableK8sExporter {
return nil
}
c := problemclient.NewClientOrDie(npdo)
ke := k8sExporter{
client: c,
conditionManager: condition.NewConditionManager(c, clock.RealClock{}),
}
ke.startHTTPReporting(npdo)
ke.conditionManager.Start()
return &ke
}
func (ke *k8sExporter) ExportProblems(status *types.Status) {
for _, event := range status.Events {
ke.client.Eventf(util.ConvertToAPIEventType(event.Severity), status.Source, event.Reason, event.Message)
}
for _, cdt := range status.Conditions {
ke.conditionManager.UpdateCondition(cdt)
}
}
func (ke *k8sExporter) startHTTPReporting(npdo *options.NodeProblemDetectorOptions) {
if npdo.ServerPort <= 0 {
return
}
mux := http.NewServeMux()
// Add healthz http request handler. Always return ok now, add more health check
// logic in the future.
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
// Add the handler to serve condition http request.
mux.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) {
util.ReturnHTTPJson(w, ke.conditionManager.GetConditions())
})
addr := net.JoinHostPort(npdo.ServerAddress, strconv.Itoa(npdo.ServerPort))
go func() {
err := http.ListenAndServe(addr, mux)
if err != nil {
glog.Fatalf("Failed to start server: %v", err)
}
}()
}

View File

@@ -18,43 +18,33 @@ package problemdetector
import (
"fmt"
"net/http"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/node-problem-detector/pkg/condition"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)
// ProblemDetector collects statuses from all problem daemons and update the node condition and send node event.
type ProblemDetector interface {
Run() error
RegisterHTTPHandlers()
}
type problemDetector struct {
client problemclient.Client
conditionManager condition.ConditionManager
monitors map[string]types.Monitor
monitors map[string]types.Monitor
exporters []types.Exporter
}
// NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but
// in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitors map[string]types.Monitor, client problemclient.Client) ProblemDetector {
func NewProblemDetector(monitors map[string]types.Monitor, exporters []types.Exporter) ProblemDetector {
return &problemDetector{
client: client,
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
monitors: monitors,
monitors: monitors,
exporters: exporters,
}
}
// Run starts the problem detector.
func (p *problemDetector) Run() error {
p.conditionManager.Start()
// Start the log monitors one by one.
var chans []<-chan *types.Status
for cfg, m := range p.monitors {
@@ -75,24 +65,13 @@ func (p *problemDetector) Run() error {
for {
select {
case status := <-ch:
for _, event := range status.Events {
p.client.Eventf(util.ConvertToAPIEventType(event.Severity), status.Source, event.Reason, event.Message)
}
for _, cdt := range status.Conditions {
p.conditionManager.UpdateCondition(cdt)
for _, exporter := range p.exporters {
exporter.ExportProblems(status)
}
}
}
}
// RegisterHTTPHandlers registers http handlers of node problem detector.
func (p *problemDetector) RegisterHTTPHandlers() {
// Add the handler to serve condition http request.
http.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) {
util.ReturnHTTPJson(w, p.conditionManager.GetConditions())
})
}
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
statuses := make(chan *types.Status)
for _, ch := range chans {

View File

@@ -107,3 +107,9 @@ type Monitor interface {
// Stop stops the log monitor.
Stop()
}
// Exporter exports machine health data to certain control plane.
type Exporter interface {
// Export problems to the control plane.
ExportProblems(*Status)
}