From 6170b0c87ff094032dc3367e9ee45769f1785362 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 14 Feb 2017 17:56:02 -0800 Subject: [PATCH 1/2] Add multiple log monitoring support. --- Dockerfile.in | 2 +- README.md | 4 +- cmd/node_problem_detector.go | 12 +++++- pkg/options/options.go | 9 +++-- pkg/problemdetector/problem_detector.go | 39 +++++++++++++++---- .../logwatchers/journald/log_watcher.go | 6 +++ 6 files changed, 57 insertions(+), 15 deletions(-) diff --git a/Dockerfile.in b/Dockerfile.in index 40181d23..604d2f28 100644 --- a/Dockerfile.in +++ b/Dockerfile.in @@ -20,4 +20,4 @@ RUN test -h /etc/localtime && rm -f /etc/localtime && cp /usr/share/zoneinfo/UTC ADD ./bin/node-problem-detector /node-problem-detector ADD config /config -ENTRYPOINT ["/node-problem-detector", "--system-log-monitor=/config/kernel-monitor.json"] +ENTRYPOINT ["/node-problem-detector", "--system-log-monitors=/config/kernel-monitor.json"] diff --git a/README.md b/README.md index f6a31a1e..3fb5b2b0 100644 --- a/README.md +++ b/README.md @@ -54,8 +54,10 @@ List of supported problem daemons: # Usage ## Flags * `--version`: Print current version of node-problem-detector. -* `--system-log-monitor`: The configuration used by the system log monitor, e.g. +* `--system-log-monitors`: List of paths to system log monitor configuration files, comma separated, e.g. [config/kernel-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json). + Node problem detector will start a separate log monitor for each configuration. You can + use different log monitors to monitor different system log. * `--apiserver-override`: A URI parameter used to customize how node-problem-detector connects the apiserver. The format is same as the [`source`](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes) diff --git a/cmd/node_problem_detector.go b/cmd/node_problem_detector.go index 9234eadd..07a3986d 100644 --- a/cmd/node_problem_detector.go +++ b/cmd/node_problem_detector.go @@ -67,9 +67,17 @@ func main() { os.Exit(0) } - l := systemlogmonitor.NewLogMonitorOrDie(npdo.SystemLogMonitorConfigPath) + monitors := make(map[string]systemlogmonitor.LogMonitor) + for _, config := range npdo.SystemLogMonitorConfigPaths { + if _, ok := monitors[config]; ok { + // Skip the config if it's duplictaed. + glog.Warningf("Duplicated log monitor configuration %q", config) + continue + } + monitors[config] = systemlogmonitor.NewLogMonitorOrDie(config) + } c := problemclient.NewClientOrDie(npdo) - p := problemdetector.NewProblemDetector(l, c) + p := problemdetector.NewProblemDetector(monitors, c) // Start http server. if npdo.ServerPort > 0 { diff --git a/pkg/options/options.go b/pkg/options/options.go index 81c4dac9..8d49cee6 100644 --- a/pkg/options/options.go +++ b/pkg/options/options.go @@ -30,8 +30,9 @@ import ( type NodeProblemDetectorOptions struct { // command line options - // SystemLogMonitorConfigPath specifies the path to system log monitor configuration file. - SystemLogMonitorConfigPath string + // SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration + // files. + SystemLogMonitorConfigPaths []string // ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer. ApiServerOverride string // PrintVersion is the flag determining whether version information is printed. @@ -55,8 +56,8 @@ func NewNodeProblemDetectorOptions() *NodeProblemDetectorOptions { // AddFlags adds node problem detector command line options to pflag. func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) { - fs.StringVar(&npdo.SystemLogMonitorConfigPath, "system-log-monitor", - "/config/kernel-monitor.json", "The path to the system log monitor config file") + fs.StringSliceVar(&npdo.SystemLogMonitorConfigPaths, "system-log-monitors", + []string{}, "List of paths to system log monitor config files, comma separated.") fs.StringVar(&npdo.ApiServerOverride, "apiserver-override", "", "Custom URI used to connect to Kubernetes ApiServer") fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit") diff --git a/pkg/problemdetector/problem_detector.go b/pkg/problemdetector/problem_detector.go index d489cc8c..1a8262a5 100644 --- a/pkg/problemdetector/problem_detector.go +++ b/pkg/problemdetector/problem_detector.go @@ -17,6 +17,7 @@ limitations under the License. package problemdetector import ( + "fmt" "net/http" "github.com/golang/glog" @@ -26,6 +27,7 @@ import ( "k8s.io/node-problem-detector/pkg/condition" "k8s.io/node-problem-detector/pkg/problemclient" "k8s.io/node-problem-detector/pkg/systemlogmonitor" + "k8s.io/node-problem-detector/pkg/types" "k8s.io/node-problem-detector/pkg/util" ) @@ -38,28 +40,39 @@ type ProblemDetector interface { type problemDetector struct { client problemclient.Client conditionManager condition.ConditionManager - // TODO(random-liu): Use slices of problem daemons if multiple monitors are needed in the future - monitor systemlogmonitor.LogMonitor + monitors map[string]systemlogmonitor.LogMonitor } // NewProblemDetector creates the problem detector. Currently we just directly passed in the problem daemons, but // in the future we may want to let the problem daemons register themselves. -func NewProblemDetector(monitor systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector { +func NewProblemDetector(monitors map[string]systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector { return &problemDetector{ client: client, conditionManager: condition.NewConditionManager(client, clock.RealClock{}), - monitor: monitor, + monitors: monitors, } } // Run starts the problem detector. func (p *problemDetector) Run() error { p.conditionManager.Start() - ch, err := p.monitor.Start() - if err != nil { - return err + // Start the log monitors one by one. + var chans []<-chan *types.Status + for cfg, m := range p.monitors { + ch, err := m.Start() + if err != nil { + // Do not return error and keep on trying the following config files. + glog.Errorf("Failed to start log monitor %q: %v", cfg, err) + continue + } + chans = append(chans, ch) } + if len(chans) == 0 { + return fmt.Errorf("no log montior is successfully setup") + } + ch := groupChannel(chans) glog.Info("Problem detector started") + for { select { case status := <-ch: @@ -80,3 +93,15 @@ func (p *problemDetector) RegisterHTTPHandlers() { util.ReturnHTTPJson(w, p.conditionManager.GetConditions()) }) } + +func groupChannel(chans []<-chan *types.Status) <-chan *types.Status { + statuses := make(chan *types.Status) + for _, ch := range chans { + go func(c <-chan *types.Status) { + for status := range c { + statuses <- status + } + }(ch) + } + return statuses +} diff --git a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go index 1291c4c0..3edd825d 100644 --- a/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/journald/log_watcher.go @@ -20,6 +20,7 @@ package journald import ( "fmt" + "os" "strings" "time" @@ -134,6 +135,11 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) { if err != nil { return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err) } + // If the path doesn't present, NewJournalFromDir will create it instead of + // returning error. So check the path existence ourselves. + if _, err := os.Stat(path); err != nil { + return nil, fmt.Errorf("failed to stat the log path %q: %v", path, err) + } // Get journal client from the log path. journal, err := sdjournal.NewJournalFromDir(path) if err != nil { From 889d9efbc1b296396fba700e62c77779df6aa472 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Thu, 16 Feb 2017 00:04:48 -0800 Subject: [PATCH 2/2] Add unit test for goroutine leak. --- pkg/systemlogmonitor/log_monitor_test.go | 15 +++++ .../logwatchers/filelog/log_watcher_test.go | 14 +++++ .../logwatchers/journald/log_watcher_test.go | 15 +++++ .../logwatchers/testing/fake_log_watcher.go | 59 +++++++++++++++++++ 4 files changed, 103 insertions(+) create mode 100644 pkg/systemlogmonitor/logwatchers/testing/fake_log_watcher.go diff --git a/pkg/systemlogmonitor/log_monitor_test.go b/pkg/systemlogmonitor/log_monitor_test.go index ec8d3634..2bd47c2b 100644 --- a/pkg/systemlogmonitor/log_monitor_test.go +++ b/pkg/systemlogmonitor/log_monitor_test.go @@ -17,10 +17,15 @@ limitations under the License. package systemlogmonitor import ( + "fmt" "reflect" + "runtime" "testing" "time" + "github.com/stretchr/testify/assert" + + watchertest "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/testing" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/types" ) @@ -131,3 +136,13 @@ func TestGenerateStatus(t *testing.T) { } } } + +func TestGoroutineLeak(t *testing.T) { + orignal := runtime.NumGoroutine() + f := watchertest.NewFakeLogWatcher(10) + f.InjectError(fmt.Errorf("unexpected error")) + l := &logMonitor{watcher: f} + _, err := l.Start() + assert.Error(t, err) + assert.Equal(t, orignal, runtime.NumGoroutine()) +} diff --git a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go index 730de818..06a5b5b3 100644 --- a/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go +++ b/pkg/systemlogmonitor/logwatchers/filelog/log_watcher_test.go @@ -19,6 +19,7 @@ package filelog import ( "io/ioutil" "os" + "runtime" "testing" "time" @@ -170,3 +171,16 @@ Jan 2 03:04:05 kernel: [2.000000] 3 } } } + +func TestGoroutineLeak(t *testing.T) { + orignal := runtime.NumGoroutine() + w := NewSyslogWatcherOrDie(types.WatcherConfig{ + Plugin: "filelog", + PluginConfig: getTestPluginConfig(), + LogPath: "/not/exist/path", + Lookback: "10m", + }) + _, err := w.Watch() + assert.Error(t, err) + assert.Equal(t, orignal, runtime.NumGoroutine()) +} diff --git a/pkg/systemlogmonitor/logwatchers/journald/log_watcher_test.go b/pkg/systemlogmonitor/logwatchers/journald/log_watcher_test.go index 356a4262..dab60491 100644 --- a/pkg/systemlogmonitor/logwatchers/journald/log_watcher_test.go +++ b/pkg/systemlogmonitor/logwatchers/journald/log_watcher_test.go @@ -19,12 +19,14 @@ limitations under the License. package journald import ( + "runtime" "testing" "time" "github.com/coreos/go-systemd/sdjournal" "github.com/stretchr/testify/assert" + "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" ) @@ -62,3 +64,16 @@ func TestTranslate(t *testing.T) { assert.Equal(t, test.log, translate(test.entry)) } } + +func TestGoroutineLeak(t *testing.T) { + orignal := runtime.NumGoroutine() + w := NewJournaldWatcher(types.WatcherConfig{ + Plugin: "journald", + PluginConfig: map[string]string{"source": "not-exist-service"}, + LogPath: "/not/exist/path", + Lookback: "10m", + }) + _, err := w.Watch() + assert.Error(t, err) + assert.Equal(t, orignal, runtime.NumGoroutine()) +} diff --git a/pkg/systemlogmonitor/logwatchers/testing/fake_log_watcher.go b/pkg/systemlogmonitor/logwatchers/testing/fake_log_watcher.go new file mode 100644 index 00000000..f9e717de --- /dev/null +++ b/pkg/systemlogmonitor/logwatchers/testing/fake_log_watcher.go @@ -0,0 +1,59 @@ +/* +Copyright 2017 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "sync" + + "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" + logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" +) + +// FakeLogWatcher is a fake mock of log watcher. +type FakeLogWatcher struct { + sync.Mutex + buf chan *logtypes.Log + err error +} + +var _ types.LogWatcher = &FakeLogWatcher{} + +func NewFakeLogWatcher(bufferSize int) *FakeLogWatcher { + return &FakeLogWatcher{buf: make(chan *logtypes.Log, bufferSize)} +} + +// InjectLog injects a fake log into the watch channel +func (f *FakeLogWatcher) InjectLog(log *logtypes.Log) { + f.buf <- log +} + +// InjectError injects an error of Watch function. +func (f *FakeLogWatcher) InjectError(err error) { + f.Lock() + defer f.Unlock() + f.err = err +} + +// Watch is the fake watch function. +func (f *FakeLogWatcher) Watch() (<-chan *logtypes.Log, error) { + return f.buf, f.err +} + +// Stop is the fake stop function. +func (f *FakeLogWatcher) Stop() { + close(f.buf) +}