Merge pull request #275 from xueweiz/exp

node-problem-detector: report disk queue length in Prometheus format
This commit is contained in:
Kubernetes Prow Robot
2019-06-13 15:24:14 -07:00
committed by GitHub
29 changed files with 1486 additions and 101 deletions

View File

@@ -59,6 +59,12 @@ List of supported problem daemons:
| [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) | KernelDeadlock | A system log monitor monitors kernel log and reports problem according to predefined rules. |
| [AbrtAdaptor](https://github.com/kubernetes/node-problem-detector/blob/master/config/abrt-adaptor.json) | None | Monitor ABRT log messages and report them further. ABRT (Automatic Bug Report Tool) is health monitoring daemon able to catch kernel problems as well as application crashes of various kinds occurred on the host. For more information visit the [link](https://github.com/abrt). |
| [CustomPluginMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/custom-plugin-monitor.json) | On-demand(According to users configuration) | A custom plugin monitor for node-problem-detector to invoke and check various node problems with user defined check scripts. See proposal [here](https://docs.google.com/document/d/1jK_5YloSYtboj-DtfjmYKxfNnUxCAvohLnsH5aGCAYQ/edit#). |
| [SystemStatsMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/system-stats-monitor.json) | None(Could be added in the future) | A system stats monitor for node-problem-detector to collect various health-related system stats as metrics. See proposal [here](https://docs.google.com/document/d/1SeaUz6kBavI283Dq8GBpoEUDrHA2a795xtw0OvjM568/edit). |
# Exporter
An exporter is a component of node-problem-detector. It reports node problems and/or metrics to
certain back end (e.g. Kubernetes API server, or Prometheus scrape endpoint).
# Usage
@@ -67,16 +73,21 @@ List of supported problem daemons:
* `--version`: Print current version of node-problem-detector.
* `--address`: The address to bind the node problem detector server.
* `--port`: The port to bind the node problem detector server. Use 0 to disable.
* `--system-log-monitors`: List of paths to system log monitor configuration files, comma separated, e.g.
* `--config.system-log-monitor`: List of paths to system log monitor configuration files, comma separated, e.g.
[config/kernel-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json).
Node problem detector will start a separate log monitor for each configuration. You can
use different log monitors to monitor different system log.
* `--custom-plugin-monitors`: List of paths to custom plugin monitor config files, comma separated, e.g.
* `--config.custom-plugin-monitor`: List of paths to custom plugin monitor config files, comma separated, e.g.
[config/custom-plugin-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/custom-plugin-monitor.json).
Node problem detector will start a separate custom plugin monitor for each configuration. You can
use different custom plugin monitors to monitor different node problems.
* `--config.system-stats-monitor`: List of paths to system stats monitor config files, comma separated, e.g.
[config/system-stats-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/system-stats-monitor.json).
Node problem detector will start a separate system stats monitor for each configuration. You can
use different system stats monitors to monitor different problem-related system stats.
* `--enable-k8s-exporter`: Enables reporting to Kubernetes API server, default to `true`.
* `--apiserver-override`: A URI parameter used to customize how node-problem-detector
connects the apiserver. The format is same as the
connects the apiserver. This is ignored if `--enable-k8s-exporter` is `false`. The format is same as the
[`source`](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes)
flag of [Heapster](https://github.com/kubernetes/heapster).
For example, to run without auth, use the following config:
@@ -85,6 +96,14 @@ For example, to run without auth, use the following config:
```
Refer [heapster docs](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes) for a complete list of available options.
* `--hostname-override`: A customized node name used for node-problem-detector to update conditions and emit events. node-problem-detector gets node name first from `hostname-override`, then `NODE_NAME` environment variable and finally fall back to `os.Hostname`.
* `--prometheus-address`: The address to bind the Prometheus scrape endpoint, default to `127.0.0.1`.
* `--prometheus-port`: The port to bind the Prometheus scrape endpoint, default to 20257. Use 0 to disable.
### Deprecated Flags
* `--system-log-monitors`: List of paths to system log monitor config files, comma separated. This option is deprecated, replaced by `--config.system-log-monitor`, and will be removed. NPD will panic if both `--system-log-monitors` and `--config.system-log-monitor` are set.
* `--custom-plugin-monitors`: List of paths to custom plugin monitor config files, comma separated. This option is deprecated, replaced by `--config.custom-plugin-monitor`, and will be removed. NPD will panic if both `--custom-plugin-monitors` and `--config.custom-plugin-monitor` are set.
## Build Image
@@ -149,12 +168,13 @@ For example, to test [KernelMonitor](https://github.com/kubernetes/node-problem-
1. ```make``` (build node-problem-detector locally)
2. ```kubectl proxy --port=8080``` (make a running cluster's API server available locally)
3. Update [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json)'s ```logPath``` to your local kernel log directory. For example, on some Linux systems, it is ```/run/log/journal``` instead of ```/var/log/journal```.
3. ```./bin/node-problem-detector --logtostderr --apiserver-override=http://127.0.0.1:8080?inClusterConfig=false --system-log-monitors=config/kernel-monitor.json --port=20256``` (or point to any API server address:port)
3. ```./bin/node-problem-detector --logtostderr --apiserver-override=http://127.0.0.1:8080?inClusterConfig=false --config.system-log-monitor=config/kernel-monitor.json --config.system-stats-monitor=config/system-stats-monitor.json --port=20256 --prometheus-port=20257``` (or point to any API server address:port and Prometheus port)
4. ```sudo sh -c "echo 'kernel: BUG: unable to handle kernel NULL pointer dereference at TESTING' >> /dev/kmsg"```
5. You can see ```KernelOops``` event in the node-problem-detector log.
6. ```sudo sh -c "echo 'kernel: INFO: task docker:20744 blocked for more than 120 seconds.' >> /dev/kmsg"```
7. You can see ```DockerHung``` event and condition in the node-problem-detector log.
8. You can see ```DockerHung``` condition at [http://127.0.0.1:20256/conditions](http://127.0.0.1:20256/conditions).
9. You can see disk related system metrics in Prometheus format at [http://127.0.0.1:20257/metrics](http://127.0.0.1:20257/metrics).
**Note**:
- You can see more rule examples under [test/kernel_log_generator/problems](https://github.com/kubernetes/node-problem-detector/tree/master/test/kernel_log_generator/problems).

View File

@@ -17,43 +17,20 @@ limitations under the License.
package main
import (
"net"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"github.com/golang/glog"
"github.com/spf13/pflag"
"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/custompluginmonitor"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter"
"k8s.io/node-problem-detector/pkg/exporters/prometheusexporter"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/problemdetector"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/version"
)
func startHTTPServer(p problemdetector.ProblemDetector, npdo *options.NodeProblemDetectorOptions) {
// Add healthz http request handler. Always return ok now, add more health check
// logic in the future.
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
// Add the http handlers in problem detector.
p.RegisterHTTPHandlers()
addr := net.JoinHostPort(npdo.ServerAddress, strconv.Itoa(npdo.ServerPort))
go func() {
err := http.ListenAndServe(addr, nil)
if err != nil {
glog.Fatalf("Failed to start server: %v", err)
}
}()
}
func main() {
npdo := options.NewNodeProblemDetectorOptions()
npdo.AddFlags(pflag.CommandLine)
@@ -66,35 +43,31 @@ func main() {
}
npdo.SetNodeNameOrDie()
npdo.SetConfigFromDeprecatedOptionsOrDie()
npdo.ValidOrDie()
monitors := make(map[string]types.Monitor)
for _, config := range npdo.SystemLogMonitorConfigPaths {
if _, ok := monitors[config]; ok {
// Skip the config if it's duplicated.
glog.Warningf("Duplicated monitor configuration %q", config)
continue
}
monitors[config] = systemlogmonitor.NewLogMonitorOrDie(config)
// Initialize problem daemons.
problemDaemons := problemdaemon.NewProblemDaemons(npdo.MonitorConfigPaths)
if len(problemDaemons) == 0 {
glog.Fatalf("No problem daemon is configured")
}
for _, config := range npdo.CustomPluginMonitorConfigPaths {
if _, ok := monitors[config]; ok {
// Skip the config if it's duplicated.
glog.Warningf("Duplicated monitor configuration %q", config)
continue
}
monitors[config] = custompluginmonitor.NewCustomPluginMonitorOrDie(config)
// Initialize exporters.
exporters := []types.Exporter{}
if ke := k8sexporter.NewExporterOrDie(npdo); ke != nil {
exporters = append(exporters, ke)
glog.Info("K8s exporter started.")
}
c := problemclient.NewClientOrDie(npdo)
p := problemdetector.NewProblemDetector(monitors, c)
// Start http server.
if npdo.ServerPort > 0 {
startHTTPServer(p, npdo)
if pe := prometheusexporter.NewExporterOrDie(npdo); pe != nil {
exporters = append(exporters, pe)
glog.Info("Prometheus exporter started.")
}
if len(exporters) == 0 {
glog.Fatalf("No exporter is successfully setup")
}
// Initialize NPD core.
p := problemdetector.NewProblemDetector(problemDaemons, exporters)
if err := p.Run(); err != nil {
glog.Fatalf("Problem detector failed with error: %v", err)
}

View File

@@ -24,20 +24,17 @@ import (
"net/url"
"github.com/spf13/pflag"
"k8s.io/node-problem-detector/pkg/custompluginmonitor"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
)
// NodeProblemDetectorOptions contains node problem detector command line and application options.
type NodeProblemDetectorOptions struct {
// command line options
// SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration
// files.
SystemLogMonitorConfigPaths []string
// CustomPluginMonitorConfigPaths specifies the list of paths to custom plugin monitor configuration
// files.
CustomPluginMonitorConfigPaths []string
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
ApiServerOverride string
// PrintVersion is the flag determining whether version information is printed.
PrintVersion bool
// HostnameOverride specifies custom node name used to override hostname.
@@ -47,6 +44,35 @@ type NodeProblemDetectorOptions struct {
// ServerAddress is the address to bind the node problem detector server.
ServerAddress string
// exporter options
// k8sExporter options
// EnableK8sExporter is the flag determining whether to report to Kubernetes.
EnableK8sExporter bool
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
ApiServerOverride string
// prometheusExporter options
// PrometheusServerPort is the port to bind the Prometheus scrape endpoint. Use 0 to disable.
PrometheusServerPort int
// PrometheusServerAddress is the address to bind the Prometheus scrape endpoint.
PrometheusServerAddress string
// problem daemon options
// SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration
// files.
// SystemLogMonitorConfigPaths is used by the deprecated option --system-log-monitors. The new
// option --config.system-log-monitor will stored the config file paths in MonitorConfigPaths.
SystemLogMonitorConfigPaths []string
// CustomPluginMonitorConfigPaths specifies the list of paths to custom plugin monitor configuration
// files.
// CustomPluginMonitorConfigPaths is used by the deprecated option --custom-plugin-monitors. The
// new option --config.custom-plugin-monitor will stored the config file paths in MonitorConfigPaths.
CustomPluginMonitorConfigPaths []string
// MonitorConfigPaths specifies the list of paths to configuration files for each monitor.
MonitorConfigPaths types.ProblemDaemonConfigPathMap
// application options
// NodeName is the node name used to communicate with Kubernetes ApiServer.
@@ -54,17 +80,20 @@ type NodeProblemDetectorOptions struct {
}
func NewNodeProblemDetectorOptions() *NodeProblemDetectorOptions {
return &NodeProblemDetectorOptions{}
return &NodeProblemDetectorOptions{MonitorConfigPaths: types.ProblemDaemonConfigPathMap{}}
}
// AddFlags adds node problem detector command line options to pflag.
func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&npdo.SystemLogMonitorConfigPaths, "system-log-monitors",
[]string{}, "List of paths to system log monitor config files, comma separated.")
fs.MarkDeprecated("system-log-monitors", "replaced by --config.system-log-monitor. NPD will panic if both --system-log-monitors and --config.system-log-monitor are set.")
fs.StringSliceVar(&npdo.CustomPluginMonitorConfigPaths, "custom-plugin-monitors",
[]string{}, "List of paths to custom plugin monitor config files, comma separated.")
fs.MarkDeprecated("custom-plugin-monitors", "replaced by --config.custom-plugin-monitor. NPD will panic if both --custom-plugin-monitors and --config.custom-plugin-monitor are set.")
fs.BoolVar(&npdo.EnableK8sExporter, "enable-k8s-exporter", true, "Enables reporting to Kubernetes API server.")
fs.StringVar(&npdo.ApiServerOverride, "apiserver-override",
"", "Custom URI used to connect to Kubernetes ApiServer")
"", "Custom URI used to connect to Kubernetes ApiServer. This is ignored if --enable-k8s-exporter is false.")
fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit")
fs.StringVar(&npdo.HostnameOverride, "hostname-override",
"", "Custom node name used to override hostname")
@@ -72,16 +101,77 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
20256, "The port to bind the node problem detector server. Use 0 to disable.")
fs.StringVar(&npdo.ServerAddress, "address",
"127.0.0.1", "The address to bind the node problem detector server.")
fs.IntVar(&npdo.PrometheusServerPort, "prometheus-port",
20257, "The port to bind the Prometheus scrape endpoint. Prometheus exporter is enabled by default at port 20257. Use 0 to disable.")
fs.StringVar(&npdo.PrometheusServerAddress, "prometheus-address",
"127.0.0.1", "The address to bind the Prometheus scrape endpoint.")
for _, problemDaemonName := range problemdaemon.GetProblemDaemonNames() {
npdo.MonitorConfigPaths[problemDaemonName] = &[]string{}
fs.StringSliceVar(
npdo.MonitorConfigPaths[problemDaemonName],
"config."+string(problemDaemonName),
[]string{},
fmt.Sprintf("Comma separated configurations for %v monitor. %v",
problemDaemonName,
problemdaemon.GetProblemDaemonHandlerOrDie(problemDaemonName).CmdOptionDescription))
}
}
// ValidOrDie validates node problem detector command line options.
func (npdo *NodeProblemDetectorOptions) ValidOrDie() {
if _, err := url.Parse(npdo.ApiServerOverride); err != nil {
if _, err := url.Parse(npdo.ApiServerOverride); npdo.EnableK8sExporter && err != nil {
panic(fmt.Sprintf("apiserver-override %q is not a valid HTTP URI: %v",
npdo.ApiServerOverride, err))
}
if len(npdo.SystemLogMonitorConfigPaths) == 0 && len(npdo.CustomPluginMonitorConfigPaths) == 0 {
panic(fmt.Sprintf("Either --system-log-monitors or --custom-plugin-monitors is required"))
if len(npdo.SystemLogMonitorConfigPaths) != 0 {
panic("SystemLogMonitorConfigPaths is deprecated. It should have been reassigned to MonitorConfigPaths. This should not happen.")
}
if len(npdo.CustomPluginMonitorConfigPaths) != 0 {
panic("CustomPluginMonitorConfigPaths is deprecated. It should have been reassigned to MonitorConfigPaths. This should not happen.")
}
configCount := 0
for _, problemDaemonConfigPaths := range npdo.MonitorConfigPaths {
configCount += len(*problemDaemonConfigPaths)
}
if configCount == 0 {
panic("No configuration option for any problem daemon is specified.")
}
}
// SetConfigFromDeprecatedOptionsOrDie sets NPD option using deprecated options.
func (npdo *NodeProblemDetectorOptions) SetConfigFromDeprecatedOptionsOrDie() {
if len(npdo.SystemLogMonitorConfigPaths) != 0 {
if npdo.MonitorConfigPaths[systemlogmonitor.SystemLogMonitorName] == nil {
npdo.MonitorConfigPaths[systemlogmonitor.SystemLogMonitorName] = &[]string{}
}
if len(*npdo.MonitorConfigPaths[systemlogmonitor.SystemLogMonitorName]) != 0 {
panic("Option --system-log-monitors is deprecated in favor of --config.system-log-monitor. They cannot be set at the same time.")
}
*npdo.MonitorConfigPaths[systemlogmonitor.SystemLogMonitorName] = append(
*npdo.MonitorConfigPaths[systemlogmonitor.SystemLogMonitorName],
npdo.SystemLogMonitorConfigPaths...)
npdo.SystemLogMonitorConfigPaths = []string{}
}
if len(npdo.CustomPluginMonitorConfigPaths) != 0 {
if npdo.MonitorConfigPaths[custompluginmonitor.CustomPluginMonitorName] == nil {
npdo.MonitorConfigPaths[custompluginmonitor.CustomPluginMonitorName] = &[]string{}
}
if len(*npdo.MonitorConfigPaths[custompluginmonitor.CustomPluginMonitorName]) != 0 {
panic("Option --custom-plugin-monitors is deprecated in favor of --config.custom-plugin-monitor. They cannot be set at the same time.")
}
*npdo.MonitorConfigPaths[custompluginmonitor.CustomPluginMonitorName] = append(
*npdo.MonitorConfigPaths[custompluginmonitor.CustomPluginMonitorName],
npdo.CustomPluginMonitorConfigPaths...)
npdo.CustomPluginMonitorConfigPaths = []string{}
}
}

View File

@@ -18,9 +18,47 @@ package options
import (
"os"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/custompluginmonitor"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
)
func equalMonitorConfigPaths(npdoX NodeProblemDetectorOptions, npdoY NodeProblemDetectorOptions) bool {
monitorConfigPathsX, monitorConfigPathsY := npdoX.MonitorConfigPaths, npdoY.MonitorConfigPaths
if monitorConfigPathsX == nil && monitorConfigPathsY == nil {
return true
}
if monitorConfigPathsX == nil || monitorConfigPathsY == nil {
return false
}
if len(monitorConfigPathsX) != len(monitorConfigPathsY) {
return false
}
for problemDaemonType, configPathsX := range monitorConfigPathsX {
configPathsY, ok := monitorConfigPathsY[problemDaemonType]
if !ok {
return false
}
if configPathsX == nil && configPathsY == nil {
continue
}
if configPathsX == nil || configPathsY == nil {
return false
}
if !reflect.DeepEqual(*configPathsX, *configPathsY) {
return false
}
}
return true
}
type options struct {
Nodename string
HostnameOverride string
@@ -82,3 +120,248 @@ func TestSetNodeNameOrDie(t *testing.T) {
}
}
}
func TestValidOrDie(t *testing.T) {
fooMonitorConfigMap := types.ProblemDaemonConfigPathMap{}
fooMonitorConfigMap["foo-monitor"] = &[]string{"config-a", "config-b"}
emptyMonitorConfigMap := types.ProblemDaemonConfigPathMap{}
testCases := []struct {
name string
npdo NodeProblemDetectorOptions
expectPanic bool
}{
{
name: "default k8s exporter config",
npdo: NodeProblemDetectorOptions{
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: false,
},
{
name: "enables k8s exporter config",
npdo: NodeProblemDetectorOptions{
ApiServerOverride: "",
EnableK8sExporter: true,
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: false,
},
{
name: "k8s exporter config with valid ApiServerOverride",
npdo: NodeProblemDetectorOptions{
ApiServerOverride: "127.0.0.1",
EnableK8sExporter: true,
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: false,
},
{
name: "k8s exporter config with invalid ApiServerOverride",
npdo: NodeProblemDetectorOptions{
ApiServerOverride: ":foo",
EnableK8sExporter: true,
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: true,
},
{
name: "non-empty MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: false,
},
{
name: "empty MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
MonitorConfigPaths: emptyMonitorConfigMap,
},
expectPanic: true,
},
{
name: "un-initialized MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{},
expectPanic: true,
},
{
name: "mixture of deprecated SystemLogMonitorConfigPaths and new MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
SystemLogMonitorConfigPaths: []string{"config-a"},
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: true,
},
{
name: "mixture of deprecated CustomPluginMonitorConfigPaths and new MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
CustomPluginMonitorConfigPaths: []string{"config-a"},
MonitorConfigPaths: fooMonitorConfigMap,
},
expectPanic: true,
},
{
name: "deprecated SystemLogMonitor option with empty MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
SystemLogMonitorConfigPaths: []string{"config-a"},
MonitorConfigPaths: emptyMonitorConfigMap,
},
expectPanic: true,
},
{
name: "deprecated SystemLogMonitor option with un-initialized MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
SystemLogMonitorConfigPaths: []string{"config-a"},
},
expectPanic: true,
},
{
name: "deprecated CustomPluginMonitor option with empty MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
CustomPluginMonitorConfigPaths: []string{"config-b"},
MonitorConfigPaths: emptyMonitorConfigMap,
},
expectPanic: true,
},
{
name: "deprecated CustomPluginMonitor option with un-initialized MonitorConfigPaths",
npdo: NodeProblemDetectorOptions{
CustomPluginMonitorConfigPaths: []string{"config-b"},
},
expectPanic: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if test.expectPanic {
assert.Panics(t, test.npdo.ValidOrDie, "NPD option %+v is invalid. Expected ValidOrDie to panic.", test.npdo)
} else {
assert.NotPanics(t, test.npdo.ValidOrDie, "NPD option %+v is valid. Expected ValidOrDie to not panic.", test.npdo)
}
})
}
}
func TestSetConfigFromDeprecatedOptionsOrDie(t *testing.T) {
testCases := []struct {
name string
orig NodeProblemDetectorOptions
wanted NodeProblemDetectorOptions
expectPanic bool
}{
{
name: "no deprecated options",
orig: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-a", "config-b"},
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-c", "config-d"},
},
},
expectPanic: false,
wanted: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-a", "config-b"},
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-c", "config-d"},
},
},
},
{
name: "correctly using deprecated options",
orig: NodeProblemDetectorOptions{
SystemLogMonitorConfigPaths: []string{"config-a", "config-b"},
CustomPluginMonitorConfigPaths: []string{"config-c", "config-d"},
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{},
},
expectPanic: false,
wanted: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-a", "config-b"},
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-c", "config-d"},
},
},
},
{
name: "using deprecated SystemLogMonitor option and new CustomPluginMonitor option",
orig: NodeProblemDetectorOptions{
SystemLogMonitorConfigPaths: []string{"config-a", "config-b"},
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-c", "config-d"},
},
},
expectPanic: false,
wanted: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-a", "config-b"},
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-c", "config-d"},
},
},
},
{
name: "using deprecated CustomPluginMonitor option and new SystemLogMonitor option",
orig: NodeProblemDetectorOptions{
CustomPluginMonitorConfigPaths: []string{"config-a", "config-b"},
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-c", "config-d"},
},
},
expectPanic: false,
wanted: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-c", "config-d"},
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-a", "config-b"},
},
},
},
{
name: "using deprecated & new options on SystemLogMonitor",
orig: NodeProblemDetectorOptions{
SystemLogMonitorConfigPaths: []string{"config-a"},
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-b"},
},
},
expectPanic: true,
wanted: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
systemlogmonitor.SystemLogMonitorName: &[]string{"config-b"},
},
},
},
{
name: "using deprecated & new options on CustomPluginMonitor",
orig: NodeProblemDetectorOptions{
CustomPluginMonitorConfigPaths: []string{"config-a"},
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-b"},
},
},
expectPanic: true,
wanted: NodeProblemDetectorOptions{
MonitorConfigPaths: types.ProblemDaemonConfigPathMap{
custompluginmonitor.CustomPluginMonitorName: &[]string{"config-b"},
},
},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if test.expectPanic {
assert.Panics(t, test.orig.SetConfigFromDeprecatedOptionsOrDie,
"NPD option %+v is illegal. Expected SetConfigFromDeprecatedOptionsOrDie to panic.", test.orig)
} else {
assert.NotPanics(t, test.orig.SetConfigFromDeprecatedOptionsOrDie,
"NPD option %+v is illegal. Expected SetConfigFromDeprecatedOptionsOrDie to not panic.", test.orig)
if !equalMonitorConfigPaths(test.orig, test.wanted) {
t.Errorf("Expect to get NPD option %+v, but got %+v", test.wanted, test.orig)
}
assert.Len(t, test.orig.SystemLogMonitorConfigPaths, 0,
"SystemLogMonitorConfigPaths is deprecated and should to be cleared.")
assert.Len(t, test.orig.CustomPluginMonitorConfigPaths, 0,
"CustomPluginMonitorConfigPaths is deprecated and should to be cleared.")
}
})
}
}

24
cmd/plugins.go Normal file
View File

@@ -0,0 +1,24 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
// register problem daemons here
import (
_ "k8s.io/node-problem-detector/pkg/custompluginmonitor"
_ "k8s.io/node-problem-detector/pkg/systemlogmonitor"
_ "k8s.io/node-problem-detector/pkg/systemstatsmonitor"
)

View File

@@ -0,0 +1,19 @@
{
"disk": {
"metricsConfigs": {
"disk/io_time": {
"displayName": "disk/io_time"
},
"disk/weighted_io": {
"displayName": "disk/weighted_io"
},
"disk/avg_queue_len": {
"displayName": "disk/avg_queue_len"
}
},
"includeRootBlk": true,
"includeAllAttachedBlk": true,
"lsblkTimeout": "5s"
},
"invokeInterval": "60s"
}

View File

@@ -25,11 +25,22 @@ import (
"k8s.io/node-problem-detector/pkg/custompluginmonitor/plugin"
cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
"k8s.io/node-problem-detector/pkg/util/tomb"
)
const CustomPluginMonitorName = "custom-plugin-monitor"
func init() {
problemdaemon.Register(
CustomPluginMonitorName,
types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: NewCustomPluginMonitorOrDie,
CmdOptionDescription: "Set to config file paths."})
}
type customPluginMonitor struct {
config cpmtypes.CustomPluginConfig
conditions []types.Condition

View File

@@ -0,0 +1,31 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package custompluginmonitor
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/problemdaemon"
)
func TestRegistration(t *testing.T) {
assert.NotPanics(t,
func() { problemdaemon.GetProblemDaemonHandlerOrDie("custom-plugin-monitor") },
"Custom plugin monitor failed to register itself as a problem daemon.")
}

View File

@@ -261,11 +261,11 @@ func TestCustomPluginConfigValidate(t *testing.T) {
err := utMeta.Conf.Validate()
if err != nil && !utMeta.IsError {
t.Error(desp)
t.Errorf("Error in validating custom plugin configuration %+v. Want an error got nil", utMeta)
t.Errorf("Error in validating custom plugin configuration %+v. Wanted nil got an error", utMeta)
}
if err == nil && utMeta.IsError {
t.Error(desp)
t.Errorf("Error in validating custom plugin configuration %+v. Want nil got an error", utMeta)
t.Errorf("Error in validating custom plugin configuration %+v. Wanted an error got nil", utMeta)
}
}
}

View File

@@ -21,7 +21,7 @@ import (
"sync"
"time"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
"k8s.io/node-problem-detector/pkg/types"
problemutil "k8s.io/node-problem-detector/pkg/util"

View File

@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
"k8s.io/node-problem-detector/pkg/types"
problemutil "k8s.io/node-problem-detector/pkg/util"

View File

@@ -0,0 +1,93 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package k8sexporter
import (
"net"
"net/http"
_ "net/http/pprof"
"strconv"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/condition"
"k8s.io/node-problem-detector/pkg/exporters/k8sexporter/problemclient"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)
type k8sExporter struct {
client problemclient.Client
conditionManager condition.ConditionManager
}
// NewExporterOrDie creates a exporter for Kubernetes apiserver exporting, panics if error occurs.
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter {
if !npdo.EnableK8sExporter {
return nil
}
c := problemclient.NewClientOrDie(npdo)
ke := k8sExporter{
client: c,
conditionManager: condition.NewConditionManager(c, clock.RealClock{}),
}
ke.startHTTPReporting(npdo)
ke.conditionManager.Start()
return &ke
}
func (ke *k8sExporter) ExportProblems(status *types.Status) {
for _, event := range status.Events {
ke.client.Eventf(util.ConvertToAPIEventType(event.Severity), status.Source, event.Reason, event.Message)
}
for _, cdt := range status.Conditions {
ke.conditionManager.UpdateCondition(cdt)
}
}
func (ke *k8sExporter) startHTTPReporting(npdo *options.NodeProblemDetectorOptions) {
if npdo.ServerPort <= 0 {
return
}
mux := http.NewServeMux()
// Add healthz http request handler. Always return ok now, add more health check
// logic in the future.
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
})
// Add the handler to serve condition http request.
mux.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) {
util.ReturnHTTPJson(w, ke.conditionManager.GetConditions())
})
addr := net.JoinHostPort(npdo.ServerAddress, strconv.Itoa(npdo.ServerPort))
go func() {
err := http.ListenAndServe(addr, mux)
if err != nil {
glog.Fatalf("Failed to start server: %v", err)
}
}()
}

View File

@@ -0,0 +1,60 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prometheusexporter
import (
"net"
"net/http"
"strconv"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/golang/glog"
"go.opencensus.io/stats/view"
"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/types"
)
type prometheusExporter struct{}
// NewExporterOrDie creates an exporter to export metrics to Prometheus, panics if error occurs.
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter {
if npdo.PrometheusServerPort <= 0 {
return nil
}
addr := net.JoinHostPort(npdo.PrometheusServerAddress, strconv.Itoa(npdo.PrometheusServerPort))
pe, err := prometheus.NewExporter(prometheus.Options{})
if err != nil {
glog.Fatalf("Failed to create Prometheus exporter: %v", err)
}
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
if err := http.ListenAndServe(addr, mux); err != nil {
glog.Fatalf("Failed to start Prometheus scrape endpoint: %v", err)
}
}()
view.RegisterExporter(pe)
return &prometheusExporter{}
}
// ExportProblems does nothing.
// Prometheus exporter only exports metrics.
func (pe *prometheusExporter) ExportProblems(status *types.Status) {
return
}

View File

@@ -0,0 +1,73 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package problemdaemon
import (
"fmt"
"github.com/golang/glog"
"k8s.io/node-problem-detector/pkg/types"
)
var (
handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler)
)
// Register registers a problem daemon factory method, which will be used to create the problem daemon.
func Register(problemDaemonType types.ProblemDaemonType, handler types.ProblemDaemonHandler) {
handlers[problemDaemonType] = handler
}
// GetProblemDaemonNames retrieves all available problem daemon types.
func GetProblemDaemonNames() []types.ProblemDaemonType {
problemDaemonTypes := []types.ProblemDaemonType{}
for problemDaemonType := range handlers {
problemDaemonTypes = append(problemDaemonTypes, problemDaemonType)
}
return problemDaemonTypes
}
// GetProblemDaemonHandlerOrDie retrieves the ProblemDaemonHandler for a specific type of problem daemon, panic if error occurs..
func GetProblemDaemonHandlerOrDie(problemDaemonType types.ProblemDaemonType) types.ProblemDaemonHandler {
handler, ok := handlers[problemDaemonType]
if !ok {
panic(fmt.Sprintf("Problem daemon handler for %v does not exist", problemDaemonType))
}
return handler
}
// NewProblemDaemons creates all problem daemons based on the configurations provided.
func NewProblemDaemons(monitorConfigPaths types.ProblemDaemonConfigPathMap) []types.Monitor {
problemDaemonMap := make(map[string]types.Monitor)
for problemDaemonType, configs := range monitorConfigPaths {
for _, config := range *configs {
if _, ok := problemDaemonMap[config]; ok {
// Skip the config if it's duplicated.
glog.Warningf("Duplicated problem daemon configuration %q", config)
continue
}
problemDaemonMap[config] = handlers[problemDaemonType].CreateProblemDaemonOrDie(config)
}
}
problemDaemons := []types.Monitor{}
for _, problemDaemon := range problemDaemonMap {
problemDaemons = append(problemDaemons, problemDaemon)
}
return problemDaemons
}

View File

@@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package problemdaemon
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/types"
)
func TestRegistration(t *testing.T) {
fooMonitorFactory := func(configPath string) types.Monitor {
return nil
}
fooMonitorHandler := types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: fooMonitorFactory,
CmdOptionDescription: "foo option",
}
barMonitorFactory := func(configPath string) types.Monitor {
return nil
}
barMonitorHandler := types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: barMonitorFactory,
CmdOptionDescription: "bar option",
}
Register("foo", fooMonitorHandler)
Register("bar", barMonitorHandler)
expectedProblemDaemonNames := []types.ProblemDaemonType{"foo", "bar"}
problemDaemonNames := GetProblemDaemonNames()
assert.ElementsMatch(t, expectedProblemDaemonNames, problemDaemonNames)
assert.Equal(t, "foo option", GetProblemDaemonHandlerOrDie("foo").CmdOptionDescription)
assert.Equal(t, "bar option", GetProblemDaemonHandlerOrDie("bar").CmdOptionDescription)
handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler)
}
func TestGetProblemDaemonHandlerOrDie(t *testing.T) {
fooMonitorFactory := func(configPath string) types.Monitor {
return nil
}
fooMonitorHandler := types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: fooMonitorFactory,
CmdOptionDescription: "foo option",
}
Register("foo", fooMonitorHandler)
assert.NotPanics(t, func() { GetProblemDaemonHandlerOrDie("foo") })
assert.Panics(t, func() { GetProblemDaemonHandlerOrDie("bar") })
handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler)
}

View File

@@ -18,56 +18,48 @@ package problemdetector
import (
"fmt"
"net/http"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/node-problem-detector/pkg/condition"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)
// ProblemDetector collects statuses from all problem daemons and update the node condition and send node event.
type ProblemDetector interface {
Run() error
RegisterHTTPHandlers()
}
type problemDetector struct {
client problemclient.Client
conditionManager condition.ConditionManager
monitors map[string]types.Monitor
monitors []types.Monitor
exporters []types.Exporter
}
// NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but
// in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitors map[string]types.Monitor, client problemclient.Client) ProblemDetector {
func NewProblemDetector(monitors []types.Monitor, exporters []types.Exporter) ProblemDetector {
return &problemDetector{
client: client,
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
monitors: monitors,
monitors: monitors,
exporters: exporters,
}
}
// Run starts the problem detector.
func (p *problemDetector) Run() error {
p.conditionManager.Start()
// Start the log monitors one by one.
var chans []<-chan *types.Status
for cfg, m := range p.monitors {
for _, m := range p.monitors {
ch, err := m.Start()
if err != nil {
// Do not return error and keep on trying the following config files.
glog.Errorf("Failed to start log monitor %q: %v", cfg, err)
glog.Errorf("Failed to start problem daemon %v: %v", m, err)
continue
}
chans = append(chans, ch)
if ch != nil {
chans = append(chans, ch)
}
}
if len(chans) == 0 {
return fmt.Errorf("no log monitor is successfully setup")
return fmt.Errorf("no problem daemon is successfully setup")
}
ch := groupChannel(chans)
glog.Info("Problem detector started")
@@ -75,24 +67,13 @@ func (p *problemDetector) Run() error {
for {
select {
case status := <-ch:
for _, event := range status.Events {
p.client.Eventf(util.ConvertToAPIEventType(event.Severity), status.Source, event.Reason, event.Message)
}
for _, cdt := range status.Conditions {
p.conditionManager.UpdateCondition(cdt)
for _, exporter := range p.exporters {
exporter.ExportProblems(status)
}
}
}
}
// RegisterHTTPHandlers registers http handlers of node problem detector.
func (p *problemDetector) RegisterHTTPHandlers() {
// Add the handler to serve condition http request.
http.HandleFunc("/conditions", func(w http.ResponseWriter, r *http.Request) {
util.ReturnHTTPJson(w, p.conditionManager.GetConditions())
})
}
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
statuses := make(chan *types.Status)
for _, ch := range chans {

View File

@@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers"
watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
@@ -32,6 +33,16 @@ import (
"k8s.io/node-problem-detector/pkg/util/tomb"
)
const SystemLogMonitorName = "system-log-monitor"
func init() {
problemdaemon.Register(
SystemLogMonitorName,
types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: NewLogMonitorOrDie,
CmdOptionDescription: "Set to config file paths."})
}
type logMonitor struct {
watcher watchertypes.LogWatcher
buffer LogBuffer

View File

@@ -21,6 +21,9 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/problemdaemon"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
@@ -32,6 +35,12 @@ const (
testConditionB = "TestConditionB"
)
func TestRegistration(t *testing.T) {
assert.NotPanics(t,
func() { problemdaemon.GetProblemDaemonHandlerOrDie("system-log-monitor") },
"System log monitor failed to register itself as a problem daemon.")
}
func TestGenerateStatus(t *testing.T) {
initConditions := []types.Condition{
{

View File

@@ -0,0 +1,30 @@
# System Stats Monitor
*System Stats Monitor* is a problem daemon in node problem detector. It collects pre-defined health-related metrics from different system components. Each component may allow further detailed configurations.
Currently supported components are:
* disk
See example config file [here](https://github.com/kubernetes/node-problem-detector/blob/master/config/system-stats-monitor.json).
## Detailed Configuration Options
### Global Configurations
Data collection period can be specified globally in the config file, see `invokeInterval` at the [example](https://github.com/kubernetes/node-problem-detector/blob/master/config/system-stats-monitor.json).
### Disk
Below metrics are collected from `disk` component:
* `disk/io_time`: [# of milliseconds spent doing I/Os on this device](https://www.kernel.org/doc/Documentation/iostats.txt)
* `disk/weighted_io`: [# of milliseconds spent doing I/Os on this device](https://www.kernel.org/doc/Documentation/iostats.txt)
* `disk/avg_queue_len`: [average # of requests that was waiting in queue or being serviced during the last `invokeInterval`](https://www.xaprb.com/blog/2010/01/09/how-linux-iostat-computes-its-results/)
By setting the `metricsConfigs` field and `displayName` field ([example](https://github.com/kubernetes/node-problem-detector/blob/master/config/system-stats-monitor.json)), you can specify the list of metrics to be collected, and their display names on the Prometheus scaping endpoint. The name of the disk block device will be reported in the `device` metrics label.
And a few other options:
* `includeRootBlk`: When set to `true`, add all block devices that's [not a slave or holder device](http://man7.org/linux/man-pages/man8/lsblk.8.html) to the list of disks that System Stats Monitor collects metrics from. When set to `false`, do not modify the list of disks that System Stats Monitor collects metrics from.
* `includeAllAttachedBlk`: When set to `true`, add all currently attached block devices to the list of disks that System Stats Monitor collects metrics from. When set to `false`, do not modify the list of disks that System Stats Monitor collects metrics from.
* `lsblkTimeout`: System Stats Monitor uses [`lsblk`](http://man7.org/linux/man-pages/man8/lsblk.8.html) to retrieve block devices information. This option sets the timeout for calling `lsblk` commands.

View File

@@ -0,0 +1,142 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"context"
"os/exec"
"strings"
"time"
"github.com/golang/glog"
"github.com/shirou/gopsutil/disk"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
)
type diskCollector struct {
keyDevice tag.Key
mIOTime *stats.Int64Measure
mWeightedIO *stats.Int64Measure
mAvgQueueLen *stats.Float64Measure
config *ssmtypes.DiskStatsConfig
historyIOTime map[string]uint64
historyWeightedIO map[string]uint64
}
func NewDiskCollectorOrDie(diskConfig *ssmtypes.DiskStatsConfig) *diskCollector {
dc := diskCollector{config: diskConfig}
dc.keyDevice, _ = tag.NewKey("device")
dc.mIOTime = newInt64Metric(
diskConfig.MetricsConfigs["disk/io_time"].DisplayName,
"The IO time spent on the disk",
"second",
view.LastValue(),
[]tag.Key{dc.keyDevice})
dc.mWeightedIO = newInt64Metric(
diskConfig.MetricsConfigs["disk/weighted_io"].DisplayName,
"The weighted IO on the disk",
"second",
view.LastValue(),
[]tag.Key{dc.keyDevice})
dc.mAvgQueueLen = newFloat64Metric(
diskConfig.MetricsConfigs["disk/avg_queue_len"].DisplayName,
"The average queue length on the disk",
"second",
view.LastValue(),
[]tag.Key{dc.keyDevice})
dc.historyIOTime = make(map[string]uint64)
dc.historyWeightedIO = make(map[string]uint64)
return &dc
}
func (dc *diskCollector) collect() {
if dc == nil {
return
}
blks := []string{}
if dc.config.IncludeRootBlk {
blks = append(blks, listRootBlockDevices(dc.config.LsblkTimeout)...)
}
if dc.config.IncludeAllAttachedBlk {
blks = append(blks, listAttachedBlockDevices()...)
}
ioCountersStats, _ := disk.IOCounters(blks...)
for deviceName, ioCountersStat := range ioCountersStats {
// Calculate average IO queue length since last measurement.
lastIOTime := dc.historyIOTime[deviceName]
lastWeightedIO := dc.historyWeightedIO[deviceName]
dc.historyIOTime[deviceName] = ioCountersStat.IoTime
dc.historyWeightedIO[deviceName] = ioCountersStat.WeightedIO
avg_queue_len := float64(0.0)
if lastIOTime != ioCountersStat.IoTime {
avg_queue_len = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime)
}
// Attach label {"device": deviceName} to the metrics.
device_ctx, _ := tag.New(context.Background(), tag.Upsert(dc.keyDevice, deviceName))
if dc.mIOTime != nil {
stats.Record(device_ctx, dc.mIOTime.M(int64(ioCountersStat.IoTime)))
}
if dc.mWeightedIO != nil {
stats.Record(device_ctx, dc.mWeightedIO.M(int64(ioCountersStat.WeightedIO)))
}
if dc.mAvgQueueLen != nil {
stats.Record(device_ctx, dc.mAvgQueueLen.M(avg_queue_len))
}
}
}
// listRootBlockDevices lists all block devices that's not a slave or holder.
func listRootBlockDevices(timeout time.Duration) []string {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// "-d" prevents printing slave or holder devices. i.e. /dev/sda1, /dev/sda2...
// "-n" prevents printing the headings.
// "-p NAME" specifies to only print the device name.
cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME")
stdout, err := cmd.Output()
if err != nil {
glog.Errorf("Error calling lsblk")
}
return strings.Split(strings.TrimSpace(string(stdout)), "\n")
}
// listAttachedBlockDevices lists all currently attached block devices.
func listAttachedBlockDevices() []string {
partitions, _ := disk.Partitions(false)
blks := []string{}
for _, partition := range partitions {
blks = append(blks, partition.Device)
}
return blks
}

View File

@@ -0,0 +1,57 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
// newInt64Metric create a stats.Int64 metrics, returns nil when name is empty.
func newInt64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Int64Measure {
if name == "" {
return nil
}
measure := stats.Int64(name, description, unit)
newView := &view.View{
Name: name,
Measure: measure,
Description: description,
Aggregation: aggregation,
TagKeys: tagKeys,
}
view.Register(newView)
return measure
}
// newFloat64Metric create a stats.Float64 metrics, returns nil when name is empty.
func newFloat64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Float64Measure {
if name == "" {
return nil
}
measure := stats.Float64(name, description, unit)
newView := &view.View{
Name: name,
Measure: measure,
Description: description,
Aggregation: aggregation,
TagKeys: tagKeys,
}
view.Register(newView)
return measure
}

View File

@@ -0,0 +1,112 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"encoding/json"
"io/ioutil"
"time"
"github.com/golang/glog"
"k8s.io/node-problem-detector/pkg/problemdaemon"
ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util/tomb"
)
const SystemStatsMonitorName = "system-stats-monitor"
func init() {
problemdaemon.Register(SystemStatsMonitorName, types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: NewSystemStatsMonitorOrDie,
CmdOptionDescription: "Set to config file paths."})
}
type systemStatsMonitor struct {
config ssmtypes.SystemStatsConfig
diskCollector *diskCollector
tomb *tomb.Tomb
}
// NewSystemStatsMonitorOrDie creates a system stats monitor.
func NewSystemStatsMonitorOrDie(configPath string) types.Monitor {
ssm := systemStatsMonitor{
tomb: tomb.NewTomb(),
}
// Apply configurations.
f, err := ioutil.ReadFile(configPath)
if err != nil {
glog.Fatalf("Failed to read configuration file %q: %v", configPath, err)
}
err = json.Unmarshal(f, &ssm.config)
if err != nil {
glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err)
}
err = ssm.config.ApplyConfiguration()
if err != nil {
glog.Fatalf("Failed to apply configuration for %q: %v", configPath, err)
}
err = ssm.config.Validate()
if err != nil {
glog.Fatalf("Failed to validate configuration %+v: %v", ssm.config, err)
}
// Initialize diskCollector if needed.
if len(ssm.config.DiskConfig.MetricsConfigs) > 0 {
ssm.diskCollector = NewDiskCollectorOrDie(&ssm.config.DiskConfig)
}
return &ssm
}
func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) {
glog.Info("Start system stats monitor")
go ssm.monitorLoop()
return nil, nil
}
func (ssm *systemStatsMonitor) monitorLoop() {
defer ssm.tomb.Done()
runTicker := time.NewTicker(ssm.config.InvokeInterval)
defer runTicker.Stop()
select {
case <-ssm.tomb.Stopping():
glog.Infof("System stats monitor stopped")
return
default:
ssm.diskCollector.collect()
}
for {
select {
case <-runTicker.C:
ssm.diskCollector.collect()
case <-ssm.tomb.Stopping():
glog.Infof("System stats monitor stopped")
return
}
}
}
func (ssm *systemStatsMonitor) Stop() {
glog.Info("Stop system stats monitor")
ssm.tomb.Stop()
}

View File

@@ -0,0 +1,31 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/problemdaemon"
)
func TestRegistration(t *testing.T) {
assert.NotPanics(t,
func() { problemdaemon.GetProblemDaemonHandlerOrDie(SystemStatsMonitorName) },
"System stats monitor failed to register itself as a problem daemon.")
}

View File

@@ -0,0 +1,82 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"fmt"
"time"
)
var (
defaultInvokeIntervalString = (60 * time.Second).String()
defaultlsblkTimeoutString = (5 * time.Second).String()
)
type MetricConfig struct {
DisplayName string `json:"displayName"`
}
type DiskStatsConfig struct {
MetricsConfigs map[string]MetricConfig `json:"metricsConfigs"`
IncludeRootBlk bool `json:"includeRootBlk"`
IncludeAllAttachedBlk bool `json:"includeAllAttachedBlk"`
LsblkTimeoutString string `json:"lsblkTimeout"`
LsblkTimeout time.Duration `json:"-"`
}
type SystemStatsConfig struct {
DiskConfig DiskStatsConfig `json:"disk"`
InvokeIntervalString string `json:"invokeInterval"`
InvokeInterval time.Duration `json:"-"`
}
// ApplyConfiguration applies default configurations.
func (ssc *SystemStatsConfig) ApplyConfiguration() error {
if ssc.InvokeIntervalString == "" {
ssc.InvokeIntervalString = defaultInvokeIntervalString
}
if ssc.DiskConfig.LsblkTimeoutString == "" {
ssc.DiskConfig.LsblkTimeoutString = defaultlsblkTimeoutString
}
var err error
ssc.InvokeInterval, err = time.ParseDuration(ssc.InvokeIntervalString)
if err != nil {
return fmt.Errorf("error in parsing InvokeIntervalString %q: %v", ssc.InvokeIntervalString, err)
}
ssc.DiskConfig.LsblkTimeout, err = time.ParseDuration(ssc.DiskConfig.LsblkTimeoutString)
if err != nil {
return fmt.Errorf("error in parsing LsblkTimeoutString %q: %v", ssc.DiskConfig.LsblkTimeoutString, err)
}
return nil
}
// Validate verifies whether the settings are valid.
func (ssc *SystemStatsConfig) Validate() error {
if ssc.InvokeInterval <= time.Duration(0) {
return fmt.Errorf("InvokeInterval %v must be above 0s", ssc.InvokeInterval)
}
if ssc.DiskConfig.LsblkTimeout <= time.Duration(0) {
return fmt.Errorf("LsblkTimeout %v must be above 0s", ssc.DiskConfig.LsblkTimeout)
}
if ssc.DiskConfig.LsblkTimeout > ssc.InvokeInterval {
return fmt.Errorf("LsblkTimeout %v must be shorter than ssc.InvokeInterval %v", ssc.DiskConfig.LsblkTimeout, ssc.InvokeInterval)
}
return nil
}

View File

@@ -0,0 +1,158 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"reflect"
"testing"
"time"
)
func TestApplyConfiguration(t *testing.T) {
testCases := []struct {
name string
orignalConfig SystemStatsConfig
wantedConfig SystemStatsConfig
isError bool
}{
{
name: "normal",
orignalConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "60s",
},
isError: false,
wantedConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeout: 5 * time.Second,
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "60s",
InvokeInterval: 60 * time.Second,
},
},
{
name: "empty",
orignalConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{},
},
isError: false,
wantedConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeout: 5 * time.Second,
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "1m0s",
InvokeInterval: 60 * time.Second,
},
},
{
name: "error",
orignalConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "foo",
},
},
isError: true,
wantedConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{},
},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.orignalConfig.ApplyConfiguration()
if err == nil && test.isError {
t.Errorf("Wanted an error got nil")
}
if err != nil && !test.isError {
t.Errorf("Wanted nil got an error")
}
if !test.isError && !reflect.DeepEqual(test.orignalConfig, test.wantedConfig) {
t.Errorf("Wanted: %+v. \nGot: %+v", test.wantedConfig, test.orignalConfig)
}
})
}
}
func TestValidate(t *testing.T) {
testCases := []struct {
name string
config SystemStatsConfig
isError bool
}{
{
name: "normal",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "60s",
},
isError: false,
},
{
name: "negative-invoke-interval",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "-1s",
},
isError: true,
},
{
name: "negative-lsblk-timeout",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "-1s",
},
InvokeIntervalString: "60s",
},
isError: true,
},
{
name: "lsblk-timeout-bigger-than-invoke-interval",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "90s",
},
InvokeIntervalString: "60s",
},
isError: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if err := test.config.ApplyConfiguration(); err != nil {
t.Errorf("Wanted no error with config %+v, got %v", test.config, err)
}
err := test.config.Validate()
if test.isError && err == nil {
t.Errorf("Wanted an error with config %+v, got nil", test.config)
}
if !test.isError && err != nil {
t.Errorf("Wanted nil with config %+v got an error", test.config)
}
})
}
}

View File

@@ -107,3 +107,26 @@ type Monitor interface {
// Stop stops the log monitor.
Stop()
}
// Exporter exports machine health data to certain control plane.
type Exporter interface {
// Export problems to the control plane.
ExportProblems(*Status)
}
// ProblemDaemonType is the type of the problem daemon.
// One type of problem daemon may be used to initialize multiple problem daemon instances.
type ProblemDaemonType string
// ProblemDaemonConfigPathMap represents configurations on all types of problem daemons:
// 1) Each key represents a type of problem daemon.
// 2) Each value represents the config file paths to that type of problem daemon.
type ProblemDaemonConfigPathMap map[ProblemDaemonType]*[]string
// ProblemDaemonHandler represents the initialization handler for a type problem daemon.
type ProblemDaemonHandler struct {
// CreateProblemDaemonOrDie initializes a problem daemon, panic if error occurs.
CreateProblemDaemonOrDie func(string) Monitor
// CmdOptionDescription explains how to configure the problem daemon from command line arguments.
CmdOptionDescription string
}