diff --git a/cmd/node_problem_detector.go b/cmd/node_problem_detector.go index ac061954..ec0110e8 100644 --- a/cmd/node_problem_detector.go +++ b/cmd/node_problem_detector.go @@ -18,15 +18,15 @@ package main import ( "flag" + "fmt" "net/url" "os" + "github.com/golang/glog" + "k8s.io/node-problem-detector/pkg/kernelmonitor" "k8s.io/node-problem-detector/pkg/problemdetector" "k8s.io/node-problem-detector/pkg/version" - - "github.com/golang/glog" - "fmt" ) // TODO: Move flags to options directory. @@ -86,5 +86,7 @@ func main() { k := kernelmonitor.NewKernelMonitorOrDie(*kernelMonitorConfigPath) p := problemdetector.NewProblemDetector(k, *apiServerOverride, nodeName) - p.Run() + if err := p.Run(); err != nil { + glog.Fatalf("Problem detector failed with error: %v", err) + } } diff --git a/config/kernel-monitor.json b/config/kernel-monitor.json index ae88ee2c..6db7afc5 100644 --- a/config/kernel-monitor.json +++ b/config/kernel-monitor.json @@ -1,5 +1,6 @@ { - "logPath": "/log/kern.log", + "plugin": "journald", + "logPath": "/log/journal", "lookback": "10m", "startPattern": "Initializing cgroup subsys cpuset", "bufferSize": 10, diff --git a/node-problem-detector.yaml b/node-problem-detector.yaml index 149fb9ed..247f7fdd 100644 --- a/node-problem-detector.yaml +++ b/node-problem-detector.yaml @@ -26,6 +26,11 @@ spec: - name: log mountPath: /log readOnly: true + # Make sure node problem detector is in the same timezone + # with the host. + - name: localtime + mountPath: /etc/localtime + readOnly: true - name: config mountPath: /config readOnly: true @@ -34,6 +39,9 @@ spec: # Config `log` to your system log directory hostPath: path: /var/log/ + - name: localtime + hostPath: + path: /etc/localtime - name: config configMap: name: node-problem-detector-config diff --git a/pkg/kernelmonitor/config.go b/pkg/kernelmonitor/config.go new file mode 100644 index 00000000..cc14671e --- /dev/null +++ b/pkg/kernelmonitor/config.go @@ -0,0 +1,49 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kernelmonitor + +import ( + watchertypes "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types" + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" + "k8s.io/node-problem-detector/pkg/types" +) + +// MonitorConfig is the configuration of kernel monitor. +type MonitorConfig struct { + // WatcherConfig is the configuration of kernel log watcher. + watchertypes.WatcherConfig + // BufferSize is the size (in lines) of the log buffer. + BufferSize int `json:"bufferSize"` + // Source is the source name of the kernel monitor + Source string `json:"source"` + // DefaultConditions are the default states of all the conditions kernel monitor should handle. + DefaultConditions []types.Condition `json:"conditions"` + // Rules are the rules kernel monitor will follow to parse the log file. + Rules []kerntypes.Rule `json:"rules"` + // StartPattern is the pattern of the start line + StartPattern string `json:"startPattern, omitempty"` +} + +// applyDefaultConfiguration applies default configurations. +func applyDefaultConfiguration(cfg *MonitorConfig) { + if cfg.BufferSize == 0 { + cfg.BufferSize = 10 + } + if cfg.WatcherConfig.Lookback == "" { + cfg.WatcherConfig.Lookback = "0" + } +} diff --git a/pkg/kernelmonitor/kernel_log_watcher.go b/pkg/kernelmonitor/kernel_log_watcher.go deleted file mode 100644 index ea28c27c..00000000 --- a/pkg/kernelmonitor/kernel_log_watcher.go +++ /dev/null @@ -1,213 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kernelmonitor - -import ( - "bufio" - "bytes" - "fmt" - "io" - "os" - "strings" - "time" - - "k8s.io/node-problem-detector/pkg/kernelmonitor/translator" - "k8s.io/node-problem-detector/pkg/kernelmonitor/types" - "k8s.io/node-problem-detector/pkg/kernelmonitor/util" - - utilclock "code.cloudfoundry.org/clock" - "github.com/coreos/go-systemd/sdjournal" - "github.com/golang/glog" - "github.com/google/cadvisor/utils/tail" -) - -const ( - defaultKernelLogPath = "/var/log/kern.log" -) - -// WatcherConfig is the configuration of kernel log watcher. -type WatcherConfig struct { - // KernelLogPath is the path to the kernel log - KernelLogPath string `json:"logPath, omitempty"` - // StartPattern is the pattern of the start line - StartPattern string `json:"startPattern, omitempty"` - // Lookback is the time kernel watcher looks up - Lookback string `json:"lookback, omitempty"` -} - -// KernelLogWatcher watches and translates the kernel log. Once there is new log line, -// it will translate and report the log. -type KernelLogWatcher interface { - // Watch starts the kernel log watcher and returns a watch channel. - Watch() (<-chan *types.KernelLog, error) - // Stop stops the kernel log watcher. - Stop() -} - -type kernelLogWatcher struct { - // trans is the translator translates the log into internal format. - trans translator.Translator - cfg WatcherConfig - reader *bufio.Reader - logCh chan *types.KernelLog - tomb *util.Tomb - clock utilclock.Clock -} - -// NewKernelLogWatcher creates a new kernel log watcher. -func NewKernelLogWatcher(cfg WatcherConfig) KernelLogWatcher { - return &kernelLogWatcher{ - trans: translator.NewDefaultTranslator(), - cfg: cfg, - tomb: util.NewTomb(), - // A capacity 1000 buffer should be enough - logCh: make(chan *types.KernelLog, 1000), - clock: utilclock.NewClock(), - } -} - -func (k *kernelLogWatcher) Watch() (<-chan *types.KernelLog, error) { - path := defaultKernelLogPath - if k.cfg.KernelLogPath != "" { - path = k.cfg.KernelLogPath - } - // NOTE(random-liu): This is a hack. KernelMonitor doesn't support some OS distros e.g. GCI. Ideally, - // KernelMonitor should only run on nodes with supported OS distro. However, NodeProblemDetector is - // running as DaemonSet, it has to be deployed on each node (There is no node affinity support for - // DaemonSet now #22205). If some nodes have unsupported OS distro e.g. the OS distro of master node - // in gke/gce is GCI, KernelMonitor will keep throwing out error, and NodeProblemDetector will be - // restarted again and again. - // To avoid this, we decide to add this temporarily hack. When KernelMonitor can't find the kernel - // log file, it will print a log and then return nil channel and no error. Since nil channel will - // always be blocked, the NodeProblemDetector will block forever. - if _, err := os.Stat(path); os.IsNotExist(err) { - glog.Infof("kernel log %q is not found, kernel monitor doesn't support the os distro", path) - return nil, nil - } - // TODO(random-liu): Rate limit tail file. - // Notice that, kernel log watcher doesn't look back to the rolled out logs. - reader, err := getLogReader(path) - if err != nil { - return nil, err - } - k.reader = bufio.NewReader(reader) - glog.Info("Start watching kernel log") - go k.watchLoop() - return k.logCh, nil -} - -func (k *kernelLogWatcher) Stop() { - k.tomb.Stop() -} - -// watchLoop is the main watch loop of kernel log watcher. -func (k *kernelLogWatcher) watchLoop() { - defer func() { - close(k.logCh) - k.tomb.Done() - }() - lookback, err := parseDuration(k.cfg.Lookback) - if err != nil { - glog.Fatalf("failed to parse duration %q: %v", k.cfg.Lookback, err) - } - var buffer bytes.Buffer - for { - select { - case <-k.tomb.Stopping(): - glog.Infof("Stop watching kernel log") - return - default: - } - - line, err := k.reader.ReadString('\n') - if err != nil && err != io.EOF { - glog.Errorf("exiting kernel log watch with error: %v", err) - return - } - if err == io.EOF { - buffer.WriteString(line) - time.Sleep(100 * time.Millisecond) - continue - } - if line == "" { - time.Sleep(100 * time.Millisecond) - continue - } - if err == nil { - buffer.WriteString(line) - // trim `\n` - line = strings.TrimRight(buffer.String(), "\n") - buffer.Reset() - log, err := k.trans.Translate(line) - if err != nil { - glog.Infof("Unable to parse line: %q, %v", line, err) - continue - } - // If the log is older than look back duration, discard it. - if k.clock.Since(log.Timestamp) > lookback { - continue - } - k.logCh <- log - } - } -} - -// getLogReader gets a kernel log reader. -func getLogReader(path string) (io.Reader, error) { - if len(path) != 0 { - return tryLogFile(path) - } - return tryJournal() -} - -func tryJournal() (io.Reader, error) { - r, err := sdjournal.NewJournalReader(sdjournal.JournalReaderConfig{ - NumFromTail: uint64(0), - Matches: []sdjournal.Match{ - { - Field: sdjournal.SD_JOURNAL_FIELD_TRANSPORT, - Value: "kernel", - }, - }, - }) - if err != nil { - return nil, fmt.Errorf("error opening journal: %v", err) - } - if r == nil { - return nil, fmt.Errorf("got a nil reader") - } - glog.Info("Kernel log watcher use journal") - return r, nil -} - -func tryLogFile(path string) (io.Reader, error) { - tail, err := tail.NewTail(path) - if err != nil { - return nil, err - } - glog.Infof("Kernel log watcher use log file: %s", path) - time.Sleep(1000 * time.Millisecond) - return tail, nil -} - -func parseDuration(s string) (time.Duration, error) { - // If the duration is not configured, just return 0 by default - if s == "" { - return 0, nil - } - return time.ParseDuration(s) -} diff --git a/pkg/kernelmonitor/kernel_monitor.go b/pkg/kernelmonitor/kernel_monitor.go index 8d03bdb3..fccd4abf 100644 --- a/pkg/kernelmonitor/kernel_monitor.go +++ b/pkg/kernelmonitor/kernel_monitor.go @@ -22,6 +22,8 @@ import ( "regexp" "time" + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers" + watchertypes "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types" kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" "k8s.io/node-problem-detector/pkg/kernelmonitor/util" "k8s.io/node-problem-detector/pkg/types" @@ -29,20 +31,6 @@ import ( "github.com/golang/glog" ) -// MonitorConfig is the configuration of kernel monitor. -type MonitorConfig struct { - // WatcherConfig is the configuration of kernel log watcher. - WatcherConfig - // BufferSize is the size (in lines) of the log buffer. - BufferSize int `json:"bufferSize"` - // Source is the source name of the kernel monitor - Source string `json:"source"` - // DefaultConditions are the default states of all the conditions kernel monitor should handle. - DefaultConditions []types.Condition `json:"conditions"` - // Rules are the rules kernel monitor will follow to parse the log file. - Rules []kerntypes.Rule `json:"rules"` -} - // KernelMonitor monitors the kernel log and reports node problem condition and event according to // the rules. type KernelMonitor interface { @@ -53,7 +41,7 @@ type KernelMonitor interface { } type kernelMonitor struct { - watcher KernelLogWatcher + watcher watchertypes.LogWatcher buffer LogBuffer config MonitorConfig conditions []types.Condition @@ -69,18 +57,23 @@ func NewKernelMonitorOrDie(configPath string) KernelMonitor { } f, err := ioutil.ReadFile(configPath) if err != nil { - panic(err) + glog.Fatalf("Failed to read configuration file %q: %v", configPath, err) } err = json.Unmarshal(f, &k.config) if err != nil { - panic(err) + glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err) } + // Apply default configurations + applyDefaultConfiguration(&k.config) err = validateRules(k.config.Rules) if err != nil { - panic(err) + glog.Fatalf("Failed to validate matching rules %#v: %v", k.config.Rules, err) } glog.Infof("Finish parsing log file: %+v", k.config) - k.watcher = NewKernelLogWatcher(k.config.WatcherConfig) + k.watcher, err = logwatchers.GetLogWatcher(k.config.WatcherConfig) + if err != nil { + glog.Fatalf("Failed to create log watcher with watcher config %#v: %v", k.config.WatcherConfig, err) + } k.buffer = NewLogBuffer(k.config.BufferSize) // A 1000 size channel should be big enough. k.output = make(chan *types.Status, 1000) diff --git a/pkg/kernelmonitor/logwatchers/journald/log_watcher.go b/pkg/kernelmonitor/logwatchers/journald/log_watcher.go new file mode 100644 index 00000000..7aecd5ac --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/journald/log_watcher.go @@ -0,0 +1,163 @@ +// +build journald + +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package journald + +import ( + "fmt" + "strings" + "time" + + "github.com/coreos/go-systemd/sdjournal" + "github.com/golang/glog" + + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types" + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" + "k8s.io/node-problem-detector/pkg/kernelmonitor/util" +) + +// Compiling go-systemd/sdjournald needs libsystemd-dev or libsystemd-journal-dev, +// which is not always available on all os distros and versions. +// So we add the build tag in this file, so that on unsupported os distro, user can +// disable this build tag. + +// journaldWatcher is the log watcher for journald. +type journaldWatcher struct { + journal *sdjournal.Journal + cfg types.WatcherConfig + logCh chan *kerntypes.KernelLog + tomb *util.Tomb +} + +// NewJournaldWatcher is the create function of journald watcher. +func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher { + return &journaldWatcher{ + cfg: cfg, + tomb: util.NewTomb(), + // A capacity 1000 buffer should be enough + logCh: make(chan *kerntypes.KernelLog, 1000), + } +} + +// Make sure NewJournaldWatcher is types.WatcherCreateFunc . +var _ types.WatcherCreateFunc = NewJournaldWatcher + +// Watch starts the journal watcher. +func (j *journaldWatcher) Watch() (<-chan *kerntypes.KernelLog, error) { + journal, err := getJournal(j.cfg) + if err != nil { + return nil, err + } + j.journal = journal + glog.Info("Start watching journald") + go j.watchLoop() + return j.logCh, nil +} + +// Stop stops the journald watcher. +func (j *journaldWatcher) Stop() { + j.tomb.Stop() +} + +// waitLogTimeout is the timeout waiting for new log. +const waitLogTimeout = 5 * time.Second + +// watchLoop is the main watch loop of journald watcher. +func (j *journaldWatcher) watchLoop() { + defer func() { + if err := j.journal.Close(); err != nil { + glog.Errorf("Failed to close journal client: %v", err) + } + j.tomb.Done() + }() + for { + select { + case <-j.tomb.Stopping(): + glog.Infof("Stop watching journald") + return + default: + } + // Get next log entry. + n, err := j.journal.Next() + if err != nil { + glog.Errorf("Failed to get next journal entry: %v", err) + continue + } + // If next reaches the end, wait for waitLogTimeout. + if n == 0 { + j.journal.Wait(waitLogTimeout) + continue + } + + entry, err := j.journal.GetEntry() + if err != nil { + glog.Errorf("failed to get journal entry: %v", err) + continue + } + + j.logCh <- translate(entry) + } +} + +// defaultJournalLogPath is the default path of journal log. +const defaultJournalLogPath = "/var/log/journal" + +// getJournal returns a journal client. +func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) { + // Get journal log path. + path := defaultJournalLogPath + if cfg.LogPath != "" { + path = cfg.LogPath + } + // Get lookback duration. + since, err := time.ParseDuration(cfg.Lookback) + if err != nil { + return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err) + } + // Get journal client from the log path. + journal, err := sdjournal.NewJournalFromDir(path) + if err != nil { + return nil, fmt.Errorf("failed to create journal client from path %q: %v", path, err) + } + // Seek journal client based on the lookback duration. + start := time.Now().Add(-since) + err = journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)) + if err != nil { + return nil, fmt.Errorf("failed to lookback %q: %v", since, err) + } + // TODO(random-liu): Make this configurable to support parsing other logs. + kernelMatch := sdjournal.Match{ + Field: sdjournal.SD_JOURNAL_FIELD_TRANSPORT, + Value: "kernel", + } + err = journal.AddMatch(kernelMatch.String()) + if err != nil { + return nil, fmt.Errorf("failed to add log filter %#v: %v", kernelMatch, err) + } + return journal, nil +} + +// translate translates journal entry into internal type. +func translate(entry *sdjournal.JournalEntry) *kerntypes.KernelLog { + timestamp := time.Unix(0, int64(time.Duration(entry.RealtimeTimestamp)*time.Microsecond)) + message := strings.TrimSpace(entry.Fields["MESSAGE"]) + return &kerntypes.KernelLog{ + Timestamp: timestamp, + Message: message, + } +} diff --git a/pkg/kernelmonitor/logwatchers/journald/log_watcher_test.go b/pkg/kernelmonitor/logwatchers/journald/log_watcher_test.go new file mode 100644 index 00000000..bb7957cf --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/journald/log_watcher_test.go @@ -0,0 +1,64 @@ +// +build journald + +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package journald + +import ( + "testing" + "time" + + "github.com/coreos/go-systemd/sdjournal" + "github.com/stretchr/testify/assert" + + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" +) + +func TestTranslate(t *testing.T) { + testCases := []struct { + entry *sdjournal.JournalEntry + log *kerntypes.KernelLog + }{ + { + // has log message + entry: &sdjournal.JournalEntry{ + Fields: map[string]string{"MESSAGE": "log message"}, + RealtimeTimestamp: 123456789, + }, + log: &kerntypes.KernelLog{ + Timestamp: time.Unix(0, 123456789*1000), + Message: "log message", + }, + }, + { + // no log message + entry: &sdjournal.JournalEntry{ + Fields: map[string]string{}, + RealtimeTimestamp: 987654321, + }, + log: &kerntypes.KernelLog{ + Timestamp: time.Unix(0, 987654321*1000), + Message: "", + }, + }, + } + + for c, test := range testCases { + t.Logf("TestCase #%d: %#v", c+1, test) + assert.Equal(t, test.log, translate(test.entry)) + } +} diff --git a/pkg/kernelmonitor/logwatchers/log_watchers.go b/pkg/kernelmonitor/logwatchers/log_watchers.go new file mode 100644 index 00000000..198be47f --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/log_watchers.go @@ -0,0 +1,43 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logwatchers + +import ( + "fmt" + + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types" + + "github.com/golang/glog" +) + +// createFuncs is a table of createFuncs for all supported log watchers. +var createFuncs = map[string]types.WatcherCreateFunc{} + +// registerLogWatcher registers a createFunc for a log watcher. +func registerLogWatcher(name string, create types.WatcherCreateFunc) { + createFuncs[name] = create +} + +// GetLogWatcher get a log watcher based on the passed in configuration. +func GetLogWatcher(config types.WatcherConfig) (types.LogWatcher, error) { + create, ok := createFuncs[config.Plugin] + if !ok { + return nil, fmt.Errorf("no create function found for plugin %q", config.Plugin) + } + glog.Infof("Use log watcher of plugin %q", config.Plugin) + return create(config), nil +} diff --git a/pkg/kernelmonitor/logwatchers/register_journald.go b/pkg/kernelmonitor/logwatchers/register_journald.go new file mode 100644 index 00000000..9d37f1c7 --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/register_journald.go @@ -0,0 +1,30 @@ +// +build journald + +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logwatchers + +import ( + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/journald" +) + +const journaldPluginName = "journald" + +func init() { + // Register the syslog plugin. + registerLogWatcher(journaldPluginName, journald.NewJournaldWatcher) +} diff --git a/pkg/kernelmonitor/logwatchers/register_syslog.go b/pkg/kernelmonitor/logwatchers/register_syslog.go new file mode 100644 index 00000000..c48923bc --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/register_syslog.go @@ -0,0 +1,28 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logwatchers + +import ( + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/syslog" +) + +const syslogPluginName = "syslog" + +func init() { + // Register the syslog plugin. + registerLogWatcher(syslogPluginName, syslog.NewSyslogWatcher) +} diff --git a/pkg/kernelmonitor/translator/translator.go b/pkg/kernelmonitor/logwatchers/syslog/helpers.go similarity index 54% rename from pkg/kernelmonitor/translator/translator.go rename to pkg/kernelmonitor/logwatchers/syslog/helpers.go index 1915a6e4..2412c9bb 100644 --- a/pkg/kernelmonitor/translator/translator.go +++ b/pkg/kernelmonitor/logwatchers/syslog/helpers.go @@ -13,50 +13,41 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - -package translator +package syslog import ( "fmt" + "io" + "os" "strings" "time" - "k8s.io/node-problem-detector/pkg/kernelmonitor/types" + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" + + "github.com/google/cadvisor/utils/tail" ) -// Translator translates a log line into types.KernelLog, so that kernel monitor -// could parse it whatever the original format is. -type Translator interface { - // Translate translates one log line into types.KernelLog. - Translate(string) (*types.KernelLog, error) -} - -// defaultTranslator works well for ubuntu and debian, but may not work well with -// other os distros. However it is easy to add a new translator for new os distro. -type defaultTranslator struct{} - -// NewDefaultTranslator creates a default translator. -func NewDefaultTranslator() Translator { - return &defaultTranslator{} -} - -func (t *defaultTranslator) Translate(line string) (*types.KernelLog, error) { - timestamp, message, err := t.parseLine(line) +// translate translates the log line into internal type. +func translate(line string) (*kerntypes.KernelLog, error) { + timestamp, message, err := parseLine(line) if err != nil { return nil, err } - return &types.KernelLog{ + return &kerntypes.KernelLog{ Timestamp: timestamp, Message: message, }, nil } -var ( - timestampLen = 15 +const ( + // timestampLen is the length of timestamp in syslog logging format. + timestampLen = 15 + // messagePrefix is the character before real message. messagePrefix = "]" ) -func (t *defaultTranslator) parseLine(line string) (time.Time, string, error) { +// parseLine parses one log line into timestamp and message. +func parseLine(line string) (time.Time, string, error) { // Trim the spaces to make sure timestamp could be found line = strings.TrimSpace(line) if len(line) < timestampLen { @@ -83,3 +74,29 @@ func (t *defaultTranslator) parseLine(line string) (time.Time, string, error) { return timestamp, message, nil } + +// defaultKernelLogPath the default path of syslog kernel log. +const defaultKernelLogPath = "/var/log/kern.log" + +// getLogReader returns log reader for syslog log. Note that getLogReader doesn't look back +// to the rolled out logs. +func getLogReader(path string) (io.ReadCloser, error) { + if path == "" { + path = defaultKernelLogPath + } + // To handle log rotation, tail will not report error immediately if + // the file doesn't exist. So we check file existence frist. + // This could go wrong during mid-rotation. It should recover after + // several restart when the log file is created again. The chance + // is slim but we should still fix this in the future. + // TODO(random-liu): Handle log missing during rotation. + _, err := os.Stat(path) + if err != nil { + return nil, fmt.Errorf("failed to stat the file %q: %v", path, err) + } + tail, err := tail.NewTail(path) + if err != nil { + return nil, fmt.Errorf("failed to tail the file %q: %v", path, err) + } + return tail, nil +} diff --git a/pkg/kernelmonitor/translator/translator_test.go b/pkg/kernelmonitor/logwatchers/syslog/helpers_test.go similarity index 52% rename from pkg/kernelmonitor/translator/translator_test.go rename to pkg/kernelmonitor/logwatchers/syslog/helpers_test.go index 98333ac4..c738ed9f 100644 --- a/pkg/kernelmonitor/translator/translator_test.go +++ b/pkg/kernelmonitor/logwatchers/syslog/helpers_test.go @@ -14,32 +14,38 @@ See the License for the specific language governing permissions and limitations under the License. */ -package translator +package syslog import ( "testing" "time" + + "github.com/stretchr/testify/assert" + + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" ) -func TestDefaultTranslator(t *testing.T) { - tr := NewDefaultTranslator() +func TestTranslate(t *testing.T) { year := time.Now().Year() testCases := []struct { - input string - err bool - timestamp time.Time - message string + input string + err bool + log *kerntypes.KernelLog }{ { - input: "May 1 12:23:45 hostname kernel: [0.000000] component: log message", - timestamp: time.Date(year, time.May, 1, 12, 23, 45, 0, time.Local), - message: "component: log message", + input: "May 1 12:23:45 hostname kernel: [0.000000] component: log message", + log: &kerntypes.KernelLog{ + Timestamp: time.Date(year, time.May, 1, 12, 23, 45, 0, time.Local), + Message: "component: log message", + }, }, { // no log message - input: "May 21 12:23:45 hostname kernel: [9.999999]", - timestamp: time.Date(year, time.May, 21, 12, 23, 45, 0, time.Local), - message: "", + input: "May 21 12:23:45 hostname kernel: [9.999999]", + log: &kerntypes.KernelLog{ + Timestamp: time.Date(year, time.May, 21, 12, 23, 45, 0, time.Local), + Message: "", + }, }, { // the right square bracket is missing @@ -49,15 +55,12 @@ func TestDefaultTranslator(t *testing.T) { } for c, test := range testCases { - log, err := tr.Translate(test.input) - if test.err { - if err == nil { - t.Errorf("case %d: expect error should occur, got %+v, %v", c+1, log, err) - } + t.Logf("TestCase #%d: %#v", c+1, test) + log, err := translate(test.input) + if (err != nil) != test.err { + t.Errorf("case %d: error assertion failed, got log: %+v, error: %v", c+1, log, err) continue } - if test.timestamp != log.Timestamp || test.message != log.Message { - t.Errorf("case %d: expect %v, %q; got %v, %q", c+1, test.timestamp, test.message, log.Timestamp, log.Message) - } + assert.Equal(t, test.log, log) } } diff --git a/pkg/kernelmonitor/logwatchers/syslog/log_watcher.go b/pkg/kernelmonitor/logwatchers/syslog/log_watcher.go new file mode 100644 index 00000000..deb3ccff --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/syslog/log_watcher.go @@ -0,0 +1,122 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package syslog + +import ( + "bufio" + "bytes" + "io" + "time" + + utilclock "code.cloudfoundry.org/clock" + "github.com/golang/glog" + + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types" + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" + "k8s.io/node-problem-detector/pkg/kernelmonitor/util" +) + +type syslogWatcher struct { + cfg types.WatcherConfig + reader *bufio.Reader + closer io.Closer + logCh chan *kerntypes.KernelLog + tomb *util.Tomb + clock utilclock.Clock +} + +// NewSyslogWatcher creates a new kernel log watcher. +func NewSyslogWatcher(cfg types.WatcherConfig) types.LogWatcher { + return &syslogWatcher{ + cfg: cfg, + tomb: util.NewTomb(), + // A capacity 1000 buffer should be enough + logCh: make(chan *kerntypes.KernelLog, 1000), + clock: utilclock.NewClock(), + } +} + +// Make sure NewSyslogWathcer is types.WatcherCreateFunc. +var _ types.WatcherCreateFunc = NewSyslogWatcher + +// Watch starts the syslog watcher. +func (s *syslogWatcher) Watch() (<-chan *kerntypes.KernelLog, error) { + r, err := getLogReader(s.cfg.LogPath) + if err != nil { + return nil, err + } + s.reader = bufio.NewReader(r) + s.closer = r + glog.Info("Start watching syslog") + go s.watchLoop() + return s.logCh, nil +} + +// Stop stops the syslog watcher. +func (s *syslogWatcher) Stop() { + s.tomb.Stop() +} + +// watchPollInterval is the interval syslog log watcher will +// poll for pod change after reading to the end. +const watchPollInterval = 500 * time.Millisecond + +// watchLoop is the main watch loop of syslog watcher. +func (s *syslogWatcher) watchLoop() { + defer func() { + s.closer.Close() + close(s.logCh) + s.tomb.Done() + }() + lookback, err := time.ParseDuration(s.cfg.Lookback) + if err != nil { + glog.Fatalf("Failed to parse duration %q: %v", s.cfg.Lookback, err) + } + glog.Info("Lookback:", lookback) + var buffer bytes.Buffer + for { + select { + case <-s.tomb.Stopping(): + glog.Infof("Stop watching syslog") + return + default: + } + + line, err := s.reader.ReadString('\n') + if err != nil && err != io.EOF { + glog.Errorf("Exiting syslog watch with error: %v", err) + return + } + buffer.WriteString(line) + if err == io.EOF { + time.Sleep(watchPollInterval) + continue + } + line = buffer.String() + buffer.Reset() + log, err := translate(line) + if err != nil { + glog.Warningf("Unable to parse line: %q, %v", line, err) + continue + } + // If the log is older than look back duration, discard it. + if s.clock.Since(log.Timestamp) > lookback { + continue + } + s.logCh <- log + } +} diff --git a/pkg/kernelmonitor/kernel_log_watcher_test.go b/pkg/kernelmonitor/logwatchers/syslog/log_watcher_test.go similarity index 76% rename from pkg/kernelmonitor/kernel_log_watcher_test.go rename to pkg/kernelmonitor/logwatchers/syslog/log_watcher_test.go index 2cb3da5b..5b161740 100644 --- a/pkg/kernelmonitor/kernel_log_watcher_test.go +++ b/pkg/kernelmonitor/logwatchers/syslog/log_watcher_test.go @@ -14,18 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package kernelmonitor +package syslog import ( "io/ioutil" - //"os" - "reflect" + "os" "testing" "time" - "k8s.io/node-problem-detector/pkg/kernelmonitor/types" + "k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types" + kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types" "code.cloudfoundry.org/clock/fakeclock" + "github.com/stretchr/testify/assert" ) func TestWatch(t *testing.T) { @@ -34,7 +35,7 @@ func TestWatch(t *testing.T) { fakeClock := fakeclock.NewFakeClock(now) testCases := []struct { log string - logs []types.KernelLog + logs []kerntypes.KernelLog lookback string }{ { @@ -43,7 +44,8 @@ func TestWatch(t *testing.T) { Jan 2 03:04:06 kernel: [1.000000] 2 Jan 2 03:04:07 kernel: [2.000000] 3 `, - logs: []types.KernelLog{ + lookback: "0", + logs: []kerntypes.KernelLog{ { Timestamp: now, Message: "1", @@ -64,7 +66,8 @@ func TestWatch(t *testing.T) { Jan 2 03:04:05 kernel: [1.000000] 2 Jan 2 03:04:06 kernel: [2.000000] 3 `, - logs: []types.KernelLog{ + lookback: "0", + logs: []kerntypes.KernelLog{ { Timestamp: now, Message: "2", @@ -82,7 +85,7 @@ func TestWatch(t *testing.T) { Jan 2 03:04:05 kernel: [2.000000] 3 `, lookback: "1s", - logs: []types.KernelLog{ + logs: []kerntypes.KernelLog{ { Timestamp: now.Add(-time.Second), Message: "2", @@ -95,31 +98,32 @@ func TestWatch(t *testing.T) { }, } for c, test := range testCases { + t.Logf("TestCase #%d: %#v", c+1, test) f, err := ioutil.TempFile("", "kernel_log_watcher_test") - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) defer func() { f.Close() - //os.Remove(f.Name()) + os.Remove(f.Name()) }() _, err = f.Write([]byte(test.log)) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) - w := NewKernelLogWatcher(WatcherConfig{KernelLogPath: f.Name(), Lookback: test.lookback}) + w := NewSyslogWatcher(types.WatcherConfig{ + Plugin: "syslog", + LogPath: f.Name(), + Lookback: test.lookback, + }) // Set the fake clock. - w.(*kernelLogWatcher).clock = fakeClock + w.(*syslogWatcher).clock = fakeClock logCh, err := w.Watch() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) defer w.Stop() for _, expected := range test.logs { - got := <-logCh - if !reflect.DeepEqual(&expected, got) { - t.Errorf("case %d: expect %+v, got %+v", c+1, expected, *got) + select { + case got := <-logCh: + assert.Equal(t, &expected, got) + case <-time.After(30 * time.Second): + t.Errorf("timeout waiting for log") } } // The log channel should have already been drained diff --git a/pkg/kernelmonitor/logwatchers/types/log_watcher.go b/pkg/kernelmonitor/logwatchers/types/log_watcher.go new file mode 100644 index 00000000..c916bbfe --- /dev/null +++ b/pkg/kernelmonitor/logwatchers/types/log_watcher.go @@ -0,0 +1,43 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "k8s.io/node-problem-detector/pkg/kernelmonitor/types" +) + +// LogWatcher is the interface of a log watcher. +type LogWatcher interface { + // Watch starts watching logs and returns logs via a channel. + Watch() (<-chan *types.KernelLog, error) + // Stop stops the log watcher. Resources open should be closed properly. + Stop() +} + +// WatcherConfig is the configuration of the log watcher. +type WatcherConfig struct { + // Plugin is the name of plugin which is currently used. + // Currently supported: syslog, journald. + Plugin string `json:"plugin, omitempty"` + // LogPath is the path to the log + LogPath string `json:"logPath, omitempty"` + // Lookback is the time kernel watcher looks up + Lookback string `json:"lookback, omitempty"` +} + +// WatcherCreateFunc is the create function of a log watcher. +type WatcherCreateFunc func(WatcherConfig) LogWatcher