Add multiple log monitoring support.

This commit is contained in:
Random-Liu
2017-02-14 17:56:02 -08:00
parent 92e67b8615
commit 6170b0c87f
6 changed files with 57 additions and 15 deletions

View File

@@ -20,4 +20,4 @@ RUN test -h /etc/localtime && rm -f /etc/localtime && cp /usr/share/zoneinfo/UTC
ADD ./bin/node-problem-detector /node-problem-detector
ADD config /config
ENTRYPOINT ["/node-problem-detector", "--system-log-monitor=/config/kernel-monitor.json"]
ENTRYPOINT ["/node-problem-detector", "--system-log-monitors=/config/kernel-monitor.json"]

View File

@@ -54,8 +54,10 @@ List of supported problem daemons:
# Usage
## Flags
* `--version`: Print current version of node-problem-detector.
* `--system-log-monitor`: The configuration used by the system log monitor, e.g.
* `--system-log-monitors`: List of paths to system log monitor configuration files, comma separated, e.g.
[config/kernel-monitor.json](https://github.com/kubernetes/node-problem-detector/blob/master/config/kernel-monitor.json).
Node problem detector will start a separate log monitor for each configuration. You can
use different log monitors to monitor different system logs.
* `--apiserver-override`: A URI parameter used to customize how node-problem-detector
connects the apiserver. The format is same as the
[`source`](https://github.com/kubernetes/heapster/blob/master/docs/source-configuration.md#kubernetes)

View File

@@ -67,9 +67,17 @@ func main() {
os.Exit(0)
}
l := systemlogmonitor.NewLogMonitorOrDie(npdo.SystemLogMonitorConfigPath)
monitors := make(map[string]systemlogmonitor.LogMonitor)
for _, config := range npdo.SystemLogMonitorConfigPaths {
if _, ok := monitors[config]; ok {
// Skip the config if it's duplicated.
glog.Warningf("Duplicated log monitor configuration %q", config)
continue
}
monitors[config] = systemlogmonitor.NewLogMonitorOrDie(config)
}
c := problemclient.NewClientOrDie(npdo)
p := problemdetector.NewProblemDetector(l, c)
p := problemdetector.NewProblemDetector(monitors, c)
// Start http server.
if npdo.ServerPort > 0 {

View File

@@ -30,8 +30,9 @@ import (
type NodeProblemDetectorOptions struct {
// command line options
// SystemLogMonitorConfigPath specifies the path to system log monitor configuration file.
SystemLogMonitorConfigPath string
// SystemLogMonitorConfigPaths specifies the list of paths to system log monitor configuration
// files.
SystemLogMonitorConfigPaths []string
// ApiServerOverride is the custom URI used to connect to Kubernetes ApiServer.
ApiServerOverride string
// PrintVersion is the flag determining whether version information is printed.
@@ -55,8 +56,8 @@ func NewNodeProblemDetectorOptions() *NodeProblemDetectorOptions {
// AddFlags adds node problem detector command line options to pflag.
func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&npdo.SystemLogMonitorConfigPath, "system-log-monitor",
"/config/kernel-monitor.json", "The path to the system log monitor config file")
fs.StringSliceVar(&npdo.SystemLogMonitorConfigPaths, "system-log-monitors",
[]string{}, "List of paths to system log monitor config files, comma separated.")
fs.StringVar(&npdo.ApiServerOverride, "apiserver-override",
"", "Custom URI used to connect to Kubernetes ApiServer")
fs.BoolVar(&npdo.PrintVersion, "version", false, "Print version information and quit")

View File

@@ -17,6 +17,7 @@ limitations under the License.
package problemdetector
import (
"fmt"
"net/http"
"github.com/golang/glog"
@@ -26,6 +27,7 @@ import (
"k8s.io/node-problem-detector/pkg/condition"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/systemlogmonitor"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
)
@@ -38,28 +40,39 @@ type ProblemDetector interface {
type problemDetector struct {
client problemclient.Client
conditionManager condition.ConditionManager
// TODO(random-liu): Use slices of problem daemons if multiple monitors are needed in the future
monitor systemlogmonitor.LogMonitor
monitors map[string]systemlogmonitor.LogMonitor
}
// NewProblemDetector creates the problem detector. Currently we just directly pass in the problem daemons, but
// in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitor systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector {
func NewProblemDetector(monitors map[string]systemlogmonitor.LogMonitor, client problemclient.Client) ProblemDetector {
return &problemDetector{
client: client,
conditionManager: condition.NewConditionManager(client, clock.RealClock{}),
monitor: monitor,
monitors: monitors,
}
}
// Run starts the problem detector.
func (p *problemDetector) Run() error {
p.conditionManager.Start()
ch, err := p.monitor.Start()
if err != nil {
return err
// Start the log monitors one by one.
var chans []<-chan *types.Status
for cfg, m := range p.monitors {
ch, err := m.Start()
if err != nil {
// Do not return error and keep on trying the following config files.
glog.Errorf("Failed to start log monitor %q: %v", cfg, err)
continue
}
chans = append(chans, ch)
}
if len(chans) == 0 {
return fmt.Errorf("no log montior is successfully setup")
}
ch := groupChannel(chans)
glog.Info("Problem detector started")
for {
select {
case status := <-ch:
@@ -80,3 +93,15 @@ func (p *problemDetector) RegisterHTTPHandlers() {
util.ReturnHTTPJson(w, p.conditionManager.GetConditions())
})
}
// groupChannel fans in every channel in chans onto a single unbuffered
// channel. One forwarding goroutine is started per input channel; each
// goroutine returns once its input has been closed and drained. The returned
// channel is never closed, so a receiver blocks (rather than observing a
// closed channel) after all inputs have ended.
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
	merged := make(chan *types.Status)
	// forward relays everything received on in to the merged channel.
	forward := func(in <-chan *types.Status) {
		for {
			status, ok := <-in
			if !ok {
				return
			}
			merged <- status
		}
	}
	for i := range chans {
		// The argument is evaluated when the go statement runs, so each
		// goroutine is bound to its own input channel.
		go forward(chans[i])
	}
	return merged
}

View File

@@ -20,6 +20,7 @@ package journald
import (
"fmt"
"os"
"strings"
"time"
@@ -134,6 +135,11 @@ func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err)
}
// If the path doesn't exist, NewJournalFromDir will create it instead of
// returning an error. So check for the path's existence ourselves.
if _, err := os.Stat(path); err != nil {
return nil, fmt.Errorf("failed to stat the log path %q: %v", path, err)
}
// Get journal client from the log path.
journal, err := sdjournal.NewJournalFromDir(path)
if err != nil {