Report metrics from custom-plugin-monitor

Xuewei Zhang
2019-07-22 16:48:44 -07:00
parent fbebcf311b
commit 94af7de97b
8 changed files with 110 additions and 26 deletions

View File

@@ -8,6 +8,7 @@
"enable_message_change_based_condition_update": false
},
"source": "ntp-custom-plugin-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "NTPProblem",

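For context, here is a minimal sketch of how the new field sits in a custom plugin monitor config. The `source` and `metricsReporting` fields are taken from the diff above; the interval, rule, and script-path values are illustrative only:

```json
{
  "plugin": "custom",
  "pluginConfig": {
    "invoke_interval": "300s",
    "timeout": "60s",
    "concurrency": 1,
    "enable_message_change_based_condition_update": false
  },
  "source": "ntp-custom-plugin-monitor",
  "metricsReporting": true,
  "conditions": [
    {
      "type": "NTPProblem",
      "reason": "NTPIsUp",
      "message": "ntp service is up"
    }
  ],
  "rules": [
    {
      "type": "permanent",
      "condition": "NTPProblem",
      "reason": "NTPIsDown",
      "path": "./config/plugin/check_ntp.sh",
      "timeout": "60s"
    }
  ]
}
```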
View File

@@ -7,6 +7,7 @@
"concurrency": 1
},
"source": "kernel-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "FrequentUnregisterNetDevice",

View File

@@ -7,6 +7,7 @@
"concurrency": 3
},
"source": "network-custom-plugin-monitor",
"metricsReporting": true,
"conditions": [],
"rules": [
{

View File

@@ -7,6 +7,7 @@
"concurrency": 1
},
"source": "systemd-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "FrequentKubeletRestart",

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/node-problem-detector/pkg/custompluginmonitor/plugin"
cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/problemmetrics"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
"k8s.io/node-problem-detector/pkg/util/tomb"
@@ -80,9 +81,31 @@ func NewCustomPluginMonitorOrDie(configPath string) types.Monitor {
c.plugin = plugin.NewPlugin(c.config)
// A channel of size 1000 should be big enough.
c.statusChan = make(chan *types.Status, 1000)
if *c.config.EnableMetricsReporting {
initializeProblemMetricsOrDie(c.config.Rules)
}
return c
}
// initializeProblemMetricsOrDie creates problem metrics for all problems, sets their values to 0,
// and panics if an error occurs.
func initializeProblemMetricsOrDie(rules []*cpmtypes.CustomRule) {
for _, rule := range rules {
if rule.Type == types.Perm {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, false)
if err != nil {
glog.Fatalf("Failed to initialize problem gauge metrics for problem %q, reason %q: %v",
rule.Condition, rule.Reason, err)
}
}
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 0)
if err != nil {
glog.Fatalf("Failed to initialize problem counter metrics for %q: %v", rule.Reason, err)
}
}
}
func (c *customPluginMonitor) Start() (<-chan *types.Status, error) {
glog.Info("Start custom plugin monitor")
go c.plugin.Run()
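With `metricsReporting` enabled, the monitor pre-registers its metrics at startup: every rule gets a problem counter initialized to zero, and permanent rules additionally get a problem gauge initialized to false, so the time series exist before the first problem is ever observed. A minimal sketch using the same `problemmetrics` calls as the diff (the condition and reason names are made up for illustration):

```go
package main

import (
	"fmt"

	"k8s.io/node-problem-detector/pkg/problemmetrics"
)

func main() {
	pmm := problemmetrics.GlobalProblemMetricsManager

	// Permanent problems map to node conditions, so a gauge tracks
	// whether the condition is currently active; it starts out false.
	if err := pmm.SetProblemGauge("NTPProblem", "NTPIsDown", false); err != nil {
		fmt.Printf("failed to set gauge: %v\n", err)
	}

	// Every rule, permanent or temporary, gets an occurrence counter.
	// Incrementing by 0 creates the series without recording a hit.
	if err := pmm.IncrementProblemCounter("NTPIsDown", 0); err != nil {
		fmt.Printf("failed to increment counter: %v\n", err)
	}
}
```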
@@ -120,11 +143,12 @@ func (c *customPluginMonitor) monitorLoop() {
// generateStatus generates status from the plugin check result.
func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Status {
timestamp := time.Now()
var events []types.Event
var activeProblemEvents []types.Event
var inactiveProblemEvents []types.Event
if result.Rule.Type == types.Temp {
// For temporary errors, only generate an event when the exit status indicates a problem
if result.ExitStatus >= cpmtypes.NonOK {
events = append(events, types.Event{
activeProblemEvents = append(activeProblemEvents, types.Event{
Severity: types.Warn,
Timestamp: timestamp,
Reason: result.Rule.Reason,
@@ -151,7 +175,7 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
}
}
events = append(events, util.GenerateConditionChangeEvent(
inactiveProblemEvents = append(inactiveProblemEvents, util.GenerateConditionChangeEvent(
condition.Type,
status,
defaultConditionReason,
@@ -165,7 +189,7 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
// change 2: Condition status change from False/Unknown to True
condition.Transition = timestamp
condition.Message = result.Message
events = append(events, util.GenerateConditionChangeEvent(
activeProblemEvents = append(activeProblemEvents, util.GenerateConditionChangeEvent(
condition.Type,
status,
result.Rule.Reason,
@@ -178,7 +202,7 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
// change 3: Condition status change from False to Unknown or vice versa
condition.Transition = timestamp
condition.Message = result.Message
events = append(events, util.GenerateConditionChangeEvent(
inactiveProblemEvents = append(inactiveProblemEvents, util.GenerateConditionChangeEvent(
condition.Type,
status,
result.Rule.Reason,
@@ -196,22 +220,46 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
condition.Transition = timestamp
condition.Reason = result.Rule.Reason
condition.Message = result.Message
events = append(events, util.GenerateConditionChangeEvent(
updateEvent := util.GenerateConditionChangeEvent(
condition.Type,
status,
condition.Reason,
timestamp,
))
)
if condition.Status == types.True {
activeProblemEvents = append(activeProblemEvents, updateEvent)
} else {
inactiveProblemEvents = append(inactiveProblemEvents, updateEvent)
}
}
break
}
}
}
if *c.config.EnableMetricsReporting {
// Increment the problem counter only for active problems that were just detected.
for _, event := range activeProblemEvents {
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(
event.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v",
event.Reason, err)
}
}
for _, condition := range c.conditions {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
condition.Type, condition.Reason, condition.Status == types.True)
if err != nil {
glog.Errorf("Failed to update problem gauge metrics for problem %q, reason %q: %v",
condition.Type, condition.Reason, err)
}
}
}
return &types.Status{
Source: c.config.Source,
// TODO(random-liu): Aggregate events and conditions and then report periodically.
Events: events,
Events: append(activeProblemEvents, inactiveProblemEvents...),
Conditions: c.conditions,
}
}

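The net effect of the active/inactive split: only events for problems that just became active bump the problem counter, while the gauge is re-derived from the full condition list on every cycle. A problem that stays active across cycles is therefore counted once but keeps its gauge raised. An illustrative sketch (condition and reason names assumed):

```go
package main

import (
	"fmt"

	"k8s.io/node-problem-detector/pkg/problemmetrics"
)

func logIfErr(err error) {
	if err != nil {
		fmt.Println(err)
	}
}

func main() {
	pmm := problemmetrics.GlobalProblemMetricsManager

	// Cycle 1: the rule starts failing and the condition flips to True.
	// That is an active event, so the counter records one hit and the
	// gauge is raised.
	logIfErr(pmm.IncrementProblemCounter("NTPIsDown", 1))
	logIfErr(pmm.SetProblemGauge("NTPProblem", "NTPIsDown", true))

	// Cycle 2: the rule is still failing, but the condition is already
	// True, so no new active event is generated and the counter stays
	// put; the gauge is simply re-asserted from the condition status.
	logIfErr(pmm.SetProblemGauge("NTPProblem", "NTPIsDown", true))
}
```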
View File

@@ -32,6 +32,7 @@ var (
defaultMaxOutputLength = 80
defaultConcurrency = 3
defaultMessageChangeBasedConditionUpdate = false
defaultEnableMetricsReporting = true
customPluginName = "custom"
)
@@ -66,6 +67,8 @@ type CustomPluginConfig struct {
DefaultConditions []types.Condition `json:"conditions"`
// Rules are the rules custom plugin monitor will follow to parse and invoke plugins.
Rules []*CustomRule `json:"rules"`
// EnableMetricsReporting describes whether to report problems as metrics or not.
EnableMetricsReporting *bool `json:"metricsReporting,omitempty"`
}
// ApplyConfiguration applies default configurations.
@@ -112,6 +115,10 @@ func (cpc *CustomPluginConfig) ApplyConfiguration() error {
}
}
if cpc.EnableMetricsReporting == nil {
cpc.EnableMetricsReporting = &defaultEnableMetricsReporting
}
return nil
}

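Because `ApplyConfiguration` fills in the default, metrics reporting is on for any existing config that omits the field; opting out requires an explicit `false`. A minimal sketch (the source name is illustrative):

```json
{
  "source": "my-custom-plugin-monitor",
  "metricsReporting": false,
  "conditions": [],
  "rules": []
}
```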
View File

@@ -30,6 +30,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
maxOutputLength := 79
concurrency := 2
messageChangeBasedConditionUpdate := true
disableMetricsReporting := false
ruleTimeout := 1 * time.Second
ruleTimeoutString := ruleTimeout.String()
@@ -60,6 +61,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
Rules: []*CustomRule{
{
Path: "../plugin/test-data/ok.sh",
@@ -88,6 +90,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom default timeout": {
@@ -106,6 +109,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom max output length": {
@@ -124,6 +128,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom concurrency": {
@@ -142,6 +147,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &concurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom message change based condition update": {
@@ -160,6 +166,24 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &messageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"disable metrics reporting": {
Orig: CustomPluginConfig{
EnableMetricsReporting: &disableMetricsReporting,
},
Wanted: CustomPluginConfig{
PluginGlobalConfig: pluginGlobalConfig{
InvokeIntervalString: &defaultInvokeIntervalString,
InvokeInterval: &defaultInvokeInterval,
TimeoutString: &defaultGlobalTimeoutString,
Timeout: &defaultGlobalTimeout,
MaxOutputLength: &defaultMaxOutputLength,
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &disableMetricsReporting,
},
},
}

View File

@@ -157,6 +157,7 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Ru
timestamp := logs[0].Timestamp
message := generateMessage(logs)
var events []types.Event
var changedConditions []*types.Condition
if rule.Type == types.Temp {
// For temporary errors, only generate an event
events = append(events, types.Event{
@@ -165,12 +166,6 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Ru
Reason: rule.Reason,
Message: message,
})
if *l.config.EnableMetricsReporting {
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v", rule.Reason, err)
}
}
} else {
// For permanent errors, change the condition
for i := range l.conditions {
@@ -188,26 +183,32 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Ru
rule.Reason,
timestamp,
))
if *l.config.EnableMetricsReporting {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, true)
if err != nil {
glog.Errorf("Failed to update problem gauge metrics for problem %q, reason %q: %v",
rule.Condition, rule.Reason, err)
}
err = problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v", rule.Reason, err)
}
}
}
condition.Status = types.True
condition.Reason = rule.Reason
changedConditions = append(changedConditions, condition)
break
}
}
}
if *l.config.EnableMetricsReporting {
for _, event := range events {
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(event.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v", event.Reason, err)
}
}
for _, condition := range changedConditions {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
condition.Type, condition.Reason, condition.Status == types.True)
if err != nil {
glog.Errorf("Failed to update problem gauge metrics for problem %q, reason %q: %v",
condition.Type, condition.Reason, err)
}
}
}
return &types.Status{
Source: l.config.Source,
// TODO(random-liu): Aggregate events and conditions and then report periodically.