diff --git a/README.md b/README.md index 048501a9..5da3d12a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ enabled by default in the GCE cluster. # Background There are tons of node problems could possibly affect the pods running on the node such as: -* Hardware issues: Bad cpu, memory or disk; +* Infrastructure daemon issues: ntp service down; +* Hardware issues: Bad cpu, memory or disk, ntp service down; * Kernel issues: Kernel deadlock, corrupted file system; * Container runtime issues: Unresponsive runtime daemon; * ... @@ -53,23 +54,30 @@ List of supported problem daemons: |----------------|:---------------:|:------------| | [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) | KernelDeadlock | A system log monitor monitors kernel log and reports problem according to predefined rules. | | [AbrtAdaptor](https://github.com/kubernetes/node-problem-detector/blob/master/config/abrt-adaptor.json) | None | Monitor ABRT log messages and report them further. ABRT (Automatic Bug Report Tool) is health monitoring daemon able to catch kernel problems as well as application crashes of various kinds occurred on the host. For more information visit the [link](https://github.com/abrt). | +| [CustomPluginMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/custom-plugin-monitor.json) | On-demand(According to users configuration) | A custom plugin monitor for node-problem-detector to invoke and check various node problems with user defined check scripts. See proposal [here](https://docs.google.com/document/d/1jK_5YloSYtboj-DtfjmYKxfNnUxCAvohLnsH5aGCAYQ/edit#). | # Usage ## Flags * `--version`: Print current version of node-problem-detector. +* `--address`: The address to bind the node problem detector server. +* `--port`: The port to bind the node problem detector server. Use 0 to disable. * `--system-log-monitors`: List of paths to system log monitor configuration files, comma separated, e.g. [config/kernel-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json). Node problem detector will start a separate log monitor for each configuration. You can use different log monitors to monitor different system log. +* `--custom-plugin-monitors`: List of paths to custom plugin monitor config files, comma separated, e.g. + [config/custom-plugin-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/custom-plugin-monitor.json). + Node problem detector will start a separate custom plugin monitor for each configuration. You can + use different custom plugin monitors to monitor different node problems. * `--apiserver-override`: A URI parameter used to customize how node-problem-detector connects the apiserver. The format is same as the [`source`](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes) flag of [Heapster](https://github.com/kubernetes/heapster). For example, to run without auth, use the following config: -``` -http://APISERVER_IP:APISERVER_PORT?inClusterConfig=false -``` -Refer [heapster docs](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes) for a complete list of available options. + ``` + http://APISERVER_IP:APISERVER_PORT?inClusterConfig=false + ``` + Refer [heapster docs](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes) for a complete list of available options. * `--hostname-override`: A customized node name used for node-problem-detector to update conditions and emit events. node-problem-detector gets node name first from `hostname-override`, then `NODE_NAME` environment variable and finally fall back to `os.Hostname`. ## Build Image @@ -138,4 +146,5 @@ For more scenarios, see [here](https://github.com/kubernetes/heapster/blob/maste # Links * [Design Doc](https://docs.google.com/document/d/1cs1kqLziG-Ww145yN6vvlKguPbQQ0psrSBnEqpy0pzE/edit?usp=sharing) * [Slides](https://docs.google.com/presentation/d/1bkJibjwWXy8YnB5fna6p-Ltiy-N5p01zUsA22wCNkXA/edit?usp=sharing) +* [Plugin Interface Proposal](https://docs.google.com/document/d/1jK_5YloSYtboj-DtfjmYKxfNnUxCAvohLnsH5aGCAYQ/edit#) * [Addon Manifest](https://github.com/kubernetes/kubernetes/tree/master/cluster/addons/node-problem-detector) diff --git a/cmd/node_problem_detector.go b/cmd/node_problem_detector.go index 1bc6daca..529219b5 100644 --- a/cmd/node_problem_detector.go +++ b/cmd/node_problem_detector.go @@ -27,9 +27,11 @@ import ( "github.com/spf13/pflag" "k8s.io/node-problem-detector/cmd/options" + "k8s.io/node-problem-detector/pkg/custompluginmonitor" "k8s.io/node-problem-detector/pkg/problemclient" "k8s.io/node-problem-detector/pkg/problemdetector" "k8s.io/node-problem-detector/pkg/systemlogmonitor" + "k8s.io/node-problem-detector/pkg/types" "k8s.io/node-problem-detector/pkg/version" ) @@ -67,15 +69,24 @@ func main() { os.Exit(0) } - monitors := make(map[string]systemlogmonitor.LogMonitor) + monitors := make(map[string]types.Monitor) for _, config := range npdo.SystemLogMonitorConfigPaths { if _, ok := monitors[config]; ok { - // Skip the config if it's duplictaed. - glog.Warningf("Duplicated log monitor configuration %q", config) + // Skip the config if it's duplicated. + glog.Warningf("Duplicated monitor configuration %q", config) continue } monitors[config] = systemlogmonitor.NewLogMonitorOrDie(config) } + + for _, config := range npdo.CustomPluginMonitorConfigPaths { + if _, ok := monitors[config]; ok { + // Skip the config if it's duplicated. + glog.Warningf("Duplicated monitor configuration %q", config) + continue + } + monitors[config] = custompluginmonitor.NewCustomPluginMonitorOrDie(config) + } c := problemclient.NewClientOrDie(npdo) p := problemdetector.NewProblemDetector(monitors, c) diff --git a/cmd/options/options.go b/cmd/options/options.go index 286583b0..473ed930 100644 --- a/cmd/options/options.go +++ b/cmd/options/options.go @@ -33,6 +33,9 @@ type NodeProblemDetectorOptions struct { // SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration // files. SystemLogMonitorConfigPaths []string + // CustomPluginMonitorConfigPaths specifies the list of paths to custom plugin monitor configuration + // files. + CustomPluginMonitorConfigPaths []string // ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer. ApiServerOverride string // PrintVersion is the flag determining whether version information is printed. @@ -58,6 +61,8 @@ func NewNodeProblemDetectorOptions() *NodeProblemDetectorOptions { func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) { fs.StringSliceVar(&npdo.SystemLogMonitorConfigPaths, "system-log-monitors", []string{}, "List of paths to system log monitor config files, comma separated.") + fs.StringSliceVar(&npdo.CustomPluginMonitorConfigPaths, "custom-plugin-monitors", + []string{}, "List of paths to custom plugin monitor config files, comma separated.") fs.StringVar(&npdo.ApiServerOverride, "apiserver-override", "", "Custom URI used to connect to Kubernetes ApiServer") fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit") diff --git a/config/custom-plugin-monitor.json b/config/custom-plugin-monitor.json new file mode 100644 index 00000000..166cea1e --- /dev/null +++ b/config/custom-plugin-monitor.json @@ -0,0 +1,32 @@ +{ + "plugin": "custom", + "pluginConfig": { + "invoke_interval": "30s", + "timeout": "5s", + "max_output_length": 80, + "concurrency": 3 + }, + "source": "ntp-custom-plugin-monitor", + "conditions": [ + { + "type": "NTPProblem", + "reason": "NTPIsUp", + "message": "ntp service is up" + } + ], + "rules": [ + { + "type": "temporary", + "reason": "NTPIsDown", + "path": "./config/plugin/check_ntp.sh", + "timeout": "3s" + }, + { + "type": "permanent", + "condition": "NTPProblem", + "reason": "NTPIsDown", + "path": "./config/plugin/check_ntp.sh", + "timeout": "3s" + } + ] +} diff --git a/config/plugin/check_ntp.sh b/config/plugin/check_ntp.sh new file mode 100755 index 00000000..46be5b51 --- /dev/null +++ b/config/plugin/check_ntp.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# NOTE: THIS NTP SERVICE CHECK SCRIPT ASSUME THAT NTP SERVICE IS RUNNING UNDER SYSTEMD. +# THIS IS JUST AN EXAMPLE. YOU CAN WRITE YOUR OWN NODE PROBLEM PLUGIN ON DEMAND. + +OK=0 +NONOK=1 +UNKNOWN=2 + +which systemctl >/dev/null +if [ $? -ne 0 ]; then + echo "Systemd is not supported" + exit $UNKNOWN +fi + +systemctl status ntp.service | grep 'Active:' | grep -q running +if [ $? -ne 0 ]; then + echo "NTP service is not running" + exit $NONOK +fi + +echo "NTP service is running" +exit $OK diff --git a/pkg/custompluginmonitor/README.md b/pkg/custompluginmonitor/README.md new file mode 100644 index 00000000..70946c20 --- /dev/null +++ b/pkg/custompluginmonitor/README.md @@ -0,0 +1,7 @@ +# Custom Plugin Monitor + +Custom plugin monitor is a plugin mechanism for node-problem-detector. It will +extend node-problem-detector to execute any monitor scripts written in any language. +The monitor scripts must conform to the plugin protocol in exit code and standard +output. For more info about the plugin protocol, please refer to the +[node-problem-detector plugin interface proposal](https://docs.google.com/document/d/1jK_5YloSYtboj-DtfjmYKxfNnUxCAvohLnsH5aGCAYQ/edit#) \ No newline at end of file diff --git a/pkg/custompluginmonitor/custom_plugin_monitor.go b/pkg/custompluginmonitor/custom_plugin_monitor.go new file mode 100644 index 00000000..8c2a36ea --- /dev/null +++ b/pkg/custompluginmonitor/custom_plugin_monitor.go @@ -0,0 +1,167 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package custompluginmonitor + +import ( + "encoding/json" + "io/ioutil" + "time" + + "github.com/golang/glog" + + "k8s.io/node-problem-detector/pkg/custompluginmonitor/plugin" + cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types" + "k8s.io/node-problem-detector/pkg/types" + "k8s.io/node-problem-detector/pkg/util/tomb" +) + +type customPluginMonitor struct { + config cpmtypes.CustomPluginConfig + conditions []types.Condition + plugin *plugin.Plugin + resultChan <-chan cpmtypes.Result + statusChan chan *types.Status + tomb *tomb.Tomb +} + +// NewCustomPluginMonitorOrDie create a new customPluginMonitor, panic if error occurs. +func NewCustomPluginMonitorOrDie(configPath string) types.Monitor { + c := &customPluginMonitor{ + tomb: tomb.NewTomb(), + } + f, err := ioutil.ReadFile(configPath) + if err != nil { + glog.Fatalf("Failed to read configuration file %q: %v", configPath, err) + } + err = json.Unmarshal(f, &c.config) + if err != nil { + glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err) + } + // Apply configurations + err = (&c.config).ApplyConfiguration() + if err != nil { + glog.Fatalf("Failed to apply configuration for %q: %v", configPath, err) + } + + // Validate configurations + err = c.config.Validate() + if err != nil { + glog.Fatalf("Failed to validate custom plugin config %+v: %v", c.config, err) + } + + glog.Infof("Finish parsing custom plugin monitor config file: %+v", c.config) + + c.plugin = plugin.NewPlugin(c.config) + // A 1000 size channel should be big enough. + c.statusChan = make(chan *types.Status, 1000) + return c +} + +func (c *customPluginMonitor) Start() (<-chan *types.Status, error) { + glog.Info("Start custom plugin monitor") + go c.plugin.Run() + go c.monitorLoop() + return c.statusChan, nil +} + +func (c *customPluginMonitor) Stop() { + glog.Info("Stop custom plugin monitor") + c.tomb.Stop() +} + +// monitorLoop is the main loop of log monitor. +func (c *customPluginMonitor) monitorLoop() { + c.initializeStatus() + + resultChan := c.plugin.GetResultChan() + + for { + select { + case result := <-resultChan: + glog.V(3).Infof("Receive new plugin result: %+v", result) + status := c.generateStatus(result) + glog.Infof("New status generated: %+v", status) + c.statusChan <- status + case <-c.tomb.Stopping(): + c.plugin.Stop() + glog.Infof("Custom plugin monitor stopped") + c.tomb.Done() + break + } + } +} + +// generateStatus generates status from the plugin check result. +func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Status { + timestamp := time.Now() + var events []types.Event + if result.Rule.Type == types.Temp { + // For temporary error only generate event when exit status is above warning + if result.ExitStatus >= cpmtypes.NonOK { + events = append(events, types.Event{ + Severity: types.Warn, + Timestamp: timestamp, + Reason: result.Rule.Reason, + Message: result.Message, + }) + } + } else { + // For permanent error changes the condition + for i := range c.conditions { + condition := &c.conditions[i] + if condition.Type == result.Rule.Condition { + status := result.ExitStatus >= cpmtypes.NonOK + if condition.Status != status || condition.Reason != result.Rule.Reason { + condition.Transition = timestamp + condition.Message = result.Message + } + condition.Status = status + condition.Reason = result.Rule.Reason + break + } + } + } + return &types.Status{ + Source: c.config.Source, + // TODO(random-liu): Aggregate events and conditions and then do periodically report. + Events: events, + Conditions: c.conditions, + } +} + +// initializeStatus initializes the internal condition and also reports it to the node problem detector. +func (c *customPluginMonitor) initializeStatus() { + // Initialize the default node conditions + c.conditions = initialConditions(c.config.DefaultConditions) + glog.Infof("Initialize condition generated: %+v", c.conditions) + // Update the initial status + c.statusChan <- &types.Status{ + Source: c.config.Source, + Conditions: c.conditions, + } +} + +func initialConditions(defaults []types.Condition) []types.Condition { + conditions := make([]types.Condition, len(defaults)) + copy(conditions, defaults) + for i := range conditions { + // TODO(random-liu): Validate default conditions + conditions[i].Status = false + conditions[i].Transition = time.Now() + } + return conditions +} diff --git a/pkg/custompluginmonitor/plugin/plugin.go b/pkg/custompluginmonitor/plugin/plugin.go new file mode 100644 index 00000000..71813a0e --- /dev/null +++ b/pkg/custompluginmonitor/plugin/plugin.go @@ -0,0 +1,147 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "fmt" + "os/exec" + "strings" + "sync" + "syscall" + "time" + + "github.com/golang/glog" + cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types" + "k8s.io/node-problem-detector/pkg/util/tomb" +) + +type Plugin struct { + config cpmtypes.CustomPluginConfig + syncChan chan struct{} + resultChan chan cpmtypes.Result + tomb *tomb.Tomb + sync.WaitGroup +} + +func NewPlugin(config cpmtypes.CustomPluginConfig) *Plugin { + return &Plugin{ + config: config, + syncChan: make(chan struct{}, *config.PluginGlobalConfig.Concurrency), + // A 1000 size channel should be big enough. + resultChan: make(chan cpmtypes.Result, 1000), + tomb: tomb.NewTomb(), + } +} + +func (p *Plugin) GetResultChan() <-chan cpmtypes.Result { + return p.resultChan +} + +func (p *Plugin) Run() { + runTicker := time.NewTicker(*p.config.PluginGlobalConfig.InvokeInterval) + + for { + select { + case <-runTicker.C: + glog.Info("Start to run custom plugins") + + for _, rule := range p.config.Rules { + p.syncChan <- struct{}{} + p.Add(1) + + go func(rule *cpmtypes.CustomRule) { + defer p.Done() + defer func() { + <-p.syncChan + }() + + start := time.Now() + exitStatus, message := p.run(*rule) + end := time.Now() + + glog.V(3).Infof("Rule: %+v. Start time: %v. End time: %v. Duration: %v", rule, start, end, end.Sub(start)) + + result := cpmtypes.Result{ + Rule: rule, + ExitStatus: exitStatus, + Message: message, + } + + p.resultChan <- result + + glog.Infof("Add check result %+v for rule %+v", result, rule) + }(rule) + } + + p.Wait() + glog.Info("Finish running custom plugins") + case <-p.tomb.Stopping(): + glog.Info("Stopping plugin execution") + p.tomb.Done() + } + } +} + +func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, output string) { + var ctx context.Context + var cancel context.CancelFunc + + if rule.Timeout != nil && *rule.Timeout < *p.config.PluginGlobalConfig.Timeout { + ctx, cancel = context.WithTimeout(context.Background(), *rule.Timeout) + } else { + ctx, cancel = context.WithTimeout(context.Background(), *p.config.PluginGlobalConfig.Timeout) + } + defer cancel() + + cmd := exec.CommandContext(ctx, rule.Path) + stdout, err := cmd.Output() + if err != nil { + if _, ok := err.(*exec.ExitError); !ok { + glog.Errorf("Error in running plugin %q: error - %v. output - %q", rule.Path, err, string(stdout)) + return cpmtypes.Unknown, "Error in running plugin. Please check the error log" + } + } + + // trim suffix useless bytes + output = string(stdout) + output = strings.TrimSpace(output) + + if cmd.ProcessState.Sys().(syscall.WaitStatus).Signaled() { + output = fmt.Sprintf("Timeout when running plugin %q: state - %s. output - %q", rule.Path, cmd.ProcessState.String(), output) + } + + // cut at position max_output_length if stdout is longer than max_output_length bytes + if len(output) > *p.config.PluginGlobalConfig.MaxOutputLength { + output = output[:*p.config.PluginGlobalConfig.MaxOutputLength] + } + + exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() + switch exitCode { + case 0: + return cpmtypes.OK, output + case 1: + return cpmtypes.NonOK, output + default: + return cpmtypes.Unknown, output + } +} + +func (p *Plugin) Stop() { + p.tomb.Stop() + glog.Info("Stop plugin execution") +} diff --git a/pkg/custompluginmonitor/plugin/plugin_test.go b/pkg/custompluginmonitor/plugin/plugin_test.go new file mode 100644 index 00000000..7503879c --- /dev/null +++ b/pkg/custompluginmonitor/plugin/plugin_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "testing" + "time" + + cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types" +) + +func TestNewPluginRun(t *testing.T) { + ruleTimeout := 1 * time.Second + + utMetas := map[string]struct { + Rule cpmtypes.CustomRule + ExitStatus cpmtypes.Status + Output string + }{ + "ok": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/ok.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.OK, + Output: "OK", + }, + "non-ok": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/non-ok.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.NonOK, + Output: "NonOK", + }, + "unknown": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/unknown.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.Unknown, + Output: "UNKNOWN", + }, + "non executable": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/non-executable.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.Unknown, + Output: "Error in running plugin. Please check the error log", + }, + "longer than 80 stdout with ok exit status": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/longer-than-80-stdout-with-ok-exit-status.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.OK, + Output: "01234567890123456789012345678901234567890123456789012345678901234567890123456789", + }, + "non defined exit status": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/non-defined-exit-status.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.Unknown, + Output: "NON-DEFINED-EXIT-STATUS", + }, + "sleep 3 second with ok exit status": { + Rule: cpmtypes.CustomRule{ + Path: "./test-data/sleep-3-second-with-ok-exit-status.sh", + Timeout: &ruleTimeout, + }, + ExitStatus: cpmtypes.Unknown, + Output: `Timeout when running plugin "./test-data/sleep-3-second-with-ok-exit-status.sh": state - signal: killed. output - ""`, + }, + } + + conf := cpmtypes.CustomPluginConfig{} + (&conf).ApplyConfiguration() + p := Plugin{config: conf} + for desp, utMeta := range utMetas { + gotExitStatus, gotOutput := p.run(utMeta.Rule) + // cut at position max_output_length if expected output is longer than max_output_length bytes + if len(utMeta.Output) > *p.config.PluginGlobalConfig.MaxOutputLength { + utMeta.Output = utMeta.Output[:*p.config.PluginGlobalConfig.MaxOutputLength] + } + if gotExitStatus != utMeta.ExitStatus || gotOutput != utMeta.Output { + t.Errorf("%s", desp) + t.Errorf("Error in run plugin and get exit status and output for %q. "+ + "Got exit status: %v, Expected exit status: %v. "+ + "Got output: %q, Expected output: %q", + utMeta.Rule.Path, gotExitStatus, utMeta.ExitStatus, gotOutput, utMeta.Output) + } + } +} diff --git a/pkg/custompluginmonitor/plugin/test-data/longer-than-80-stdout-with-ok-exit-status.sh b/pkg/custompluginmonitor/plugin/test-data/longer-than-80-stdout-with-ok-exit-status.sh new file mode 100755 index 00000000..5ada303a --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/longer-than-80-stdout-with-ok-exit-status.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +echo "012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +exit 0 + diff --git a/pkg/custompluginmonitor/plugin/test-data/non-defined-exit-status.sh b/pkg/custompluginmonitor/plugin/test-data/non-defined-exit-status.sh new file mode 100755 index 00000000..fc510a03 --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/non-defined-exit-status.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +echo "NON-DEFINED-EXIT-STATUS" +exit 100 + diff --git a/pkg/custompluginmonitor/plugin/test-data/non-executable.sh b/pkg/custompluginmonitor/plugin/test-data/non-executable.sh new file mode 100644 index 00000000..11675db1 --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/non-executable.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +echo "NON-EXECUTABLE" +exit -1 + diff --git a/pkg/custompluginmonitor/plugin/test-data/non-ok.sh b/pkg/custompluginmonitor/plugin/test-data/non-ok.sh new file mode 100755 index 00000000..1916f5c1 --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/non-ok.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +echo "NonOK" +exit 1 + diff --git a/pkg/custompluginmonitor/plugin/test-data/ok.sh b/pkg/custompluginmonitor/plugin/test-data/ok.sh new file mode 100755 index 00000000..1dea4815 --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/ok.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +echo "OK" +exit 0 + diff --git a/pkg/custompluginmonitor/plugin/test-data/sleep-3-second-with-ok-exit-status.sh b/pkg/custompluginmonitor/plugin/test-data/sleep-3-second-with-ok-exit-status.sh new file mode 100755 index 00000000..a0dd23e8 --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/sleep-3-second-with-ok-exit-status.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +sleep 3 +echo "SLEEP 3 SECOND" +exit 0 + diff --git a/pkg/custompluginmonitor/plugin/test-data/unknown.sh b/pkg/custompluginmonitor/plugin/test-data/unknown.sh new file mode 100755 index 00000000..feff401a --- /dev/null +++ b/pkg/custompluginmonitor/plugin/test-data/unknown.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +echo "UNKNOWN" +exit 3 + diff --git a/pkg/custompluginmonitor/types/config.go b/pkg/custompluginmonitor/types/config.go new file mode 100644 index 00000000..ee33acd7 --- /dev/null +++ b/pkg/custompluginmonitor/types/config.go @@ -0,0 +1,132 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "fmt" + "os" + "time" + + "k8s.io/node-problem-detector/pkg/types" +) + +var ( + defaultGlobalTimeout = 5 * time.Second + defaultGlobalTimeoutString = defaultGlobalTimeout.String() + defaultInvokeInterval = 30 * time.Second + defaultInvokeIntervalString = defaultInvokeInterval.String() + defaultMaxOutputLength = 80 + defaultConcurrency = 3 + + customPluginName = "custom" +) + +type pluginGlobalConfig struct { + // InvokeIntervalString is the interval string at which plugins will be invoked. + InvokeIntervalString *string `json:"invoke_interval, omitempty"` + // TimeoutString is the global plugin execution timeout string. + TimeoutString *string `json:"timeout, omitempty"` + // InvokeInterval is the interval at which plugins will be invoked. + InvokeInterval *time.Duration `json:"-"` + // Timeout is the global plugin execution timeout. + Timeout *time.Duration `json:"-"` + // MaxOutputLength is the maximum plugin output message length. + MaxOutputLength *int `json:"max_output_length, omitempty"` + // Concurrency is the number of concurrent running plugins. + Concurrency *int `json:"concurrency, omitempty"` +} + +// Custom plugin config is the configuration of custom plugin monitor. +type CustomPluginConfig struct { + // Plugin is the name of plugin which is currently used. + // Currently supported: custom. + Plugin string `json:"plugin, omitempty"` + // PluginConfig is global plugin configuration. + PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig, omitempty"` + // Source is the source name of the custom plugin monitor + Source string `json:"source"` + // DefaultConditions are the default states of all the conditions custom plugin monitor should handle. + DefaultConditions []types.Condition `json:"conditions"` + // Rules are the rules custom plugin monitor will follow to parse and invoke plugins. + Rules []*CustomRule `json:"rules"` +} + +// ApplyConfiguration applies default configurations. +func (cpc *CustomPluginConfig) ApplyConfiguration() error { + if cpc.PluginGlobalConfig.TimeoutString == nil { + cpc.PluginGlobalConfig.TimeoutString = &defaultGlobalTimeoutString + } + + timeout, err := time.ParseDuration(*cpc.PluginGlobalConfig.TimeoutString) + if err != nil { + return fmt.Errorf("error in parsing global timeout %q: %v", *cpc.PluginGlobalConfig.TimeoutString, err) + } + + cpc.PluginGlobalConfig.Timeout = &timeout + + if cpc.PluginGlobalConfig.InvokeIntervalString == nil { + cpc.PluginGlobalConfig.InvokeIntervalString = &defaultInvokeIntervalString + } + + invoke_interval, err := time.ParseDuration(*cpc.PluginGlobalConfig.InvokeIntervalString) + if err != nil { + return fmt.Errorf("error in parsing invoke interval %q: %v", *cpc.PluginGlobalConfig.InvokeIntervalString, err) + } + + cpc.PluginGlobalConfig.InvokeInterval = &invoke_interval + + if cpc.PluginGlobalConfig.MaxOutputLength == nil { + cpc.PluginGlobalConfig.MaxOutputLength = &defaultMaxOutputLength + } + if cpc.PluginGlobalConfig.Concurrency == nil { + cpc.PluginGlobalConfig.Concurrency = &defaultConcurrency + } + + for _, rule := range cpc.Rules { + if rule.TimeoutString != nil { + timeout, err := time.ParseDuration(*rule.TimeoutString) + if err != nil { + return fmt.Errorf("error in parsing rule timeout %+v: %v", rule, err) + } + rule.Timeout = &timeout + } + } + + return nil +} + +// Validate verifies whether the settings in CustomPluginConfig are valid. +func (cpc CustomPluginConfig) Validate() error { + if cpc.Plugin != customPluginName { + return fmt.Errorf("NPD does not support %q plugin for now. Only support \"custom\"", cpc.Plugin) + } + + for _, rule := range cpc.Rules { + if rule.Timeout != nil && *rule.Timeout > *cpc.PluginGlobalConfig.Timeout { + return fmt.Errorf("plugin timeout is greater than global timeout. "+ + "Rule: %+v. Global timeout: %v", rule, cpc.PluginGlobalConfig.Timeout) + } + } + + for _, rule := range cpc.Rules { + if _, err := os.Stat(rule.Path); os.IsNotExist(err) { + return fmt.Errorf("rule path %q does not exist. Rule: %+v", rule.Path, rule) + } + } + + return nil +} diff --git a/pkg/custompluginmonitor/types/config_test.go b/pkg/custompluginmonitor/types/config_test.go new file mode 100644 index 00000000..41ff134c --- /dev/null +++ b/pkg/custompluginmonitor/types/config_test.go @@ -0,0 +1,247 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "reflect" + "testing" + "time" +) + +func TestCustomPluginConfigApplyConfiguration(t *testing.T) { + globalTimeout := 6 * time.Second + globalTimeoutString := globalTimeout.String() + invokeInterval := 31 * time.Second + invokeIntervalString := invokeInterval.String() + maxOutputLength := 79 + concurrency := 2 + + ruleTimeout := 1 * time.Second + ruleTimeoutString := ruleTimeout.String() + + utMetas := map[string]struct { + Orig CustomPluginConfig + Wanted CustomPluginConfig + }{ + "global default settings": { + Orig: CustomPluginConfig{ + Rules: []*CustomRule{ + { + Path: "../plugin/test-data/ok.sh", + }, + { + Path: "../plugin/test-data/warning.sh", + TimeoutString: &ruleTimeoutString, + }, + }, + }, + Wanted: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + InvokeIntervalString: &defaultInvokeIntervalString, + InvokeInterval: &defaultInvokeInterval, + TimeoutString: &defaultGlobalTimeoutString, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + Rules: []*CustomRule{ + { + Path: "../plugin/test-data/ok.sh", + }, + { + Path: "../plugin/test-data/warning.sh", + Timeout: &ruleTimeout, + TimeoutString: &ruleTimeoutString, + }, + }, + }, + }, + "custom invoke interval": { + Orig: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + InvokeIntervalString: &invokeIntervalString, + }, + }, + Wanted: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + InvokeIntervalString: &invokeIntervalString, + InvokeInterval: &invokeInterval, + TimeoutString: &defaultGlobalTimeoutString, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + }, + }, + "custom default timeout": { + Orig: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + TimeoutString: &globalTimeoutString, + }, + }, + Wanted: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + InvokeIntervalString: &defaultInvokeIntervalString, + InvokeInterval: &defaultInvokeInterval, + TimeoutString: &globalTimeoutString, + Timeout: &globalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + }, + }, + "custom max output length": { + Orig: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + MaxOutputLength: &maxOutputLength, + }, + }, + Wanted: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + InvokeIntervalString: &defaultInvokeIntervalString, + InvokeInterval: &defaultInvokeInterval, + TimeoutString: &defaultGlobalTimeoutString, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &maxOutputLength, + Concurrency: &defaultConcurrency, + }, + }, + }, + "custom concurrency": { + Orig: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + Concurrency: &concurrency, + }, + }, + Wanted: CustomPluginConfig{ + PluginGlobalConfig: pluginGlobalConfig{ + InvokeIntervalString: &defaultInvokeIntervalString, + InvokeInterval: &defaultInvokeInterval, + TimeoutString: &defaultGlobalTimeoutString, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &concurrency, + }, + }, + }, + } + + for desp, utMeta := range utMetas { + (&utMeta.Orig).ApplyConfiguration() + if !reflect.DeepEqual(utMeta.Orig, utMeta.Wanted) { + t.Errorf("Error in apply configuration for %q", desp) + t.Errorf("Wanted: %+v. \nGot: %+v", utMeta.Wanted, utMeta.Orig) + } + } +} + +func TestCustomPluginConfigValidate(t *testing.T) { + normalRuleTimeout := defaultGlobalTimeout - 1*time.Second + exceededRuleTimeout := defaultGlobalTimeout + 1*time.Second + + utMetas := map[string]struct { + Conf CustomPluginConfig + IsError bool + }{ + "normal": { + Conf: CustomPluginConfig{ + Plugin: customPluginName, + PluginGlobalConfig: pluginGlobalConfig{ + InvokeInterval: &defaultInvokeInterval, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + Rules: []*CustomRule{ + { + Path: "../plugin/test-data/ok.sh", + Timeout: &normalRuleTimeout, + }, + }, + }, + IsError: false, + }, + "non exist plugin path": { + Conf: CustomPluginConfig{ + Plugin: customPluginName, + PluginGlobalConfig: pluginGlobalConfig{ + InvokeInterval: &defaultInvokeInterval, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + Rules: []*CustomRule{ + { + Path: "../plugin/test-data/non-exist-plugin-path.sh", + Timeout: &normalRuleTimeout, + }, + }, + }, + IsError: true, + }, + "non supported plugin": { + // non supported plugin + Conf: CustomPluginConfig{ + Plugin: "non-supported-plugin", + PluginGlobalConfig: pluginGlobalConfig{ + InvokeInterval: &defaultInvokeInterval, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + Rules: []*CustomRule{ + { + Path: "../plugin/test-data/non-exist-plugin-path.sh", + Timeout: &normalRuleTimeout, + }, + }, + }, + IsError: true, + }, + "exceed global timeout": { + // exceed global timeout + Conf: CustomPluginConfig{ + Plugin: customPluginName, + PluginGlobalConfig: pluginGlobalConfig{ + InvokeInterval: &defaultInvokeInterval, + Timeout: &defaultGlobalTimeout, + MaxOutputLength: &defaultMaxOutputLength, + Concurrency: &defaultConcurrency, + }, + Rules: []*CustomRule{ + { + Path: "../plugin/test-data/ok.sh", + Timeout: &exceededRuleTimeout, + }, + }, + }, + IsError: true, + }, + } + + for desp, utMeta := range utMetas { + err := utMeta.Conf.Validate() + if err != nil && !utMeta.IsError { + t.Error(desp) + t.Errorf("Error in validating custom plugin configuration %+v. Want an error got nil", utMeta) + } + if err == nil && utMeta.IsError { + t.Error(desp) + t.Errorf("Error in validating custom plugin configuration %+v. Want nil got an error", utMeta) + } + } +} diff --git a/pkg/custompluginmonitor/types/types.go b/pkg/custompluginmonitor/types/types.go new file mode 100644 index 00000000..eb5b3af2 --- /dev/null +++ b/pkg/custompluginmonitor/types/types.go @@ -0,0 +1,56 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "k8s.io/node-problem-detector/pkg/types" + "time" +) + +type Status int + +const ( + OK Status = 0 + NonOK Status = 1 + Unknown Status = 2 +) + +// Result is the custom plugin check result returned by plugin. +type Result struct { + Rule *CustomRule + ExitStatus Status + Message string +} + +// CustomRule describes how custom plugin monitor should invoke and analyze plugins. +type CustomRule struct { + // Type is the type of the problem. + Type types.Type `json:"type"` + // Condition is the type of the condition the problem triggered. Notice that + // the Condition field should be set only when the problem is permanent, or + // else the field will be ignored. + Condition string `json:"condition"` + // Reason is the short reason of the problem. + Reason string `json:"reason"` + // Path is the path to the custom plugin. + Path string `json:"path"` + // Timeout is the timeout string for the custom plugin to execute. + TimeoutString *string `json:"timeout"` + // Timeout is the timeout for the custom plugin to execute. + Timeout *time.Duration `json:"-"` + // TODO(andyxning) Add support for per-rule interval. +} diff --git a/pkg/problemdetector/problem_detector.go b/pkg/problemdetector/problem_detector.go index 1a8262a5..d60d778d 100644 --- a/pkg/problemdetector/problem_detector.go +++ b/pkg/problemdetector/problem_detector.go @@ -26,7 +26,6 @@ import ( "k8s.io/node-problem-detector/pkg/condition" "k8s.io/node-problem-detector/pkg/problemclient" - "k8s.io/node-problem-detector/pkg/systemlogmonitor" "k8s.io/node-problem-detector/pkg/types" "k8s.io/node-problem-detector/pkg/util" ) @@ -40,12 +39,12 @@ type ProblemDetector interface { type problemDetector struct { client problemclient.Client conditionManager condition.ConditionManager - monitors map[string]systemlogmonitor.LogMonitor + monitors map[string]types.Monitor } // NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but // in the future we may want to let the problem daemons register themselves. -func NewProblemDetector(monitors map[string]systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector { +func NewProblemDetector(monitors map[string]types.Monitor, client problemclient.Client) ProblemDetector { return &problemDetector{ client: client, conditionManager: condition.NewConditionManager(client, clock.RealClock{}), diff --git a/pkg/systemlogmonitor/config.go b/pkg/systemlogmonitor/config.go index 1e8813ec..6c73f824 100644 --- a/pkg/systemlogmonitor/config.go +++ b/pkg/systemlogmonitor/config.go @@ -17,8 +17,10 @@ limitations under the License. package systemlogmonitor import ( + "regexp" + watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" - logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" + systemlogtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/types" ) @@ -33,15 +35,26 @@ type MonitorConfig struct { // DefaultConditions are the default states of all the conditions log monitor should handle. DefaultConditions []types.Condition `json:"conditions"` // Rules are the rules log monitor will follow to parse the log file. - Rules []logtypes.Rule `json:"rules"` + Rules []systemlogtypes.Rule `json:"rules"` } -// applyDefaultConfiguration applies default configurations. -func applyDefaultConfiguration(cfg *MonitorConfig) { - if cfg.BufferSize == 0 { - cfg.BufferSize = 10 +// ApplyConfiguration applies default configurations. +func (mc *MonitorConfig) ApplyDefaultConfiguration() { + if mc.BufferSize == 0 { + mc.BufferSize = 10 } - if cfg.WatcherConfig.Lookback == "" { - cfg.WatcherConfig.Lookback = "0" + if mc.WatcherConfig.Lookback == "" { + mc.WatcherConfig.Lookback = "0" } } + +// ValidateRules verifies whether the regular expressions in the rules are valid. +func (mc MonitorConfig) ValidateRules() error { + for _, rule := range mc.Rules { + _, err := regexp.Compile(rule.Pattern) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/systemlogmonitor/log_monitor.go b/pkg/systemlogmonitor/log_monitor.go index 188c95d2..9e292b8e 100644 --- a/pkg/systemlogmonitor/log_monitor.go +++ b/pkg/systemlogmonitor/log_monitor.go @@ -19,27 +19,18 @@ package systemlogmonitor import ( "encoding/json" "io/ioutil" - "regexp" "time" "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers" watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" - "k8s.io/node-problem-detector/pkg/systemlogmonitor/util" + systemlogtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/types" "github.com/golang/glog" + "k8s.io/node-problem-detector/pkg/util/tomb" ) -// LogMonitor monitors the log and reports node problem condition and event according to -// the rules. -type LogMonitor interface { - // Start starts the log monitor. - Start() (<-chan *types.Status, error) - // Stop stops the log monitor. - Stop() -} - type logMonitor struct { watcher watchertypes.LogWatcher buffer LogBuffer @@ -47,13 +38,13 @@ type logMonitor struct { conditions []types.Condition logCh <-chan *logtypes.Log output chan *types.Status - tomb *util.Tomb + tomb *tomb.Tomb } // NewLogMonitorOrDie create a new LogMonitor, panic if error occurs. -func NewLogMonitorOrDie(configPath string) LogMonitor { +func NewLogMonitorOrDie(configPath string) types.Monitor { l := &logMonitor{ - tomb: util.NewTomb(), + tomb: tomb.NewTomb(), } f, err := ioutil.ReadFile(configPath) if err != nil { @@ -64,10 +55,10 @@ func NewLogMonitorOrDie(configPath string) LogMonitor { glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err) } // Apply default configurations - applyDefaultConfiguration(&l.config) - err = validateRules(l.config.Rules) + (&l.config).ApplyDefaultConfiguration() + err = l.config.ValidateRules() if err != nil { - glog.Fatalf("Failed to validate matching rules %#v: %v", l.config.Rules, err) + glog.Fatalf("Failed to validate matching rules %+v: %v", l.config.Rules, err) } glog.Infof("Finish parsing log monitor config file: %+v", l.config) l.watcher = logwatchers.GetLogWatcherOrDie(l.config.WatcherConfig) @@ -126,12 +117,12 @@ func (l *logMonitor) parseLog(log *logtypes.Log) { } // generateStatus generates status from the logs. -func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule logtypes.Rule) *types.Status { +func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Rule) *types.Status { // We use the timestamp of the first log line as the timestamp of the status. timestamp := logs[0].Timestamp message := generateMessage(logs) var events []types.Event - if rule.Type == logtypes.Temp { + if rule.Type == types.Temp { // For temporary error only generate event events = append(events, types.Event{ Severity: types.Warn, @@ -144,7 +135,6 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule logtypes.Rule) *t for i := range l.conditions { condition := &l.conditions[i] if condition.Type == rule.Condition { - condition.Type = rule.Condition // Update transition timestamp and message when the condition // changes. Condition is considered to be changed only when // status or reason changes. @@ -189,17 +179,6 @@ func initialConditions(defaults []types.Condition) []types.Condition { return conditions } -// validateRules verifies whether the regular expressions in the rules are valid. -func validateRules(rules []logtypes.Rule) error { - for _, rule := range rules { - _, err := regexp.Compile(rule.Pattern) - if err != nil { - return err - } - } - return nil -} - func generateMessage(logs []*logtypes.Log) string { messages := []string{} for _, log := range logs { diff --git a/pkg/systemlogmonitor/log_monitor_test.go b/pkg/systemlogmonitor/log_monitor_test.go index 2bd47c2b..ddf9d2fc 100644 --- a/pkg/systemlogmonitor/log_monitor_test.go +++ b/pkg/systemlogmonitor/log_monitor_test.go @@ -67,7 +67,7 @@ func TestGenerateStatus(t *testing.T) { // Do not need Pattern because we don't do pattern match in this test { rule: logtypes.Rule{ - Type: logtypes.Perm, + Type: types.Perm, Condition: testConditionA, Reason: "test reason", }, @@ -88,7 +88,7 @@ func TestGenerateStatus(t *testing.T) { // Should not update transition time when status and reason are not changed. { rule: logtypes.Rule{ - Type: logtypes.Perm, + Type: types.Perm, Condition: testConditionA, Reason: "initial reason", }, @@ -107,7 +107,7 @@ func TestGenerateStatus(t *testing.T) { }, { rule: logtypes.Rule{ - Type: logtypes.Temp, + Type: types.Temp, Reason: "test reason", }, expected: types.Status{ diff --git a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go index 922312a2..9366a1cd 100644 --- a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher.go @@ -32,7 +32,7 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" - "k8s.io/node-problem-detector/pkg/systemlogmonitor/util" + "k8s.io/node-problem-detector/pkg/util/tomb" ) type filelogWatcher struct { @@ -42,7 +42,7 @@ type filelogWatcher struct { translator *translator logCh chan *logtypes.Log uptime time.Time - tomb *util.Tomb + tomb *tomb.Tomb clock utilclock.Clock } @@ -57,7 +57,7 @@ func NewSyslogWatcherOrDie(cfg types.WatcherConfig) types.LogWatcher { cfg: cfg, translator: newTranslatorOrDie(cfg.PluginConfig), uptime: time.Now().Add(time.Duration(-info.Uptime * int64(time.Second))), - tomb: util.NewTomb(), + tomb: tomb.NewTomb(), // A capacity 1000 buffer should be enough logCh: make(chan *logtypes.Log, 1000), clock: utilclock.NewClock(), diff --git a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go index f31b6bb1..61c9aa26 100644 --- a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go @@ -30,7 +30,7 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" - "k8s.io/node-problem-detector/pkg/systemlogmonitor/util" + "k8s.io/node-problem-detector/pkg/util/tomb" ) // Compiling go-systemd/sdjournald needs libsystemd-dev or libsystemd-journal-dev, @@ -43,14 +43,14 @@ type journaldWatcher struct { journal *sdjournal.Journal cfg types.WatcherConfig logCh chan *logtypes.Log - tomb *util.Tomb + tomb *tomb.Tomb } // NewJournaldWatcher is the create function of journald watcher. func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher { return &journaldWatcher{ cfg: cfg, - tomb: util.NewTomb(), + tomb: tomb.NewTomb(), // A capacity 1000 buffer should be enough logCh: make(chan *logtypes.Log, 1000), } diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go index 6d392d2e..38e0f226 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go @@ -27,13 +27,13 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" - "k8s.io/node-problem-detector/pkg/systemlogmonitor/util" + "k8s.io/node-problem-detector/pkg/util/tomb" ) type kernelLogWatcher struct { cfg types.WatcherConfig logCh chan *logtypes.Log - tomb *util.Tomb + tomb *tomb.Tomb kmsgParser kmsgparser.Parser clock utilclock.Clock @@ -43,7 +43,7 @@ type kernelLogWatcher struct { func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher { return &kernelLogWatcher{ cfg: cfg, - tomb: util.NewTomb(), + tomb: tomb.NewTomb(), // Arbitrary capacity logCh: make(chan *logtypes.Log, 100), clock: utilclock.NewClock(), diff --git a/pkg/systemlogmonitor/logwatchers/types/log_watcher.go b/pkg/systemlogmonitor/logwatchers/types/log_watcher.go index 4dc1d72a..afb2c510 100644 --- a/pkg/systemlogmonitor/logwatchers/types/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/types/log_watcher.go @@ -31,7 +31,7 @@ type LogWatcher interface { // WatcherConfig is the configuration of the log watcher. type WatcherConfig struct { // Plugin is the name of plugin which is currently used. - // Currently supported: filelog, journald. + // Currently supported: filelog, journald, kmsg. Plugin string `json:"plugin, omitempty"` // PluginConfig is a key/value configuration of a plugin. Valid configurations // are defined in different log watcher plugin. diff --git a/pkg/systemlogmonitor/types/types.go b/pkg/systemlogmonitor/types/types.go index b20a8df2..c2494b86 100644 --- a/pkg/systemlogmonitor/types/types.go +++ b/pkg/systemlogmonitor/types/types.go @@ -17,6 +17,7 @@ limitations under the License. package types import ( + "k8s.io/node-problem-detector/pkg/types" "time" ) @@ -27,20 +28,10 @@ type Log struct { Message string } -// Type is the type of the problem. -type Type string - -const ( - // Temp means the problem is temporary, only need to report an event. - Temp Type = "temporary" - // Perm means the problem is permanent, need to change the node condition. - Perm Type = "permanent" -) - // Rule describes how log monitor should analyze the log. type Rule struct { // Type is the type of matched problem. - Type Type `json:"type"` + Type types.Type `json:"type"` // Condition is the type of the condition the problem triggered. Notice that // the Condition field should be set only when the problem is permanent, or // else the field will be ignored. diff --git a/pkg/types/types.go b/pkg/types/types.go index d1e64ab3..5016eeb3 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -76,3 +76,22 @@ type Status struct { // newest node conditions in this field. Conditions []Condition `json:"conditions"` } + +// Type is the type of the problem. +type Type string + +const ( + // Temp means the problem is temporary, only need to report an event. + Temp Type = "temporary" + // Perm means the problem is permanent, need to change the node condition. + Perm Type = "permanent" +) + +// Monitor monitors log and custom plugins and reports node problem condition and event according to +// the rules. +type Monitor interface { + // Start starts the log monitor. + Start() (<-chan *Status, error) + // Stop stops the log monitor. + Stop() +} diff --git a/pkg/systemlogmonitor/util/tomb.go b/pkg/util/tomb/tomb.go similarity index 98% rename from pkg/systemlogmonitor/util/tomb.go rename to pkg/util/tomb/tomb.go index 9df2aa06..12efa930 100644 --- a/pkg/systemlogmonitor/util/tomb.go +++ b/pkg/util/tomb/tomb.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package tomb // Tomb is used to control the lifecycle of a goroutine. type Tomb struct { diff --git a/pkg/systemlogmonitor/util/tomb_test.go b/pkg/util/tomb/tomb_test.go similarity index 98% rename from pkg/systemlogmonitor/util/tomb_test.go rename to pkg/util/tomb/tomb_test.go index 6a3ba144..765944dc 100644 --- a/pkg/systemlogmonitor/util/tomb_test.go +++ b/pkg/util/tomb/tomb_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package tomb import ( "reflect"