add journald support

This commit is contained in:
AdoHe
2016-10-08 20:28:30 -04:00
parent 6af7bbbe86
commit 84c25077da
60 changed files with 2668 additions and 3906 deletions

View File

@@ -17,15 +17,20 @@ limitations under the License.
package kernelmonitor
import (
"bufio"
"bytes"
"io"
"os"
"strings"
"time"
"k8s.io/node-problem-detector/pkg/kernelmonitor/translator"
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
"github.com/coreos/go-systemd/sdjournal"
"github.com/golang/glog"
"github.com/hpcloud/tail"
"github.com/google/cadvisor/utils/tail"
utilclock "github.com/pivotal-golang/clock"
)
@@ -54,12 +59,12 @@ type KernelLogWatcher interface {
type kernelLogWatcher struct {
// trans is the translator translates the log into internal format.
trans translator.Translator
cfg WatcherConfig
tl *tail.Tail
logCh chan *types.KernelLog
tomb *util.Tomb
clock utilclock.Clock
trans translator.Translator
cfg WatcherConfig
reader *bufio.Reader
logCh chan *types.KernelLog
tomb *util.Tomb
clock utilclock.Clock
}
// NewKernelLogWatcher creates a new kernel log watcher.
@@ -88,25 +93,17 @@ func (k *kernelLogWatcher) Watch() (<-chan *types.KernelLog, error) {
// To avoid this, we decide to add this temporarily hack. When KernelMonitor can't find the kernel
// log file, it will print a log and then return nil channel and no error. Since nil channel will
// always be blocked, the NodeProblemDetector will block forever.
// TODO(random-liu):
// 1. Add journald supports to support GCI.
// 2. Schedule KernelMonitor only on supported node (with node label and selector)
if _, err := os.Stat(path); os.IsNotExist(err) {
glog.Infof("kernel log %q is not found, kernel monitor doesn't support the os distro", path)
return nil, nil
}
// TODO(random-liu): Rate limit tail file.
// TODO(random-liu): Figure out what happens if log lines are removed.
// Notice that, kernel log watcher doesn't look back to the rolled out logs.
var err error
k.tl, err = tail.TailFile(path, tail.Config{
Poll: true,
ReOpen: true,
Follow: true,
})
reader, err := getLogReader(path)
if err != nil {
return nil, err
}
k.reader = bufio.NewReader(reader)
glog.Info("Start watching kernel log")
go k.watchLoop()
return k.logCh, nil
@@ -126,15 +123,31 @@ func (k *kernelLogWatcher) watchLoop() {
if err != nil {
glog.Fatalf("failed to parse duration %q: %v", k.cfg.Lookback, err)
}
var buffer bytes.Buffer
for {
select {
case line := <-k.tl.Lines:
// Notice that tail has trimmed '\n'
if line.Err != nil {
glog.Errorf("Tail error: %v", line.Err)
continue
}
log, err := k.trans.Translate(line.Text)
case <-k.tomb.Stopping():
glog.Infof("Stop watching kernel log")
return
default:
}
line, err := k.reader.ReadString('\n')
if err != nil && err != io.EOF {
glog.Errorf("exiting kernel log watch with error: %v", err)
return
}
if line == "" {
time.Sleep(100 * time.Millisecond)
continue
}
if err == nil {
buffer.WriteString(line)
// trime `\n`
line = strings.TrimRight(buffer.String(), "\n")
buffer.Reset()
log, err := k.trans.Translate(line)
if err != nil {
glog.Infof("Unable to parse line: %q, %v", line, err)
continue
@@ -144,14 +157,54 @@ func (k *kernelLogWatcher) watchLoop() {
continue
}
k.logCh <- log
case <-k.tomb.Stopping():
k.tl.Stop()
glog.Infof("Stop watching kernel log")
return
} else { // err == io.EOF
buffer.WriteString(line)
}
}
}
// getLogReader gets a kernel log reader.
func getLogReader(path string) (io.Reader, error) {
reader, err := tryJournal()
if err == nil {
return reader, nil
}
reader, err = tryLogFile(path)
if err == nil {
return reader, nil
}
return nil, err
}
func tryJournal() (io.Reader, error) {
r, err := sdjournal.NewJournalReader(sdjournal.JournalReaderConfig{
NumFromTail: uint64(0),
Matches: []sdjournal.Match{
{
Field: sdjournal.SD_JOURNAL_FIELD_TRANSPORT,
Value: "kernel",
},
},
})
if err != nil {
return nil, fmt.Errorf("Error opening journal: %v", err)
}
if r == nil {
return nil, fmt.Errorf("Got a nil reader")
}
glog.Info("Kernel log watcher use journal")
return r, nil
}
func tryLogFile(path string) (io.Reader, error) {
tail, err := tail.NewTail(path)
if err != nil {
return nil, err
}
glog.Infof("Kernel log watcher use log file: %s", path)
return tail, nil
}
func parseDuration(s string) (time.Duration, error) {
// If the duration is not configured, just return 0 by default
if s == "" {