mirror of
https://github.com/kubernetes/node-problem-detector.git
synced 2026-02-19 20:40:07 +00:00
210 lines
5.9 KiB
Go
210 lines
5.9 KiB
Go
/*
|
|
Copyright 2016 The Kubernetes Authors All rights reserved.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package systemlogmonitor
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"regexp"
|
|
"time"
|
|
|
|
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers"
|
|
watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
|
|
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
|
|
"k8s.io/node-problem-detector/pkg/systemlogmonitor/util"
|
|
"k8s.io/node-problem-detector/pkg/types"
|
|
|
|
"github.com/golang/glog"
|
|
)
|
|
|
|
// LogMonitor monitors the log and reports node problem condition and event according to
// the rules.
type LogMonitor interface {
	// Start starts the log monitor. On success it returns the channel on
	// which statuses are published; otherwise it returns the error that
	// prevented the monitor from starting.
	Start() (<-chan *types.Status, error)
	// Stop stops the log monitor.
	Stop()
}
|
|
|
|
// logMonitor is the LogMonitor implementation built by NewLogMonitorOrDie.
// It reads logs from a watcher, matches them against the configured rules,
// and publishes the resulting statuses on output.
type logMonitor struct {
	// watcher supplies log lines; built from config.WatcherConfig.
	watcher watchertypes.LogWatcher
	// buffer receives every log via Push; rule patterns are matched against it.
	buffer LogBuffer
	// config is the parsed (and defaulted) monitor configuration.
	config MonitorConfig
	// conditions holds the current state of the default conditions; it is
	// updated in place when a non-temporary rule matches.
	conditions []types.Condition
	// logCh is the channel returned by watcher.Watch in Start.
	logCh <-chan *logtypes.Log
	// output is the buffered channel that statuses are published on.
	output chan *types.Status
	// tomb coordinates stopping the monitor loop goroutine.
	tomb *util.Tomb
}
|
|
|
|
// NewLogMonitorOrDie create a new LogMonitor, panic if error occurs.
|
|
func NewLogMonitorOrDie(configPath string) LogMonitor {
|
|
l := &logMonitor{
|
|
tomb: util.NewTomb(),
|
|
}
|
|
f, err := ioutil.ReadFile(configPath)
|
|
if err != nil {
|
|
glog.Fatalf("Failed to read configuration file %q: %v", configPath, err)
|
|
}
|
|
err = json.Unmarshal(f, &l.config)
|
|
if err != nil {
|
|
glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err)
|
|
}
|
|
// Apply default configurations
|
|
applyDefaultConfiguration(&l.config)
|
|
err = validateRules(l.config.Rules)
|
|
if err != nil {
|
|
glog.Fatalf("Failed to validate matching rules %#v: %v", l.config.Rules, err)
|
|
}
|
|
glog.Infof("Finish parsing log monitor config file: %+v", l.config)
|
|
l.watcher = logwatchers.GetLogWatcherOrDie(l.config.WatcherConfig)
|
|
l.buffer = NewLogBuffer(l.config.BufferSize)
|
|
// A 1000 size channel should be big enough.
|
|
l.output = make(chan *types.Status, 1000)
|
|
return l
|
|
}
|
|
|
|
func (l *logMonitor) Start() (<-chan *types.Status, error) {
|
|
glog.Info("Start log monitor")
|
|
var err error
|
|
l.logCh, err = l.watcher.Watch()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
go l.monitorLoop()
|
|
return l.output, nil
|
|
}
|
|
|
|
func (l *logMonitor) Stop() {
|
|
glog.Info("Stop log monitor")
|
|
l.tomb.Stop()
|
|
}
|
|
|
|
// monitorLoop is the main loop of log monitor.
|
|
func (l *logMonitor) monitorLoop() {
|
|
defer l.tomb.Done()
|
|
l.initializeStatus()
|
|
for {
|
|
select {
|
|
case log := <-l.logCh:
|
|
l.parseLog(log)
|
|
case <-l.tomb.Stopping():
|
|
l.watcher.Stop()
|
|
glog.Infof("Log monitor stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// parseLog parses one log line.
|
|
func (l *logMonitor) parseLog(log *logtypes.Log) {
|
|
// Once there is new log, log monitor will push it into the log buffer and try
|
|
// to match each rule. If any rule is matched, log monitor will report a status.
|
|
l.buffer.Push(log)
|
|
for _, rule := range l.config.Rules {
|
|
matched := l.buffer.Match(rule.Pattern)
|
|
if len(matched) == 0 {
|
|
continue
|
|
}
|
|
status := l.generateStatus(matched, rule)
|
|
glog.Infof("New status generated: %+v", status)
|
|
l.output <- status
|
|
}
|
|
}
|
|
|
|
// generateStatus generates status from the logs.
|
|
func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule logtypes.Rule) *types.Status {
|
|
// We use the timestamp of the first log line as the timestamp of the status.
|
|
timestamp := logs[0].Timestamp
|
|
message := generateMessage(logs)
|
|
var events []types.Event
|
|
if rule.Type == logtypes.Temp {
|
|
// For temporary error only generate event
|
|
events = append(events, types.Event{
|
|
Severity: types.Warn,
|
|
Timestamp: timestamp,
|
|
Reason: rule.Reason,
|
|
Message: message,
|
|
})
|
|
} else {
|
|
// For permanent error changes the condition
|
|
for i := range l.conditions {
|
|
condition := &l.conditions[i]
|
|
if condition.Type == rule.Condition {
|
|
condition.Type = rule.Condition
|
|
// Update transition timestamp and message when the condition
|
|
// changes. Condition is considered to be changed only when
|
|
// status or reason changes.
|
|
if !condition.Status || condition.Reason != rule.Reason {
|
|
condition.Transition = timestamp
|
|
condition.Message = message
|
|
}
|
|
condition.Status = true
|
|
condition.Reason = rule.Reason
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return &types.Status{
|
|
Source: l.config.Source,
|
|
// TODO(random-liu): Aggregate events and conditions and then do periodically report.
|
|
Events: events,
|
|
Conditions: l.conditions,
|
|
}
|
|
}
|
|
|
|
// initializeStatus initializes the internal condition and also reports it to the node problem detector.
|
|
func (l *logMonitor) initializeStatus() {
|
|
// Initialize the default node conditions
|
|
l.conditions = initialConditions(l.config.DefaultConditions)
|
|
glog.Infof("Initialize condition generated: %+v", l.conditions)
|
|
// Update the initial status
|
|
l.output <- &types.Status{
|
|
Source: l.config.Source,
|
|
Conditions: l.conditions,
|
|
}
|
|
}
|
|
|
|
func initialConditions(defaults []types.Condition) []types.Condition {
|
|
conditions := make([]types.Condition, len(defaults))
|
|
copy(conditions, defaults)
|
|
for i := range conditions {
|
|
// TODO(random-liu): Validate default conditions
|
|
conditions[i].Status = false
|
|
conditions[i].Transition = time.Now()
|
|
}
|
|
return conditions
|
|
}
|
|
|
|
// validateRules verifies whether the regular expressions in the rules are valid.
|
|
func validateRules(rules []logtypes.Rule) error {
|
|
for _, rule := range rules {
|
|
_, err := regexp.Compile(rule.Pattern)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func generateMessage(logs []*logtypes.Log) string {
|
|
messages := []string{}
|
|
for _, log := range logs {
|
|
messages = append(messages, log.Message)
|
|
}
|
|
return concatLogs(messages)
|
|
}
|