Finish the journald support

This commit is contained in:
Random-Liu
2016-11-09 23:40:50 -08:00
parent f0ed07a0b4
commit c15d463ad5
16 changed files with 663 additions and 306 deletions

View File

@@ -0,0 +1,163 @@
// +build journald
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package journald
import (
"fmt"
"strings"
"time"
"github.com/coreos/go-systemd/sdjournal"
"github.com/golang/glog"
"k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
)
// Compiling go-systemd/sdjournal needs libsystemd-dev or libsystemd-journal-dev,
// which is not available on every OS distro and version.
// This file is therefore guarded by the journald build tag, so that users on an
// unsupported OS distro can build without it.
// journaldWatcher is the log watcher for journald.
type journaldWatcher struct {
	// journal is the journal client; it is set by Watch and closed when
	// watchLoop exits.
	journal *sdjournal.Journal
	// cfg is the watcher configuration passed to NewJournaldWatcher.
	cfg types.WatcherConfig
	// logCh is the buffered channel on which translated kernel logs are
	// delivered to the caller of Watch.
	logCh chan *kerntypes.KernelLog
	// tomb carries the stop request to watchLoop and is marked done when the
	// loop exits.
	tomb *util.Tomb
}
// NewJournaldWatcher builds a journald log watcher from the given
// configuration. The journal itself is not opened until Watch is called.
func NewJournaldWatcher(cfg types.WatcherConfig) types.LogWatcher {
	watcher := new(journaldWatcher)
	watcher.cfg = cfg
	watcher.tomb = util.NewTomb()
	// A buffer with capacity 1000 should be enough.
	watcher.logCh = make(chan *kerntypes.KernelLog, 1000)
	return watcher
}

// Compile-time check that NewJournaldWatcher is a types.WatcherCreateFunc.
var _ types.WatcherCreateFunc = NewJournaldWatcher
// Watch opens the journal described by the watcher configuration and runs the
// watch loop in a background goroutine. It returns the channel on which kernel
// logs are delivered, or the error reported while opening the journal.
func (j *journaldWatcher) Watch() (<-chan *kerntypes.KernelLog, error) {
	opened, openErr := getJournal(j.cfg)
	if openErr != nil {
		return nil, openErr
	}

	// Keep the client on the watcher so that the loop can read and close it.
	j.journal = opened

	glog.Info("Start watching journald")
	go j.watchLoop()

	return j.logCh, nil
}
// Stop stops the journald watcher by calling Stop on its tomb.
// NOTE(review): presumably this is what makes tomb.Stopping() fire in
// watchLoop — confirm against util.Tomb.
func (j *journaldWatcher) Stop() {
	j.tomb.Stop()
}
// waitLogTimeout is the timeout waiting for new log.
const waitLogTimeout = 5 * time.Second

// watchLoop is the main watch loop of journald watcher. It reads journal
// entries one by one, translates them and sends them to logCh until the tomb
// reports that the watcher is stopping. On exit it closes the journal client
// and marks the tomb as done.
func (j *journaldWatcher) watchLoop() {
	defer func() {
		if err := j.journal.Close(); err != nil {
			glog.Errorf("Failed to close journal client: %v", err)
		}
		j.tomb.Done()
	}()
	for {
		select {
		case <-j.tomb.Stopping():
			glog.Infof("Stop watching journald")
			return
		default:
		}
		// Get next log entry.
		n, err := j.journal.Next()
		if err != nil {
			glog.Errorf("Failed to get next journal entry: %v", err)
			continue
		}
		// If next reaches the end, wait for waitLogTimeout.
		if n == 0 {
			j.journal.Wait(waitLogTimeout)
			continue
		}
		entry, err := j.journal.GetEntry()
		if err != nil {
			glog.Errorf("failed to get journal entry: %v", err)
			continue
		}
		// Deliver the log, but keep honoring the stop request: when the
		// consumer no longer drains logCh and the buffer is full, a plain send
		// would block forever and the loop could never be stopped.
		select {
		case j.logCh <- translate(entry):
		case <-j.tomb.Stopping():
			glog.Infof("Stop watching journald")
			return
		}
	}
}
// defaultJournalLogPath is the default path of journal log.
const defaultJournalLogPath = "/var/log/journal"

// getJournal returns a journal client for the journal directory named by
// cfg.LogPath (defaultJournalLogPath when empty). The client is positioned
// cfg.Lookback before the current time and filtered to entries whose transport
// is "kernel".
//
// An error is returned when the lookback duration cannot be parsed, or when
// the journal cannot be opened, seeked or filtered. In the last two cases the
// already opened journal is closed before returning, so no handle is leaked.
func getJournal(cfg types.WatcherConfig) (*sdjournal.Journal, error) {
	// Get journal log path.
	path := defaultJournalLogPath
	if cfg.LogPath != "" {
		path = cfg.LogPath
	}
	// Get lookback duration.
	since, err := time.ParseDuration(cfg.Lookback)
	if err != nil {
		return nil, fmt.Errorf("failed to parse lookback duration %q: %v", cfg.Lookback, err)
	}
	// Get journal client from the log path.
	journal, err := sdjournal.NewJournalFromDir(path)
	if err != nil {
		return nil, fmt.Errorf("failed to create journal client from path %q: %v", path, err)
	}
	// abort closes the journal opened above and reports cause to the caller.
	abort := func(cause error) (*sdjournal.Journal, error) {
		if closeErr := journal.Close(); closeErr != nil {
			glog.Errorf("Failed to close journal client: %v", closeErr)
		}
		return nil, cause
	}
	// Seek journal client based on the lookback duration.
	start := time.Now().Add(-since)
	err = journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000))
	if err != nil {
		return abort(fmt.Errorf("failed to lookback %q: %v", since, err))
	}
	// TODO(random-liu): Make this configurable to support parsing other logs.
	kernelMatch := sdjournal.Match{
		Field: sdjournal.SD_JOURNAL_FIELD_TRANSPORT,
		Value: "kernel",
	}
	err = journal.AddMatch(kernelMatch.String())
	if err != nil {
		return abort(fmt.Errorf("failed to add log filter %#v: %v", kernelMatch, err))
	}
	return journal, nil
}
// translate converts a journal entry into the internal kernel log type. The
// entry's realtime timestamp is scaled from microseconds to the nanoseconds
// expected by time.Unix, and the MESSAGE field is stripped of surrounding
// whitespace (a missing field yields an empty message).
func translate(entry *sdjournal.JournalEntry) *kerntypes.KernelLog {
	sinceEpoch := time.Duration(entry.RealtimeTimestamp) * time.Microsecond

	kernelLog := new(kerntypes.KernelLog)
	kernelLog.Timestamp = time.Unix(0, int64(sinceEpoch))
	kernelLog.Message = strings.TrimSpace(entry.Fields["MESSAGE"])
	return kernelLog
}

View File

@@ -0,0 +1,64 @@
// +build journald
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package journald
import (
"testing"
"time"
"github.com/coreos/go-systemd/sdjournal"
"github.com/stretchr/testify/assert"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
// TestTranslate checks that translate converts journal entries into kernel
// logs, both when the MESSAGE field is present and when it is missing.
func TestTranslate(t *testing.T) {
	testCases := []struct {
		entry *sdjournal.JournalEntry
		log   *kerntypes.KernelLog
	}{
		// has log message
		{
			entry: &sdjournal.JournalEntry{
				Fields:            map[string]string{"MESSAGE": "log message"},
				RealtimeTimestamp: 123456789,
			},
			log: &kerntypes.KernelLog{
				Timestamp: time.Unix(0, 123456789*1000),
				Message:   "log message",
			},
		},
		// no log message
		{
			entry: &sdjournal.JournalEntry{
				Fields:            map[string]string{},
				RealtimeTimestamp: 987654321,
			},
			log: &kerntypes.KernelLog{
				Timestamp: time.Unix(0, 987654321*1000),
				Message:   "",
			},
		},
	}

	for idx := range testCases {
		tc := testCases[idx]
		t.Logf("TestCase #%d: %#v", idx+1, tc)
		got := translate(tc.entry)
		assert.Equal(t, tc.log, got)
	}
}

View File

@@ -0,0 +1,43 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logwatchers
import (
"fmt"
"k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types"
"github.com/golang/glog"
)
// createFuncs maps a plugin name to the create function of its log watcher.
var createFuncs = make(map[string]types.WatcherCreateFunc)

// registerLogWatcher stores create as the create function of the log watcher
// plugin called name, replacing any function already stored under that name.
func registerLogWatcher(name string, create types.WatcherCreateFunc) {
	createFuncs[name] = create
}
// GetLogWatcher returns the log watcher built by the create function that is
// registered for config.Plugin. It returns an error when no create function
// is registered under that plugin name.
func GetLogWatcher(config types.WatcherConfig) (types.LogWatcher, error) {
	if create, found := createFuncs[config.Plugin]; found {
		glog.Infof("Use log watcher of plugin %q", config.Plugin)
		return create(config), nil
	}
	return nil, fmt.Errorf("no create function found for plugin %q", config.Plugin)
}

View File

@@ -0,0 +1,30 @@
// +build journald
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logwatchers
import (
"k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/journald"
)
// journaldPluginName is the plugin name under which the journald log watcher
// is registered.
const journaldPluginName = "journald"

func init() {
	// Register the journald plugin.
	registerLogWatcher(journaldPluginName, journald.NewJournaldWatcher)
}

View File

@@ -0,0 +1,28 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logwatchers
import (
"k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/syslog"
)
// syslogPluginName is the plugin name under which the syslog log watcher is
// registered.
const syslogPluginName = "syslog"

func init() {
	// Register the syslog plugin.
	registerLogWatcher(syslogPluginName, syslog.NewSyslogWatcher)
}

View File

@@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syslog
import (
"fmt"
"io"
"os"
"strings"
"time"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"github.com/google/cadvisor/utils/tail"
)
// translate converts one syslog line into the internal kernel log type. The
// error from parseLine is returned unchanged, together with a nil log, when
// the line cannot be parsed.
func translate(line string) (*kerntypes.KernelLog, error) {
	when, text, parseErr := parseLine(line)
	if parseErr != nil {
		return nil, parseErr
	}

	kernelLog := new(kerntypes.KernelLog)
	kernelLog.Timestamp = when
	kernelLog.Message = text
	return kernelLog, nil
}
const (
	// timestampLen is the length of timestamp in syslog logging format.
	timestampLen = 15
	// messagePrefix is the character before real message.
	messagePrefix = "]"
)

// parseLine parses one log line into timestamp and message.
//
// Example line: Jan  1 00:00:00 hostname kernel: [0.000000] component: log message
//
// The first timestampLen bytes of the trimmed line are parsed with the
// time.Stamp layout in the local time zone, and everything after the first
// messagePrefix is returned as the message with surrounding whitespace
// removed. An error is returned when the line is shorter than a timestamp,
// when the timestamp cannot be parsed, or when messagePrefix is missing; in
// all error cases the returned timestamp and message are zero values.
func parseLine(line string) (time.Time, string, error) {
	// Trim the spaces to make sure timestamp could be found
	line = strings.TrimSpace(line)
	if len(line) < timestampLen {
		return time.Time{}, "", fmt.Errorf("the line is too short: %q", line)
	}
	// There is no time zone information in kernel log timestamp, apply the current time
	// zone.
	parsed, err := time.ParseInLocation(time.Stamp, line[:timestampLen], time.Local)
	if err != nil {
		return time.Time{}, "", fmt.Errorf("error parsing timestamp in line %q: %v", line, err)
	}
	loc := strings.Index(line, messagePrefix)
	if loc == -1 {
		// Do not hand back a half-filled result next to an error.
		return time.Time{}, "", fmt.Errorf("can't find message prefix %q in line %q", messagePrefix, line)
	}
	// There is no year information in kernel log timestamp, apply the current year.
	// This could go wrong during looking back phase after kernel monitor is started,
	// and the old logs are generated in old year.
	timestamp := parsed.AddDate(time.Now().Year(), 0, 0)
	// Skip the whole prefix rather than assuming that it is one byte long.
	message := strings.TrimSpace(line[loc+len(messagePrefix):])
	return timestamp, message, nil
}
// defaultKernelLogPath is the path of the syslog kernel log that is used when
// no path is configured.
const defaultKernelLogPath = "/var/log/kern.log"

// getLogReader opens a tailing reader on the syslog kernel log at path, or at
// defaultKernelLogPath when path is empty. Note that rolled out logs are not
// looked back into.
func getLogReader(path string) (io.ReadCloser, error) {
	logPath := path
	if logPath == "" {
		logPath = defaultKernelLogPath
	}

	// To handle log rotation, tail will not report error immediately if
	// the file doesn't exist. So we check file existence first.
	// This could go wrong during mid-rotation. It should recover after
	// several restart when the log file is created again. The chance
	// is slim but we should still fix this in the future.
	// TODO(random-liu): Handle log missing during rotation.
	if _, statErr := os.Stat(logPath); statErr != nil {
		return nil, fmt.Errorf("failed to stat the file %q: %v", logPath, statErr)
	}

	follower, tailErr := tail.NewTail(logPath)
	if tailErr != nil {
		return nil, fmt.Errorf("failed to tail the file %q: %v", logPath, tailErr)
	}
	return follower, nil
}

View File

@@ -0,0 +1,66 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syslog
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
// TestTranslate checks that translate parses syslog lines into kernel logs and
// reports an error for a line without the message prefix.
func TestTranslate(t *testing.T) {
	year := time.Now().Year()
	testCases := []struct {
		input string
		err   bool
		log   *kerntypes.KernelLog
	}{
		{
			// Syslog pads a single digit day with a space, so that the
			// timestamp is always 15 characters long.
			input: "May  1 12:23:45 hostname kernel: [0.000000] component: log message",
			log: &kerntypes.KernelLog{
				Timestamp: time.Date(year, time.May, 1, 12, 23, 45, 0, time.Local),
				Message:   "component: log message",
			},
		},
		{
			// no log message
			input: "May 21 12:23:45 hostname kernel: [9.999999]",
			log: &kerntypes.KernelLog{
				Timestamp: time.Date(year, time.May, 21, 12, 23, 45, 0, time.Local),
				Message:   "",
			},
		},
		{
			// the right square bracket is missing
			input: "May 21 12:23:45 hostname kernel: [9.999999 component: log message",
			err:   true,
		},
	}

	for c, test := range testCases {
		t.Logf("TestCase #%d: %#v", c+1, test)
		log, err := translate(test.input)
		if (err != nil) != test.err {
			t.Errorf("case %d: error assertion failed, got log: %+v, error: %v", c+1, log, err)
			continue
		}
		assert.Equal(t, test.log, log)
	}
}

View File

@@ -0,0 +1,122 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syslog
import (
"bufio"
"bytes"
"io"
"time"
utilclock "code.cloudfoundry.org/clock"
"github.com/golang/glog"
"k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
)
// syslogWatcher is the log watcher for the syslog kernel log file.
type syslogWatcher struct {
	// cfg is the watcher configuration passed to NewSyslogWatcher.
	cfg types.WatcherConfig
	// reader is the buffered reader over the log reader; it is set by Watch.
	reader *bufio.Reader
	// closer closes the underlying log reader; it is set by Watch and called
	// when watchLoop exits.
	closer io.Closer
	// logCh is the buffered channel on which parsed kernel logs are delivered
	// to the caller of Watch.
	logCh chan *kerntypes.KernelLog
	// tomb carries the stop request to watchLoop and is marked done when the
	// loop exits.
	tomb *util.Tomb
	// clock is used to compute the age of a log for the lookback check; the
	// test replaces it with a fake clock.
	clock utilclock.Clock
}
// NewSyslogWatcher builds a syslog kernel log watcher from the given
// configuration. The log file is not opened until Watch is called.
func NewSyslogWatcher(cfg types.WatcherConfig) types.LogWatcher {
	watcher := new(syslogWatcher)
	watcher.cfg = cfg
	watcher.tomb = util.NewTomb()
	// A buffer with capacity 1000 should be enough.
	watcher.logCh = make(chan *kerntypes.KernelLog, 1000)
	watcher.clock = utilclock.NewClock()
	return watcher
}

// Compile-time check that NewSyslogWatcher is a types.WatcherCreateFunc.
var _ types.WatcherCreateFunc = NewSyslogWatcher
// Watch starts the syslog watcher. It opens the log reader for cfg.LogPath,
// runs the watch loop in a background goroutine and returns the channel on
// which kernel logs are delivered.
//
// An error is returned when cfg.Lookback is not a valid duration or when the
// log reader cannot be opened; in both cases no goroutine is started.
func (s *syslogWatcher) Watch() (<-chan *kerntypes.KernelLog, error) {
	// Reject an invalid lookback before any resource is opened, so that the
	// caller gets an error instead of the process being terminated later when
	// watchLoop parses the same value.
	if _, err := time.ParseDuration(s.cfg.Lookback); err != nil {
		return nil, err
	}
	r, err := getLogReader(s.cfg.LogPath)
	if err != nil {
		return nil, err
	}
	s.reader = bufio.NewReader(r)
	s.closer = r
	glog.Info("Start watching syslog")
	go s.watchLoop()
	return s.logCh, nil
}
// Stop stops the syslog watcher by calling Stop on its tomb.
// NOTE(review): presumably this is what makes tomb.Stopping() fire in
// watchLoop — confirm against util.Tomb.
func (s *syslogWatcher) Stop() {
	s.tomb.Stop()
}
// watchPollInterval is the interval at which the syslog watcher polls for new
// log content after reading to the end of the log.
const watchPollInterval = 500 * time.Millisecond

// watchLoop is the main watch loop of syslog watcher. It reads the log line by
// line, translates every complete line and sends the logs that are not older
// than the lookback duration to logCh. The loop ends when the tomb reports
// that the watcher is stopping or when reading fails; on exit the reader is
// closed, logCh is closed and the tomb is marked done.
func (s *syslogWatcher) watchLoop() {
	defer func() {
		if err := s.closer.Close(); err != nil {
			glog.Errorf("Failed to close syslog reader: %v", err)
		}
		close(s.logCh)
		s.tomb.Done()
	}()
	lookback, err := time.ParseDuration(s.cfg.Lookback)
	if err != nil {
		glog.Fatalf("Failed to parse duration %q: %v", s.cfg.Lookback, err)
	}
	glog.Info("Lookback:", lookback)
	// buffer accumulates a partially written line across reads that hit EOF.
	var buffer bytes.Buffer
	for {
		select {
		case <-s.tomb.Stopping():
			glog.Infof("Stop watching syslog")
			return
		default:
		}
		line, err := s.reader.ReadString('\n')
		if err != nil && err != io.EOF {
			glog.Errorf("Exiting syslog watch with error: %v", err)
			return
		}
		buffer.WriteString(line)
		if err == io.EOF {
			// The line is not complete yet; wait for more content.
			time.Sleep(watchPollInterval)
			continue
		}
		line = buffer.String()
		buffer.Reset()
		log, err := translate(line)
		if err != nil {
			glog.Warningf("Unable to parse line: %q, %v", line, err)
			continue
		}
		// If the log is older than look back duration, discard it.
		if s.clock.Since(log.Timestamp) > lookback {
			continue
		}
		// Deliver the log, but keep honoring the stop request: when the
		// consumer no longer drains logCh and the buffer is full, a plain send
		// would block forever and the loop could never be stopped.
		select {
		case s.logCh <- log:
		case <-s.tomb.Stopping():
			glog.Infof("Stop watching syslog")
			return
		}
	}
}

View File

@@ -0,0 +1,138 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package syslog
import (
"io/ioutil"
"os"
"testing"
"time"
"k8s.io/node-problem-detector/pkg/kernelmonitor/logwatchers/types"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"code.cloudfoundry.org/clock/fakeclock"
"github.com/stretchr/testify/assert"
)
// TestWatch checks that the syslog watcher reports exactly the logs that are
// not older than the lookback duration, measured against a fake clock.
func TestWatch(t *testing.T) {
	// now is a fake time
	now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local)
	fakeClock := fakeclock.NewFakeClock(now)
	// Syslog pads a single digit day with a space, so that the timestamp is
	// always 15 characters long; the log fixtures below must keep that padding.
	testCases := []struct {
		log      string
		logs     []kerntypes.KernelLog
		lookback string
	}{
		{
			// The start point is at the head of the log file.
			log: `Jan  2 03:04:05 kernel: [0.000000] 1
Jan  2 03:04:06 kernel: [1.000000] 2
Jan  2 03:04:07 kernel: [2.000000] 3
`,
			lookback: "0",
			logs: []kerntypes.KernelLog{
				{
					Timestamp: now,
					Message:   "1",
				},
				{
					Timestamp: now.Add(time.Second),
					Message:   "2",
				},
				{
					Timestamp: now.Add(2 * time.Second),
					Message:   "3",
				},
			},
		},
		{
			// The start point is in the middle of the log file.
			log: `Jan  2 03:04:04 kernel: [0.000000] 1
Jan  2 03:04:05 kernel: [1.000000] 2
Jan  2 03:04:06 kernel: [2.000000] 3
`,
			lookback: "0",
			logs: []kerntypes.KernelLog{
				{
					Timestamp: now,
					Message:   "2",
				},
				{
					Timestamp: now.Add(time.Second),
					Message:   "3",
				},
			},
		},
		{
			// The start point is at the end of the log file, but we look back.
			log: `Jan  2 03:04:03 kernel: [0.000000] 1
Jan  2 03:04:04 kernel: [1.000000] 2
Jan  2 03:04:05 kernel: [2.000000] 3
`,
			lookback: "1s",
			logs: []kerntypes.KernelLog{
				{
					Timestamp: now.Add(-time.Second),
					Message:   "2",
				},
				{
					Timestamp: now,
					Message:   "3",
				},
			},
		},
	}
	for c, test := range testCases {
		t.Logf("TestCase #%d: %#v", c+1, test)
		// Run every case in its own function, so that the deferred cleanup
		// (temp file removal, watcher stop) happens at the end of the case
		// instead of piling up until the whole test returns.
		func() {
			f, err := ioutil.TempFile("", "kernel_log_watcher_test")
			if !assert.NoError(t, err) {
				return
			}
			defer func() {
				f.Close()
				os.Remove(f.Name())
			}()
			_, err = f.Write([]byte(test.log))
			assert.NoError(t, err)

			w := NewSyslogWatcher(types.WatcherConfig{
				Plugin:   "syslog",
				LogPath:  f.Name(),
				Lookback: test.lookback,
			})
			// Set the fake clock.
			w.(*syslogWatcher).clock = fakeClock
			logCh, err := w.Watch()
			if !assert.NoError(t, err) {
				return
			}
			defer w.Stop()
			for _, expected := range test.logs {
				select {
				case got := <-logCh:
					assert.Equal(t, &expected, got)
				case <-time.After(30 * time.Second):
					t.Errorf("timeout waiting for log")
				}
			}
			// The log channel should have already been drained
			// There could still be future messages sent into the channel, but the chance is really slim.
			timeout := time.After(100 * time.Millisecond)
			select {
			case log, ok := <-logCh:
				// A closed channel yields a nil log; only a real value is an
				// unexpected extra log.
				if ok {
					t.Errorf("unexpected extra log: %+v", *log)
				}
			case <-timeout:
			}
		}()
	}
}

View File

@@ -0,0 +1,43 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
// LogWatcher is the interface of a log watcher.
type LogWatcher interface {
	// Watch starts watching logs and returns logs via a channel. A non-nil
	// error means that watching could not be started.
	Watch() (<-chan *types.KernelLog, error)
	// Stop stops the log watcher. Resources open should be closed properly.
	Stop()
}
// WatcherConfig is the configuration of the log watcher.
//
// The JSON tag options must follow the comma without a space: encoding/json
// does not recognize " omitempty" (with a leading space) as the omitempty
// option, so empty fields would still be written when marshalling.
type WatcherConfig struct {
	// Plugin is the name of plugin which is currently used.
	// Currently supported: syslog, journald.
	Plugin string `json:"plugin,omitempty"`
	// LogPath is the path to the log
	LogPath string `json:"logPath,omitempty"`
	// Lookback is the time kernel watcher looks up, written as a duration
	// string that time.ParseDuration accepts.
	Lookback string `json:"lookback,omitempty"`
}
// WatcherCreateFunc is the create function of a log watcher: it takes a
// WatcherConfig and returns a LogWatcher.
type WatcherCreateFunc func(WatcherConfig) LogWatcher