mirror of
https://github.com/kubernetes/node-problem-detector.git
synced 2026-02-14 18:09:57 +00:00
Merge pull request #223 from wangzhen127/kubelet-crash
Detect kubelet and container runtime frequent restarts
Makefile (2 lines changed)
@@ -69,7 +69,7 @@ ifeq ($(ENABLE_JOURNALD), 1)
 endif
 
 vet:
-	go list ./... | grep -v "./vendor/*" | xargs go vet
+	go list ./... | grep -v "./vendor/*" | xargs go vet $(BUILD_TAGS)
 
 fmt:
 	find . -type f -name "*.go" | grep -v "./vendor/*" | xargs gofmt -s -w -l
@@ -156,7 +156,8 @@ For example, to test [KernelMonitor](https://github.com/kubernetes/node-problem-
 
 **Note**:
 - You can see more rule examples under [test/kernel_log_generator/problems](https://github.com/kubernetes/node-problem-detector/tree/master/test/kernel_log_generator/problems).
-- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```).
+- For [KernelMonitor](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json) message injection, all messages should have ```kernel: ``` prefix (also note there is a space after ```:```); or use [generator.sh](https://github.com/kubernetes/node-problem-detector/blob/master/test/kernel_log_generator/generator.sh).
+- To inject other logs into journald like systemd logs, use ```echo 'Some systemd message' | systemd-cat -t systemd```.
 
 # Remedy Systems
@@ -32,7 +32,7 @@ func main() {
 	fedo.AddFlags(pflag.CommandLine)
 	pflag.Parse()
 
-	counter, err := logcounter.NewKmsgLogCounter(fedo)
+	counter, err := logcounter.NewJournaldLogCounter(fedo)
 	if err != nil {
 		fmt.Print(err)
 		os.Exit(int(types.Unknown))
@@ -29,14 +29,21 @@ func NewLogCounterOptions() *LogCounterOptions {
 // LogCounterOptions contains frequent event detector command line and application options.
 type LogCounterOptions struct {
 	// command line options. See flag descriptions for the description
-	Lookback string
-	Pattern  string
-	Count    int
+	JournaldSource string
+	LogPath        string
+	Lookback       string
+	Delay          string
+	Pattern        string
+	Count          int
 }
 
 // AddFlags adds log counter command line options to pflag.
 func (fedo *LogCounterOptions) AddFlags(fs *pflag.FlagSet) {
+	fs.StringVar(&fedo.JournaldSource, "journald-source", "", "The source configuration of journald, e.g., kernel, kubelet, dockerd, etc")
+	fs.StringVar(&fedo.LogPath, "log-path", "", "The log path that log watcher looks up")
 	fs.StringVar(&fedo.Lookback, "lookback", "", "The time log watcher looks up")
+	fs.StringVar(&fedo.Delay, "delay", "",
+		"The time duration log watcher delays after node boot time. This is useful when log watcher needs to wait for some time until the node is stable.")
 	fs.StringVar(&fedo.Pattern, "pattern", "",
 		"The regular expression to match the problem in log. The pattern must match to the end of the line.")
 	fs.IntVar(&fedo.Count, "count", 1,
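The new `--journald-source`, `--log-path`, and `--delay` flags line up with the `WatcherConfig` fields used later in this change. A minimal sketch of parsing them, assuming the `options` package above and `github.com/spf13/pflag`; the argument values are hypothetical and mirror the rules added below:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"k8s.io/node-problem-detector/cmd/logcounter/options"
)

func main() {
	fedo := options.NewLogCounterOptions()
	fs := pflag.NewFlagSet("log-counter", pflag.ExitOnError)
	fedo.AddFlags(fs)
	// Hypothetical arguments; same shape as the systemd-monitor rules below.
	_ = fs.Parse([]string{
		"--journald-source=systemd",
		"--log-path=/var/log/journal",
		"--lookback=20m",
		"--delay=5m",
		"--count=5",
		"--pattern=Started Kubernetes kubelet.",
	})
	fmt.Printf("%+v\n", *fedo)
}
```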
@@ -1,7 +1,7 @@
 {
 	"plugin": "journald",
 	"pluginConfig": {
-		"source": "docker"
+		"source": "dockerd"
 	},
 	"logPath": "/var/log/journal",
 	"lookback": "5m",
@@ -21,6 +21,8 @@
 		"reason": "UnregisterNetDevice",
 		"path": "/home/kubernetes/bin/log-counter",
 		"args": [
+			"--journald-source=kernel",
+			"--log-path=/var/log/journal",
 			"--lookback=20m",
 			"--count=3",
 			"--pattern=unregister_netdevice: waiting for \\w+ to become free. Usage count = \\d+"
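The `--pattern` argument is a Go regular expression (the `\\w+` and `\\d+` above are JSON-escaped). A quick standalone way to sanity-check the pattern against a sample line; the log message here is made up for illustration:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The same pattern as in the args above, with the JSON escaping removed.
	re := regexp.MustCompile(`unregister_netdevice: waiting for \w+ to become free. Usage count = \d+`)
	// Hypothetical kernel message of the kind this rule is meant to catch.
	msg := "unregister_netdevice: waiting for lo to become free. Usage count = 1"
	fmt.Println(re.MatchString(msg)) // true
}
```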
config/systemd-monitor-counter.json (new file, 72 lines)
@@ -0,0 +1,72 @@
+{
+	"plugin": "custom",
+	"pluginConfig": {
+		"invoke_interval": "5m",
+		"timeout": "1m",
+		"max_output_length": 80,
+		"concurrency": 1
+	},
+	"source": "systemd-monitor",
+	"conditions": [
+		{
+			"type": "FrequentKubeletRestart",
+			"reason": "NoFrequentKubeletRestart",
+			"message": "kubelet is functioning properly"
+		},
+		{
+			"type": "FrequentDockerRestart",
+			"reason": "NoFrequentDockerRestart",
+			"message": "docker is functioning properly"
+		},
+		{
+			"type": "FrequentContainerdRestart",
+			"reason": "NoFrequentContainerdRestart",
+			"message": "containerd is functioning properly"
+		}
+	],
+	"rules": [
+		{
+			"type": "permanent",
+			"condition": "FrequentKubeletRestart",
+			"reason": "FrequentKubeletRestart",
+			"path": "/home/kubernetes/bin/log-counter",
+			"args": [
+				"--journald-source=systemd",
+				"--log-path=/var/log/journal",
+				"--lookback=20m",
+				"--delay=5m",
+				"--count=5",
+				"--pattern=Started Kubernetes kubelet."
+			],
+			"timeout": "1m"
+		},
+		{
+			"type": "permanent",
+			"condition": "FrequentDockerRestart",
+			"reason": "FrequentDockerRestart",
+			"path": "/home/kubernetes/bin/log-counter",
+			"args": [
+				"--journald-source=systemd",
+				"--log-path=/var/log/journal",
+				"--lookback=20m",
+				"--count=5",
+				"--pattern=Starting Docker Application Container Engine..."
+			],
+			"timeout": "1m"
+		},
+		{
+			"type": "permanent",
+			"condition": "FrequentContainerdRestart",
+			"reason": "FrequentContainerdRestart",
+			"path": "/home/kubernetes/bin/log-counter",
+			"args": [
+				"--journald-source=systemd",
+				"--log-path=/var/log/journal",
+				"--lookback=20m",
+				"--count=5",
+				"--pattern=Starting containerd container runtime..."
+			],
+			"timeout": "1m"
+		}
+	]
+}
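For each rule above, the custom plugin monitor periodically executes `log-counter` with the configured args and maps the exit status to the condition (0 is treated as healthy, non-zero as a detected problem, following node-problem-detector's custom-plugin convention). A rough sketch of one such invocation under those assumptions; the real monitor also enforces the configured `timeout` and truncates output to `max_output_length`:

```go
package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// Sketch of the FrequentKubeletRestart rule above.
	cmd := exec.Command("/home/kubernetes/bin/log-counter",
		"--journald-source=systemd",
		"--log-path=/var/log/journal",
		"--lookback=20m",
		"--delay=5m",
		"--count=5",
		"--pattern=Started Kubernetes kubelet.")
	out, err := cmd.CombinedOutput()
	if err != nil {
		// Non-zero exit: kubelet restarted too frequently within the lookback
		// window, so the FrequentKubeletRestart condition would be set.
		fmt.Printf("problem detected: %s (%v)\n", out, err)
		return
	}
	fmt.Println("kubelet is functioning properly")
}
```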
@@ -69,7 +69,7 @@ data:
     {
       "plugin": "journald",
       "pluginConfig": {
-        "source": "docker"
+        "source": "dockerd"
      },
      "logPath": "/var/log/journal",
      "lookback": "5m",
@@ -37,26 +37,26 @@ var (
 
 type pluginGlobalConfig struct {
 	// InvokeIntervalString is the interval string at which plugins will be invoked.
-	InvokeIntervalString *string `json:"invoke_interval, omitempty"`
+	InvokeIntervalString *string `json:"invoke_interval,omitempty"`
 	// TimeoutString is the global plugin execution timeout string.
-	TimeoutString *string `json:"timeout, omitempty"`
+	TimeoutString *string `json:"timeout,omitempty"`
 	// InvokeInterval is the interval at which plugins will be invoked.
 	InvokeInterval *time.Duration `json:"-"`
 	// Timeout is the global plugin execution timeout.
 	Timeout *time.Duration `json:"-"`
 	// MaxOutputLength is the maximum plugin output message length.
-	MaxOutputLength *int `json:"max_output_length, omitempty"`
+	MaxOutputLength *int `json:"max_output_length,omitempty"`
 	// Concurrency is the number of concurrent running plugins.
-	Concurrency *int `json:"concurrency, omitempty"`
+	Concurrency *int `json:"concurrency,omitempty"`
 }
 
 // Custom plugin config is the configuration of custom plugin monitor.
 type CustomPluginConfig struct {
 	// Plugin is the name of plugin which is currently used.
 	// Currently supported: custom.
-	Plugin string `json:"plugin, omitempty"`
+	Plugin string `json:"plugin,omitempty"`
 	// PluginConfig is global plugin configuration.
-	PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig, omitempty"`
+	PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig,omitempty"`
 	// Source is the source name of the custom plugin monitor
 	Source string `json:"source"`
 	// DefaultConditions are the default states of all the conditions custom plugin monitor should handle.
@@ -25,14 +25,15 @@ import (
|
||||
"k8s.io/node-problem-detector/cmd/logcounter/options"
|
||||
"k8s.io/node-problem-detector/pkg/logcounter/types"
|
||||
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
|
||||
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/kmsg"
|
||||
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/journald"
|
||||
watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
|
||||
systemtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
|
||||
)
|
||||
|
||||
const (
|
||||
bufferSize = 1000
|
||||
timeout = 1 * time.Second
|
||||
bufferSize = 1000
|
||||
timeout = 1 * time.Second
|
||||
journaldSourceKey = "source"
|
||||
)
|
||||
|
||||
type logCounter struct {
|
||||
@@ -42,11 +43,17 @@ type logCounter struct {
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
func NewKmsgLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) {
|
||||
watcher := kmsg.NewKmsgWatcher(watchertypes.WatcherConfig{Lookback: options.Lookback})
|
||||
func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter, error) {
|
||||
watcher := journald.NewJournaldWatcher(watchertypes.WatcherConfig{
|
||||
Plugin: "journald",
|
||||
PluginConfig: map[string]string{journaldSourceKey: options.JournaldSource},
|
||||
LogPath: options.LogPath,
|
||||
Lookback: options.Lookback,
|
||||
Delay: options.Delay,
|
||||
})
|
||||
logCh, err := watcher.Watch()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error watching kmsg: %v", err)
|
||||
return nil, fmt.Errorf("error watching journald: %v", err)
|
||||
}
|
||||
return &logCounter{
|
||||
logCh: logCh,
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
 	"io"
 	"os"
 	"strings"
-	"syscall"
 	"time"
 
 	utilclock "code.cloudfoundry.org/clock"
@@ -32,6 +31,7 @@ import (
 
 	"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
 	logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
+	"k8s.io/node-problem-detector/pkg/util"
 	"k8s.io/node-problem-detector/pkg/util/tomb"
 )
 
@@ -41,7 +41,7 @@ type filelogWatcher struct {
 	closer     io.Closer
 	translator *translator
 	logCh      chan *logtypes.Log
-	uptime     time.Time
+	startTime  time.Time
 	tomb       *tomb.Tomb
 	clock      utilclock.Clock
 }
@@ -49,14 +49,19 @@ type filelogWatcher struct {
 // NewSyslogWatcherOrDie creates a new log watcher. The function panics
 // when encounters an error.
 func NewSyslogWatcherOrDie(cfg types.WatcherConfig) types.LogWatcher {
-	var info syscall.Sysinfo_t
-	if err := syscall.Sysinfo(&info); err != nil {
-		glog.Fatalf("Failed to get system info: %v", err)
+	uptime, err := util.GetUptimeDuration()
+	if err != nil {
+		glog.Fatalf("failed to get uptime: %v", err)
+	}
+	startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay)
+	if err != nil {
+		glog.Fatalf("failed to get start time: %v", err)
 	}
 
 	return &filelogWatcher{
 		cfg:        cfg,
 		translator: newTranslatorOrDie(cfg.PluginConfig),
-		uptime:     time.Now().Add(time.Duration(-info.Uptime * int64(time.Second))),
+		startTime:  startTime,
 		tomb:       tomb.NewTomb(),
 		// A capacity 1000 buffer should be enough
 		logCh: make(chan *logtypes.Log, 1000),
@@ -96,11 +101,6 @@ func (s *filelogWatcher) watchLoop() {
 		close(s.logCh)
 		s.tomb.Done()
 	}()
-	lookback, err := time.ParseDuration(s.cfg.Lookback)
-	if err != nil {
-		glog.Fatalf("Failed to parse duration %q: %v", s.cfg.Lookback, err)
-	}
-	glog.Info("Lookback:", lookback)
 	var buffer bytes.Buffer
 	for {
 		select {
@@ -127,8 +127,9 @@ func (s *filelogWatcher) watchLoop() {
 			glog.Warningf("Unable to parse line: %q, %v", line, err)
 			continue
 		}
-		// If the log is older than look back duration or system boot time, discard it.
-		if s.clock.Since(log.Timestamp) > lookback || log.Timestamp.Before(s.uptime) {
+		// Discard messages before start time.
+		if log.Timestamp.Before(s.startTime) {
+			glog.V(5).Infof("Throwing away msg %q before start time: %v < %v", log.Message, log.Timestamp, s.startTime)
 			continue
 		}
 		s.logCh <- log
@@ -25,6 +25,7 @@ import (
 
 	"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
 	logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
+	"k8s.io/node-problem-detector/pkg/util"
 
 	"code.cloudfoundry.org/clock/fakeclock"
 	"github.com/stretchr/testify/assert"
@@ -45,18 +46,21 @@ func TestWatch(t *testing.T) {
 	now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local)
 	fakeClock := fakeclock.NewFakeClock(now)
 	testCases := []struct {
-		log      string
-		logs     []logtypes.Log
-		uptime   time.Time
-		lookback string
+		uptime   time.Duration
+		lookback string
+		delay    string
+		log      string
+		logs     []logtypes.Log
 	}{
 		{
 			// The start point is at the head of the log file.
+			uptime:   0,
+			lookback: "0",
+			delay:    "0",
 			log: `Jan 2 03:04:05 kernel: [0.000000] 1
 Jan 2 03:04:06 kernel: [1.000000] 2
 Jan 2 03:04:07 kernel: [2.000000] 3
 `,
-			lookback: "0",
 			logs: []logtypes.Log{
 				{
 					Timestamp: now,
@@ -74,11 +78,13 @@ Jan 2 03:04:07 kernel: [2.000000] 3
 		},
 		{
 			// The start point is in the middle of the log file.
+			uptime:   0,
+			lookback: "0",
+			delay:    "0",
 			log: `Jan 2 03:04:04 kernel: [0.000000] 1
 Jan 2 03:04:05 kernel: [1.000000] 2
 Jan 2 03:04:06 kernel: [2.000000] 3
 `,
-			lookback: "0",
 			logs: []logtypes.Log{
 				{
 					Timestamp: now,
@@ -92,11 +98,13 @@ Jan 2 03:04:06 kernel: [2.000000] 3
 		},
 		{
 			// The start point is at the end of the log file, but we look back.
+			uptime:   2 * time.Second,
+			lookback: "1s",
+			delay:    "0",
 			log: `Jan 2 03:04:03 kernel: [0.000000] 1
 Jan 2 03:04:04 kernel: [1.000000] 2
 Jan 2 03:04:05 kernel: [2.000000] 3
 `,
-			lookback: "1s",
 			logs: []logtypes.Log{
 				{
 					Timestamp: now.Add(-time.Second),
@@ -111,12 +119,13 @@ Jan 2 03:04:05 kernel: [2.000000] 3
 		{
 			// The start point is at the end of the log file, we look back, but
 			// system rebooted at in the middle of the log file.
+			uptime:   time.Second,
+			lookback: "2s",
+			delay:    "0",
 			log: `Jan 2 03:04:03 kernel: [0.000000] 1
 Jan 2 03:04:04 kernel: [1.000000] 2
 Jan 2 03:04:05 kernel: [2.000000] 3
 `,
-			uptime:   time.Date(time.Now().Year(), time.January, 2, 3, 4, 4, 0, time.Local),
-			lookback: "2s",
 			logs: []logtypes.Log{
 				{
 					Timestamp: now.Add(-time.Second),
@@ -146,8 +155,8 @@ Jan 2 03:04:05 kernel: [2.000000] 3
 			LogPath:  f.Name(),
 			Lookback: test.lookback,
 		})
-		// Set the uptime.
-		w.(*filelogWatcher).uptime = test.uptime
+		// Set the startTime.
+		w.(*filelogWatcher).startTime, _ = util.GetStartTime(fakeClock.Now(), test.uptime, test.lookback, test.delay)
 		// Set the fake clock.
 		w.(*filelogWatcher).clock = fakeClock
 		logCh, err := w.Watch()
@@ -22,7 +22,6 @@ import (
 	"fmt"
 	"os"
 	"strings"
-	"syscall"
 	"time"
 
 	"github.com/coreos/go-systemd/sdjournal"
@@ -30,6 +29,7 @@ import (
 
 	"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
 	logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
+	"k8s.io/node-problem-detector/pkg/util"
 	"k8s.io/node-problem-detector/pkg/util/tomb"
 )
 
@@ -40,17 +40,28 @@ import (
 
 // journaldWatcher is the log watcher for journald.
 type journaldWatcher struct {
-	journal *sdjournal.Journal
-	cfg     types.WatcherConfig
-	logCh   chan *logtypes.Log
-	tomb    *tomb.Tomb
+	journal   *sdjournal.Journal
+	cfg       types.WatcherConfig
+	startTime time.Time
+	logCh     chan *logtypes.Log
+	tomb      *tomb.Tomb
 }
 
 // NewJournaldWatcher is the create function of journald watcher.
 func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher {
+	uptime, err := util.GetUptimeDuration()
+	if err != nil {
+		glog.Fatalf("failed to get uptime: %v", err)
+	}
+	startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay)
+	if err != nil {
+		glog.Fatalf("failed to get start time: %v", err)
+	}
+
 	return &journaldWatcher{
-		cfg:  cfg,
-		tomb: tomb.NewTomb(),
+		cfg:       cfg,
+		startTime: startTime,
+		tomb:      tomb.NewTomb(),
 		// A capacity 1000 buffer should be enough
 		logCh: make(chan *logtypes.Log, 1000),
 	}
@@ -61,7 +72,7 @@ var _ types.WatcherCreateFunc = NewJournaldWatcher
 
 // Watch starts the journal watcher.
 func (j *journaldWatcher) Watch() (<-chan *logtypes.Log, error) {
-	journal, err := getJournal(j.cfg)
+	journal, err := getJournal(j.cfg, j.startTime)
 	if err != nil {
 		return nil, err
 	}
@@ -81,6 +92,7 @@ const waitLogTimeout = 5 * time.Second
 
 // watchLoop is the main watch loop of journald watcher.
 func (j *journaldWatcher) watchLoop() {
+	startTimestamp := timeToJournalTimestamp(j.startTime)
 	defer func() {
 		if err := j.journal.Close(); err != nil {
 			glog.Errorf("Failed to close journal client: %v", err)
@@ -112,6 +124,12 @@ func (j *journaldWatcher) watchLoop() {
 			continue
 		}
 
+		if entry.RealtimeTimestamp < startTimestamp {
+			glog.V(5).Infof("Throwing away journal entry %q before start time: %v < %v",
+				entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE], entry.RealtimeTimestamp, startTimestamp)
+			continue
+		}
+
 		j.logCh <- translate(entry)
 	}
 }
@@ -125,17 +143,12 @@ const (
 )
 
 // getJournal returns a journal client.
-func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
+func getJournal(cfg types.WatcherConfig, startTime time.Time) (*sdjournal.Journal, error) {
 	// Get journal log path.
 	path := defaultJournalLogPath
 	if cfg.LogPath != "" {
 		path = cfg.LogPath
 	}
-	// Get lookback duration.
-	lookback, err := time.ParseDuration(cfg.Lookback)
-	if err != nil {
-		return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err)
-	}
 	// If the path doesn't present, NewJournalFromDir will create it instead of
 	// returning error. So check the path existence ourselves.
 	if _, err := os.Stat(path); err != nil {
@@ -146,24 +159,15 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
 	if err != nil {
 		return nil, fmt.Errorf("failed to create journal client from path %q: %v", path, err)
 	}
-	// Use system uptime if lookback duration is longer than it.
-	// Ideally, we should use monotonic timestamp + boot id in journald. However, it doesn't seem
-	// to work with go-system/journal package.
-	// TODO(random-liu): Use monotonic timestamp + boot id.
-	var info syscall.Sysinfo_t
-	if err := syscall.Sysinfo(&info); err != nil {
-		return nil, fmt.Errorf("failed to get system info: %v", err)
+	// Seek journal client based on startTime.
+	seekTime := startTime
+	now := time.Now()
+	if now.Before(seekTime) {
+		seekTime = now
 	}
-	uptime := time.Duration(info.Uptime) * time.Second
-	if lookback > uptime {
-		lookback = uptime
-		glog.Infof("Lookback changed to system uptime: %v", lookback)
-	}
-	// Seek journal client based on the lookback duration.
-	start := time.Now().Add(-lookback)
-	err = journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000))
+	err = journal.SeekRealtimeUsec(timeToJournalTimestamp(seekTime))
 	if err != nil {
-		return nil, fmt.Errorf("failed to lookback %q: %v", lookback, err)
+		return nil, fmt.Errorf("failed to seek journal at %v (now %v): %v", seekTime, now, err)
 	}
 	// Empty source is not allowed and treated as an error.
 	source := cfg.PluginConfig[configSourceKey]
@@ -190,3 +194,7 @@ func translate(entry *sdjournal.JournalEntry) *logtypes.Log {
 		Message:   message,
 	}
 }
+
+func timeToJournalTimestamp(t time.Time) uint64 {
+	return uint64(t.UnixNano() / 1000)
+}
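journald realtime timestamps are microseconds since the Unix epoch, which is why both the seek and the `RealtimeTimestamp` comparison above go through `timeToJournalTimestamp`. A self-contained sketch of the conversion; the inverse helper is mine, not part of the patch:

```go
package main

import (
	"fmt"
	"time"
)

// Same conversion as timeToJournalTimestamp in the patch: nanoseconds
// truncated to microseconds since the epoch.
func timeToJournalTimestamp(t time.Time) uint64 {
	return uint64(t.UnixNano() / 1000)
}

// Hypothetical inverse, handy when printing discarded entries.
func journalTimestampToTime(usec uint64) time.Time {
	return time.Unix(0, int64(usec)*1000)
}

func main() {
	now := time.Now()
	usec := timeToJournalTimestamp(now)
	fmt.Println(usec)
	fmt.Println(journalTimestampToTime(usec).Equal(now.Truncate(time.Microsecond))) // true
}
```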
@@ -27,13 +27,15 @@ import (
 
 	"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
 	logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
+	"k8s.io/node-problem-detector/pkg/util"
 	"k8s.io/node-problem-detector/pkg/util/tomb"
 )
 
 type kernelLogWatcher struct {
-	cfg   types.WatcherConfig
-	logCh chan *logtypes.Log
-	tomb  *tomb.Tomb
+	cfg       types.WatcherConfig
+	startTime time.Time
+	logCh     chan *logtypes.Log
+	tomb      *tomb.Tomb
 
 	kmsgParser kmsgparser.Parser
 	clock      utilclock.Clock
@@ -41,9 +43,19 @@ type kernelLogWatcher struct {
 
 // NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg
 func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher {
+	uptime, err := util.GetUptimeDuration()
+	if err != nil {
+		glog.Fatalf("failed to get uptime: %v", err)
+	}
+	startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay)
+	if err != nil {
+		glog.Fatalf("failed to get start time: %v", err)
+	}
+
 	return &kernelLogWatcher{
-		cfg:  cfg,
-		tomb: tomb.NewTomb(),
+		cfg:       cfg,
+		startTime: startTime,
+		tomb:      tomb.NewTomb(),
 		// Arbitrary capacity
 		logCh: make(chan *logtypes.Log, 100),
 		clock: utilclock.NewClock(),
@@ -62,12 +74,7 @@ func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) {
 		k.kmsgParser = parser
 	}
 
-	lookback, err := time.ParseDuration(k.cfg.Lookback)
-	if err != nil {
-		return nil, fmt.Errorf("failed to parse lookback duration %q: %v", k.cfg.Lookback, err)
-	}
-
-	go k.watchLoop(lookback)
+	go k.watchLoop()
 	return k.logCh, nil
 }
@@ -78,7 +85,7 @@ func (k *kernelLogWatcher) Stop() {
 }
 
 // watchLoop is the main watch loop of kernel log watcher.
-func (k *kernelLogWatcher) watchLoop(lookback time.Duration) {
+func (k *kernelLogWatcher) watchLoop() {
 	defer func() {
 		close(k.logCh)
 		k.tomb.Done()
@@ -99,9 +106,9 @@ func (k *kernelLogWatcher) watchLoop(lookback time.Duration) {
 			continue
 		}
 
-		// Discard too old messages
-		if k.clock.Since(msg.Timestamp) > lookback {
-			glog.V(5).Infof("Throwing away msg %v for being too old: %v > %v", msg.Message, msg.Timestamp.String(), lookback.String())
+		// Discard messages before start time.
+		if msg.Timestamp.Before(k.startTime) {
+			glog.V(5).Infof("Throwing away msg %q before start time: %v < %v", msg.Message, msg.Timestamp, k.startTime)
 			continue
 		}
@@ -27,6 +27,7 @@ import (
 
 	"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
 	logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
+	"k8s.io/node-problem-detector/pkg/util"
 )
 
 type mockKmsgParser struct {
@@ -50,12 +51,17 @@ func TestWatch(t *testing.T) {
 	now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local)
 	fakeClock := fakeclock.NewFakeClock(now)
 	testCases := []struct {
+		uptime   time.Duration
+		lookback string
+		delay    string
 		log      *mockKmsgParser
 		logs     []logtypes.Log
-		lookback string
 	}{
 		{
 			// The start point is at the head of the log file.
+			uptime:   0,
+			lookback: "0",
+			delay:    "0",
 			log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
 				{Message: "1", Timestamp: now.Add(0 * time.Second)},
 				{Message: "2", Timestamp: now.Add(1 * time.Second)},
@@ -75,10 +81,12 @@ func TestWatch(t *testing.T) {
 					Message:   "3",
 				},
 			},
-			lookback: "0",
 		},
 		{
 			// The start point is in the middle of the log file.
+			uptime:   0,
+			lookback: "0",
+			delay:    "0",
 			log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
 				{Message: "1", Timestamp: now.Add(-1 * time.Second)},
 				{Message: "2", Timestamp: now.Add(0 * time.Second)},
@@ -94,16 +102,17 @@ func TestWatch(t *testing.T) {
 					Message:   "3",
 				},
 			},
-			lookback: "0",
 		},
 		{
 			// The start point is at the end of the log file, but we look back.
+			uptime:   2 * time.Second,
+			lookback: "1s",
+			delay:    "0",
 			log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
 				{Message: "1", Timestamp: now.Add(-2 * time.Second)},
 				{Message: "2", Timestamp: now.Add(-1 * time.Second)},
 				{Message: "3", Timestamp: now.Add(0 * time.Second)},
 			}},
-			lookback: "1s",
 			logs: []logtypes.Log{
 				{
 					Timestamp: now.Add(-time.Second),
@@ -115,9 +124,32 @@ func TestWatch(t *testing.T) {
 				},
 			},
 		},
+		{
+			// The start point is at the end of the log file, but we look back up to start time.
+			uptime:   time.Second,
+			lookback: "3s",
+			delay:    "0",
+			log: &mockKmsgParser{kmsgs: []kmsgparser.Message{
+				{Message: "1", Timestamp: now.Add(-3 * time.Second)},
+				{Message: "2", Timestamp: now.Add(-2 * time.Second)},
+				{Message: "3", Timestamp: now.Add(-1 * time.Second)},
+				{Message: "4", Timestamp: now.Add(0 * time.Second)},
+			}},
+			logs: []logtypes.Log{
+				{
+					Timestamp: now.Add(-time.Second),
+					Message:   "3",
+				},
+				{
+					Timestamp: now,
+					Message:   "4",
+				},
+			},
+		},
 	}
 	for _, test := range testCases {
 		w := NewKmsgWatcher(types.WatcherConfig{Lookback: test.lookback})
+		w.(*kernelLogWatcher).startTime, _ = util.GetStartTime(fakeClock.Now(), test.uptime, test.lookback, test.delay)
 		w.(*kernelLogWatcher).clock = fakeClock
 		w.(*kernelLogWatcher).kmsgParser = test.log
 		logCh, err := w.Watch()
@@ -130,7 +162,7 @@ func TestWatch(t *testing.T) {
 			assert.Equal(t, &expected, got)
 		}
 		// The log channel should have already been drained
-		// There could stil be future messages sent into the channel, but the chance is really slim.
+		// There could still be future messages sent into the channel, but the chance is really slim.
 		timeout := time.After(100 * time.Millisecond)
 		select {
 		case log := <-logCh:
@@ -32,14 +32,18 @@ type LogWatcher interface {
 type WatcherConfig struct {
 	// Plugin is the name of plugin which is currently used.
 	// Currently supported: filelog, journald, kmsg.
-	Plugin string `json:"plugin, omitempty"`
+	Plugin string `json:"plugin,omitempty"`
 	// PluginConfig is a key/value configuration of a plugin. Valid configurations
 	// are defined in different log watcher plugin.
-	PluginConfig map[string]string `json:"pluginConfig, omitempty"`
+	PluginConfig map[string]string `json:"pluginConfig,omitempty"`
 	// LogPath is the path to the log
-	LogPath string `json:"logPath, omitempty"`
+	LogPath string `json:"logPath,omitempty"`
 	// Lookback is the time log watcher looks up
-	Lookback string `json:"lookback, omitempty"`
+	Lookback string `json:"lookback,omitempty"`
+	// Delay is the time duration log watcher delays after node boot time. This is
+	// useful when the log watcher needs to wait for some time until the node
+	// becomes stable.
+	Delay string `json:"delay,omitempty"`
 }
 
 // WatcherCreateFunc is the create function of a log watcher.
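The tag fix matters because `encoding/json` only honors options that exactly follow the comma: with `json:"lookback, omitempty"` the option is read as `" omitempty"` and silently ignored, so zero-valued fields were still being serialized. A small sketch showing the corrected tags round-tripping a config fragment with the new `delay` field; the struct here is a trimmed mirror of `WatcherConfig` above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed mirror of WatcherConfig with the corrected tags.
type WatcherConfig struct {
	Plugin       string            `json:"plugin,omitempty"`
	PluginConfig map[string]string `json:"pluginConfig,omitempty"`
	LogPath      string            `json:"logPath,omitempty"`
	Lookback     string            `json:"lookback,omitempty"`
	Delay        string            `json:"delay,omitempty"`
}

func main() {
	// Hypothetical monitor config fragment using the new delay field.
	raw := `{"plugin": "journald", "pluginConfig": {"source": "systemd"}, "logPath": "/var/log/journal", "lookback": "20m", "delay": "5m"}`
	var cfg WatcherConfig
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```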
@@ -17,6 +17,7 @@ package util
 
 import (
 	"fmt"
+	"syscall"
 	"time"
 
 	"k8s.io/node-problem-detector/pkg/types"
@@ -31,3 +32,41 @@ func GenerateConditionChangeEvent(t string, status types.ConditionStatus, reason
 		Message: fmt.Sprintf("Node condition %s is now: %s, reason: %s", t, status, reason),
 	}
 }
+
+func GetUptimeDuration() (time.Duration, error) {
+	var info syscall.Sysinfo_t
+	if err := syscall.Sysinfo(&info); err != nil {
+		return 0, fmt.Errorf("failed to get system info: %v", err)
+	}
+	return time.Duration(info.Uptime) * time.Second, nil
+}
+
+func GetStartTime(now time.Time, uptimeDuration time.Duration, lookbackStr string, delayStr string) (time.Time, error) {
+	startTime := now.Add(-uptimeDuration)
+
+	// Delay startTime if delay duration is set, so that the log watcher can skip
+	// the logs in delay duration and wait until the node is stable.
+	if delayStr != "" {
+		delay, err := time.ParseDuration(delayStr)
+		if err != nil {
+			return time.Time{}, fmt.Errorf("failed to parse delay duration %q: %v", delayStr, err)
+		}
+		// Notice that when delay > uptime, startTime is actually after now, which is fine.
+		startTime = startTime.Add(delay)
+	}
+
+	// Adjust startTime according to lookback duration.
+	lookbackStartTime := now
+	if lookbackStr != "" {
+		lookback, err := time.ParseDuration(lookbackStr)
+		if err != nil {
+			return time.Time{}, fmt.Errorf("failed to parse lookback duration %q: %v", lookbackStr, err)
+		}
+		lookbackStartTime = now.Add(-lookback)
+	}
+	if startTime.Before(lookbackStartTime) {
+		startTime = lookbackStartTime
+	}
+
+	return startTime, nil
+}
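To make the `GetStartTime` arithmetic concrete: the candidate start is boot time plus delay (`now - uptime + delay`), which is then clamped so it is never earlier than `now - lookback`. With uptime 10s, delay 7s, and lookback 6s, boot+delay gives now-3s and now-lookback gives now-6s, so the later of the two, now-3s, wins (matching the "lookback > uptime - delay" case in the new test below). A sketch, assuming the `util` package is importable from a node-problem-detector checkout:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/node-problem-detector/pkg/util"
)

func main() {
	now := time.Now()
	// uptime 10s, lookback 6s, delay 7s:
	// boot+delay = now-3s; now-lookback = now-6s; start = now-3s.
	start, err := util.GetStartTime(now, 10*time.Second, "6s", "7s")
	if err != nil {
		panic(err)
	}
	fmt.Println(now.Sub(start)) // 3s
}
```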
pkg/util/helpers_test.go (new file, 137 lines)
@@ -0,0 +1,137 @@
+/*
+Copyright 2018 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"testing"
+	"time"
+)
+
+func TestGetStartTime(t *testing.T) {
+	now := time.Now()
+	testCases := []struct {
+		name              string
+		uptime            time.Duration
+		lookback          string
+		delay             string
+		expectErr         bool
+		expectedStartTime time.Time
+	}{
+		{
+			name:              "bad lookback value",
+			uptime:            0,
+			lookback:          "abc",
+			delay:             "",
+			expectErr:         true,
+			expectedStartTime: time.Time{},
+		},
+		{
+			name:              "bad delay value",
+			uptime:            0,
+			lookback:          "",
+			delay:             "abc",
+			expectErr:         true,
+			expectedStartTime: time.Time{},
+		},
+		{
+			name:              "node is just up, no lookback and delay",
+			uptime:            0,
+			lookback:          "",
+			delay:             "",
+			expectErr:         false,
+			expectedStartTime: now,
+		},
+		{
+			name:              "no delay, lookback > uptime",
+			uptime:            5 * time.Second,
+			lookback:          "7s",
+			delay:             "",
+			expectErr:         false,
+			expectedStartTime: now.Add(-5 * time.Second),
+		},
+		{
+			name:              "no delay, lookback < uptime",
+			uptime:            5 * time.Second,
+			lookback:          "3s",
+			delay:             "",
+			expectErr:         false,
+			expectedStartTime: now.Add(-3 * time.Second),
+		},
+		{
+			name:              "no lookback, delay > uptime",
+			uptime:            5 * time.Second,
+			lookback:          "",
+			delay:             "7s",
+			expectErr:         false,
+			expectedStartTime: now.Add(2 * time.Second),
+		},
+		{
+			name:              "no lookback, delay < uptime",
+			uptime:            5 * time.Second,
+			lookback:          "",
+			delay:             "3s",
+			expectErr:         false,
+			expectedStartTime: now,
+		},
+		{
+			name:              "uptime < delay",
+			uptime:            10 * time.Second,
+			lookback:          "6s",
+			delay:             "12s",
+			expectErr:         false,
+			expectedStartTime: now.Add(2 * time.Second),
+		},
+		{
+			name:              "uptime > delay, uptime < lookback",
+			uptime:            10 * time.Second,
+			lookback:          "12s",
+			delay:             "7s",
+			expectErr:         false,
+			expectedStartTime: now.Add(-3 * time.Second),
+		},
+		{
+			name:              "uptime > delay, uptime > lookback, lookback > uptime - delay",
+			uptime:            10 * time.Second,
+			lookback:          "6s",
+			delay:             "7s",
+			expectErr:         false,
+			expectedStartTime: now.Add(-3 * time.Second),
+		},
+		{
+			name:              "uptime > delay, uptime > lookback, lookback < uptime - delay",
+			uptime:            10 * time.Second,
+			lookback:          "2s",
+			delay:             "7s",
+			expectErr:         false,
+			expectedStartTime: now.Add(-2 * time.Second),
+		},
+	}
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			startTime, err := GetStartTime(now, test.uptime, test.lookback, test.delay)
+			if test.expectErr && err == nil {
+				t.Fatalf("Expect to get error, but got no returned error.")
+			}
+			if !test.expectErr && err != nil {
+				t.Fatalf("Expect to get no error, but got returned error: %v", err)
+			}
+			if test.expectedStartTime != startTime {
+				t.Fatalf("Expect to get start time %v, but got %v", test.expectedStartTime, startTime)
+			}
+		})
+	}
+}