Add first version of node-problem-detector

Lantao Liu
2016-05-17 15:55:33 -07:00
parent 802acee7e3
commit f0312655bd
31 changed files with 2370 additions and 0 deletions

17
Dockerfile Normal file

@@ -0,0 +1,17 @@
# Copyright 2016 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM alpine:3.1
MAINTAINER Random Liu <lantaol@google.com>
ADD node-problem-detector /node-problem-detector

19
Makefile Normal file

@@ -0,0 +1,19 @@
all: push
# See node-problem-detector.yaml for the version currently running -- bump this ahead before rebuilding!
TAG = 0.1
# TODO(random-liu): Change the project to google_containers.
PROJ = google.com/noogler-kubernetes
node-problem-detector: node_problem_detector.go
CGO_ENABLED=0 GOOS=linux godep go build -a -installsuffix cgo -ldflags '-w' -o node-problem-detector
container: node-problem-detector
docker build -t gcr.io/$(PROJ)/node-problem-detector:$(TAG) .
push: container
gcloud docker push gcr.io/$(PROJ)/node-problem-detector:$(TAG)
clean:
rm -f node-problem-detector

31
config/kernel-monitor.json Normal file

@@ -0,0 +1,31 @@
{
"logPath": "/log/kern.log",
"bufferSize": 50,
"rules": [
{
"type": "temporary",
"reason": "OOMKilling",
"pattern": "Kill process \\d+ (.+) score \\d+ or sacrifice child\\nKilled process \\d+ (.+) total-vm:\\d+kB, anon-rss:\\d+kB, file-rss:\\d+kB"
},
{
"type": "temporary",
"reason": "TaskHung",
"pattern": "task \\S+:\\w+ blocked for more than \\w+ seconds\\."
},
{
"type": "permanent",
"reason": "AUFSUmountHung",
"pattern": "task umount\\.aufs:\\w+ blocked for more than \\w+ seconds\\."
},
{
"type": "permanent",
"reason": "DockerHung",
"pattern": "task docker:\\w+ blocked for more than \\w+ seconds\\."
},
{
"type": "permanent",
"reason": "KernelBug",
"pattern": "unregister_netdevice: waiting for \\w+ to become free. Usage count = \\d+"
}
]
}
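
A quick illustration of how these rules behave (a minimal, hypothetical sketch using only Go's standard regexp package; not part of this commit): the OOMKilling pattern matches the two demo/oom_kill messages below once buffered lines are joined with newlines and the pattern is anchored to the end of the buffer, which is exactly what the log buffer below does.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The "OOMKilling" pattern from the config above, JSON-unescaped.
	pattern := `Kill process \d+ (.+) score \d+ or sacrifice child\nKilled process \d+ (.+) total-vm:\d+kB, anon-rss:\d+kB, file-rss:\d+kB`
	// The two consecutive kernel messages from demo/oom_kill.
	logs := "Memory cgroup out of memory: Kill process 1012 (heapster) score 1035 or sacrifice child\n" +
		"Killed process 1012 (heapster) total-vm:327128kB, anon-rss:306328kB, file-rss:11132kB"
	// The monitor requires a pattern to match up to the end of the buffer (see log_buffer.go below).
	re := regexp.MustCompile(pattern + `\z`)
	fmt.Println(re.MatchString(logs)) // true
}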

1
demo/au_opts_verify Normal file

@@ -0,0 +1 @@
aufs au_opts_verify:1570:docker[0000]: dirperm1 breaks the protection by the permission bits on the lower branch

19
demo/aufs_umount_hung Normal file

@@ -0,0 +1,19 @@
INFO: task umount.aufs:21568 blocked for more than 120 seconds.
Tainted: G C 3.16.0-4-amd64 #1
"echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
umount.aufs D ffff8802111a9848 0 21568 21567 0x00000000
ffff8802111a93f0 0000000000000086 0000000000012f00 ffff8800baa0ffd8
0000000000012f00 ffff8802111a93f0 ffff8802111a93f0 ffff8800baa0fdd8
ffff8802149ef038 ffff8802149ef020 ffffffff00000000 ffff8802149ef028
Call Trace:
[<ffffffff81512d45>] ? rwsem_down_write_failed+0x1d5/0x320
[<ffffffff812b7d13>] ? call_rwsem_down_write_failed+0x13/0x20
[<ffffffff815126b9>] ? down_write+0x29/0x40
[<ffffffffa03556b1>] ? si_write_lock+0x31/0x110 [aufs]
[<ffffffff811b83ca>] ? do_filp_open+0x3a/0x90
[<ffffffffa0374e50>] ? au_plink_maint_enter+0x20/0xd0 [aufs]
[<ffffffffa037473d>] ? au_procfs_plm_write+0x13d/0x200 [aufs]
[<ffffffff81207069>] ? proc_reg_write+0x39/0x70
[<ffffffff811a8562>] ? vfs_write+0xb2/0x1f0
[<ffffffff811a90a2>] ? SyS_write+0x42/0xa0
[<ffffffff81513d0d>] ? system_call_fast_compare_end+0x10/0x15

81
demo/demo Executable file

@@ -0,0 +1,81 @@
#!/bin/bash
ZONE=us-central1-b
KERNLOG=/var/log/kern.log
usage () {
echo "Usage : `basename $0` COMMAND [arg...]
Commands:
create: create node problem detector daemon.
nodes: describe all nodes or specific node.
pods: describe all pods or specific pod.
inject: inject error kernel log into specific node.
reboot: generate a fake reboot log on specific node.
delete: delete node problem detector daemon."
exit
}
runCmd() {
echo "$1"
eval "$1"
}
rebootAll() {
TEMP=`mktemp`
kubectl get nodes | awk 'NR>1{print $1}' > $TEMP
while read -r node; do
reboot $node
done < $TEMP
rm $TEMP
}
reboot() {
LATEST=`gcloud compute ssh -n root@$1 --zone=$ZONE "tail -1 $KERNLOG"`
PREFIX=`echo $LATEST | cut -d "[" -f 1 -`"[0.000000]"
runCmd "gcloud compute ssh -n root@$1 --zone=$ZONE \"echo '$PREFIX reboot' >> $KERNLOG\""
}
case $1 in
create )
runCmd "kubectl create configmap node-problem-detector-config --from-file=../config/"
runCmd "kubectl create -f ../node-problem-detector.yaml --validate=false"
;;
nodes )
runCmd "kubectl describe nodes $2"
;;
pods )
runCmd "kubectl describe pods $2"
;;
inject )
if [ -z "$3" ]; then
usage
exit 1
fi
NODE=$3
LATEST=`gcloud compute ssh root@$NODE --zone=$ZONE "tail -1 $KERNLOG"`
PREFIX=`echo $LATEST | cut -d "]" -f 1 -`"]"
PREFIX=`printf "%q" "$PREFIX"`
COMMAND=
while read error
do
ERROR=`printf "%q" "$error"`
COMMAND=$COMMAND"echo $PREFIX $ERROR >> $KERNLOG; "
done < $2
runCmd "gcloud compute ssh root@$NODE --zone=$ZONE '$COMMAND'"
;;
reboot )
if [ -z "$2" ]; then
usage
exit 1
fi
reboot $2
;;
delete )
runCmd "kubectl delete -f ../node-problem-detector.yaml"
runCmd "kubectl delete configmap node-problem-detector-config"
;;
* )
usage
exit 1
;;
esac

19
demo/docker_hung Normal file

@@ -0,0 +1,19 @@
INFO: task docker:20744 blocked for more than 120 seconds.
Tainted: G C 3.16.0-4-amd64 #1
"echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
docker D ffff8801a8f2b078 0 20744 1 0x00000000
ffff8801a8f2ac20 0000000000000082 0000000000012f00 ffff880057a17fd8
0000000000012f00 ffff8801a8f2ac20 ffffffff818bb4a0 ffff880057a17d80
ffffffff818bb4a4 ffff8801a8f2ac20 00000000ffffffff ffffffff818bb4a8
Call Trace:
[<ffffffff81510915>] ? schedule_preempt_disabled+0x25/0x70
[<ffffffff815123c3>] ? __mutex_lock_slowpath+0xd3/0x1c0
[<ffffffff815124cb>] ? mutex_lock+0x1b/0x2a
[<ffffffff814175bc>] ? copy_net_ns+0x6c/0x130
[<ffffffff8108bdf4>] ? create_new_namespaces+0xf4/0x180
[<ffffffff8108beec>] ? copy_namespaces+0x6c/0x90
[<ffffffff810654f6>] ? copy_process.part.25+0x966/0x1c30
[<ffffffff81066991>] ? do_fork+0xe1/0x390
[<ffffffff811c442c>] ? __alloc_fd+0x7c/0x120
[<ffffffff81514079>] ? stub_clone+0x69/0x90
[<ffffffff81513d0d>] ? system_call_fast_compare_end+0x10/0x15

2
demo/oom_kill Normal file

@@ -0,0 +1,2 @@
Memory cgroup out of memory: Kill process 1012 (heapster) score 1035 or sacrifice child
Killed process 1012 (heapster) total-vm:327128kB, anon-rss:306328kB, file-rss:11132kB

1
demo/kernel_bug Normal file

@@ -0,0 +1 @@
unregister_netdevice: waiting for lo to become free. Usage count = 1

40
node-problem-detector.yaml Normal file

@@ -0,0 +1,40 @@
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: node-problem-detector
spec:
template:
metadata:
labels:
app: node-problem-detector
spec:
hostNetwork: true
containers:
- name: node-problem-detector
command:
- /node-problem-detector
- --kernel-monitor=/config/kernel-monitor.json
image: gcr.io/google.com/noogler-kubernetes/node-problem-detector:0.1
imagePullPolicy: Always
env:
# Config the host ip and port of apiserver.
- name: "KUBERNETES_SERVICE_HOST"
value: "e2e-test-lantaol-master"
- name: "KUBERNETES_SERVICE_PORT"
value: "443"
securityContext:
privileged: true
volumeMounts:
- name: log
mountPath: /log
readOnly: true
- name: config
mountPath: /config
readOnly: true
volumes:
- name: log
hostPath:
path: /var/log/
- name: config
configMap:
name: node-problem-detector-config

35
node_problem_detector.go Normal file

@@ -0,0 +1,35 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"k8s.io/node-problem-detector/pkg/kernelmonitor"
"k8s.io/node-problem-detector/pkg/problemdetector"
)
var (
kernelMonitorConfigPath = flag.String("kernel-monitor", "/config/kernel-monitor.json", "The path to the kernel monitor config file")
)
func main() {
flag.Parse()
k := kernelmonitor.NewKernelMonitorOrDie(*kernelMonitorConfigPath)
p := problemdetector.NewProblemDetector(k)
p.Run()
}

134
pkg/condition/manager.go Normal file

@@ -0,0 +1,134 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package condition
import (
"reflect"
"sync"
"time"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/types"
problemutil "k8s.io/node-problem-detector/pkg/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util"
"github.com/golang/glog"
)
const (
// updatePeriod is the period at which the condition manager checks for updates.
updatePeriod = 1 * time.Second
// updateTimeout is the timeout of a condition update operation.
updateTimeout = 10 * time.Second
// resyncPeriod is the period at which the condition manager resyncs with the apiserver, whether or not there is any update.
resyncPeriod = 30 * time.Second
)
// ConditionManager synchronizes node conditions with the apiserver through the problem client.
// It makes sure that:
// 1) Node conditions are updated to the apiserver as soon as possible.
// 2) The node problem detector won't flood the apiserver.
// 3) No one else can change the node conditions maintained by the node problem detector.
// ConditionManager checks every updatePeriod whether there is a node condition update. If there is,
// it synchronizes with the apiserver. This addresses 1) and 2).
// ConditionManager also synchronizes with the apiserver every resyncPeriod, whether or not there is a
// node condition update. This addresses 3).
type ConditionManager interface {
// Start starts the condition manager.
Start()
// UpdateCondition updates a specific condition.
UpdateCondition(types.Condition)
}
type conditionManager struct {
sync.Mutex
clock util.Clock
latest time.Time
client problemclient.Client
updates map[string]types.Condition
conditions map[string]types.Condition
}
// NewConditionManager creates a condition manager.
func NewConditionManager(client problemclient.Client, clock util.Clock) ConditionManager {
return &conditionManager{
client: client,
clock: clock,
updates: make(map[string]types.Condition),
conditions: make(map[string]types.Condition),
}
}
func (c *conditionManager) Start() {
go c.syncLoop()
}
func (c *conditionManager) UpdateCondition(condition types.Condition) {
c.Lock()
defer c.Unlock()
// New node condition will override the old condition, because we only need the newest
// condition for each condition type.
c.updates[condition.Type] = condition
}
func (c *conditionManager) syncLoop() {
updateCh := c.clock.Tick(updatePeriod)
for {
select {
case <-updateCh:
if c.checkUpdates() || c.checkResync() {
c.sync()
}
}
}
}
// checkUpdates checks whether there are recent updates.
func (c *conditionManager) checkUpdates() bool {
c.Lock()
defer c.Unlock()
needUpdate := false
for t, update := range c.updates {
if !reflect.DeepEqual(c.conditions[t], update) {
needUpdate = true
c.conditions[t] = update
}
delete(c.updates, t)
}
return needUpdate
}
// checkResync checks whether a resync is needed.
func (c *conditionManager) checkResync() bool {
return c.clock.Now().Sub(c.latest) >= resyncPeriod
}
// sync synchronizes node conditions with the apiserver.
func (c *conditionManager) sync() {
conditions := []api.NodeCondition{}
for i := range c.conditions {
conditions = append(conditions, problemutil.ConvertToAPICondition(c.conditions[i]))
}
if err := c.client.SetConditions(conditions, updateTimeout); err != nil {
// The conditions will be updated again in a future sync.
glog.Errorf("failed to update node conditions: %v", err)
return
}
c.latest = c.clock.Now()
}
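
A minimal usage sketch of the condition manager (hypothetical, not part of this commit; it uses FakeProblemClient from pkg/problemclient below so that it runs outside a cluster):

package main

import (
	"time"

	"k8s.io/kubernetes/pkg/util"
	"k8s.io/node-problem-detector/pkg/condition"
	"k8s.io/node-problem-detector/pkg/problemclient"
	"k8s.io/node-problem-detector/pkg/types"
)

func main() {
	client := problemclient.NewFakeProblemClient()
	manager := condition.NewConditionManager(client, util.RealClock{})
	manager.Start()
	manager.UpdateCondition(types.Condition{
		Type:       "KernelDeadlock",
		Status:     true,
		Transition: time.Now(),
		Reason:     "DockerHung",
		Message:    "task docker:20744 blocked for more than 120 seconds.",
	})
	// Within updatePeriod (1s) the manager coalesces the update and pushes
	// the condition to the client via SetConditions.
	time.Sleep(2 * time.Second)
}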

81
pkg/condition/manager_test.go Normal file

@@ -0,0 +1,81 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package condition
import (
"reflect"
"testing"
"time"
"k8s.io/node-problem-detector/pkg/problemclient"
"k8s.io/node-problem-detector/pkg/types"
problemutil "k8s.io/node-problem-detector/pkg/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util"
)
func newTestManager() (*conditionManager, *problemclient.FakeProblemClient, *util.FakeClock) {
fakeClient := problemclient.NewFakeProblemClient()
fakeClock := util.NewFakeClock(time.Now())
manager := NewConditionManager(fakeClient, fakeClock)
return manager.(*conditionManager), fakeClient, fakeClock
}
func newTestCondition() types.Condition {
return types.Condition{
Type: "TestCondition",
Status: true,
Transition: time.Now(),
Reason: "TestReason",
Message: "test message",
}
}
func TestCheckUpdates(t *testing.T) {
condition := newTestCondition()
m, _, _ := newTestManager()
m.UpdateCondition(condition)
if !m.checkUpdates() {
t.Error("expected checkUpdates to be true, got false")
}
if !reflect.DeepEqual(condition, m.conditions[condition.Type]) {
t.Errorf("expected %+v, got %+v", condition, m.conditions[condition.Type])
}
if m.checkUpdates() {
t.Error("expected checkUpdates to be false, got true")
}
}
func TestSync(t *testing.T) {
m, fakeClient, fakeClock := newTestManager()
condition := newTestCondition()
m.conditions = map[string]types.Condition{condition.Type: condition}
m.sync()
expected := []api.NodeCondition{problemutil.ConvertToAPICondition(condition)}
err := fakeClient.AssertConditions(expected)
if err != nil {
t.Error(err)
}
if m.checkResync() {
t.Error("expected checkResync to be false, got true")
}
fakeClock.Step(resyncPeriod)
if !m.checkResync() {
t.Error("expected checkResync to be true, got false")
}
}

166
pkg/kernelmonitor/log_watcher.go Normal file

@@ -0,0 +1,166 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernelmonitor
import (
"bufio"
"os"
"k8s.io/node-problem-detector/pkg/kernelmonitor/translator"
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
"github.com/golang/glog"
"github.com/hpcloud/tail"
)
const (
defaultKernelLogPath = "/var/log/kern.log"
)
// WatcherConfig is the configuration of kernel log watcher.
type WatcherConfig struct {
KernelLogPath string `json:"logPath,omitempty"`
}
// KernelLogWatcher watches and translates the kernel log. Whenever there is a new log line,
// it translates and reports the log.
type KernelLogWatcher interface {
// Watch starts the kernel log watcher and returns a watch channel.
Watch() (<-chan *types.KernelLog, error)
// Stop stops the kernel log watcher.
Stop()
}
type kernelLogWatcher struct {
// trans is the translator that translates the log into the internal format.
trans translator.Translator
cfg WatcherConfig
tl *tail.Tail
logCh chan *types.KernelLog
tomb *util.Tomb
}
// NewKernelLogWatcher creates a new kernel log watcher.
func NewKernelLogWatcher(cfg WatcherConfig) KernelLogWatcher {
return &kernelLogWatcher{
trans: translator.NewDefaultTranslator(),
cfg: cfg,
tomb: util.NewTomb(),
// A capacity 1000 buffer should be enough
logCh: make(chan *types.KernelLog, 1000),
}
}
func (k *kernelLogWatcher) Watch() (<-chan *types.KernelLog, error) {
path := defaultKernelLogPath
if k.cfg.KernelLogPath != "" {
path = k.cfg.KernelLogPath
}
start, err := k.getStartPoint(path)
if err != nil {
return nil, err
}
// TODO(random-liu): If the file gets recreated during this interval, the logic
// will go wrong here.
// TODO(random-liu): Rate limit tail file.
// TODO(random-liu): Figure out what happens if log lines are removed.
k.tl, err = tail.TailFile(path, tail.Config{
Location: &tail.SeekInfo{
Offset: start,
Whence: os.SEEK_SET,
},
Poll: true,
ReOpen: true,
Follow: true,
})
if err != nil {
return nil, err
}
glog.Info("Start watching kernel log")
go k.watchLoop()
return k.logCh, nil
}
func (k *kernelLogWatcher) Stop() {
k.tomb.Stop()
}
// watchLoop is the main watch loop of kernel log watcher.
func (k *kernelLogWatcher) watchLoop() {
defer func() {
close(k.logCh)
k.tomb.Done()
}()
for {
select {
case line := <-k.tl.Lines:
// Notice that tail has trimmed '\n'
if line.Err != nil {
glog.Errorf("Tail error: %v", line.Err)
continue
}
log, err := k.trans.Translate(line.Text)
if err != nil {
glog.Infof("Unable to parse line: %q, %v", line, err)
continue
}
k.logCh <- log
case <-k.tomb.Stopping():
k.tl.Stop()
glog.Infof("Stop watching kernel log")
return
}
}
}
// getStartPoint parses the newest kernel log file and tries to find the latest reboot point.
// Currently we rely on the kernel log timestamp to find the reboot point. The basic idea
// is straightforward: over the whole lifecycle of a node, the kernel log timestamp should
// always increase; only when the node reboots does the timestamp decrease. We parse the
// log and find the latest timestamp decrease, which should be the latest reboot point.
// TODO(random-liu): A drawback is that if the node was started a long time ago, we'll only get
// logs in the newest kernel log file. We may want to improve this in the future.
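// For example, given log timestamps [3.0, 1.0, 2.0], the timestamp decreases at the
// second line, so that line becomes the start point (see TestGetStartPoint below).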
func (k *kernelLogWatcher) getStartPoint(path string) (int64, error) {
f, err := os.Open(path)
if err != nil {
return -1, err
}
defer f.Close()
start := int64(0)
total := 0
lastTimestamp := int64(0)
reader := bufio.NewReader(f)
done := false
for !done {
line, err := reader.ReadString('\n')
if err != nil {
done = true
}
total += len(line)
log, err := k.trans.Translate(line)
if err != nil {
glog.Infof("unable to parse line: %q, %v", line, err)
continue
}
if log.Timestamp < lastTimestamp {
start = int64(total - len(line))
}
lastTimestamp = log.Timestamp
}
return start, nil
}

108
pkg/kernelmonitor/log_watcher_test.go Normal file

@@ -0,0 +1,108 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernelmonitor
import (
"io/ioutil"
"os"
"reflect"
"testing"
"time"
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
func TestGetStartPoint(t *testing.T) {
testCases := []struct {
log string
logs []types.KernelLog
err bool
}{
{
// The start point is at the head of the log file.
log: `kernel: [1.000000] 1
kernel: [2.000000] 2
kernel: [3.000000] 3
`,
logs: []types.KernelLog{
{
Timestamp: 1000000,
Message: "1",
},
{
Timestamp: 2000000,
Message: "2",
},
{
Timestamp: 3000000,
Message: "3",
},
},
},
{
// The start point is in the middle of the log file.
log: `kernel: [3.000000] 3
kernel: [1.000000] 1
kernel: [2.000000] 2
`,
logs: []types.KernelLog{
{
Timestamp: 1000000,
Message: "1",
},
{
Timestamp: 2000000,
Message: "2",
},
},
},
}
for c, test := range testCases {
f, err := ioutil.TempFile("", "kernel_log_watcher_test")
if err != nil {
t.Fatal(err)
}
defer func() {
f.Close()
os.Remove(f.Name())
}()
_, err = f.Write([]byte(test.log))
if err != nil {
t.Fatal(err)
}
w := NewKernelLogWatcher(WatcherConfig{KernelLogPath: f.Name()})
logCh, err := w.Watch()
if err != nil {
t.Fatal(err)
}
defer w.Stop()
for _, expected := range test.logs {
got := <-logCh
if !reflect.DeepEqual(&expected, got) {
t.Errorf("case %d: expect %+v, got %+v", c+1, expected, *got)
}
}
// The log channel should have already been drained.
// There could still be future messages sent into the channel, but the chance is really slim.
timeout := time.After(100 * time.Millisecond)
select {
case log := <-logCh:
t.Errorf("unexpected extra log: %+v", *log)
case <-timeout:
}
}
}

210
pkg/kernelmonitor/kernel_monitor.go Normal file

@@ -0,0 +1,210 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernelmonitor
import (
"encoding/json"
"io/ioutil"
"regexp"
"syscall"
"time"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
"k8s.io/node-problem-detector/pkg/types"
"github.com/golang/glog"
)
// May want to add more conditions if we need finer-grained node conditions.
// TODO(random-liu): Make the kernel condition a predefined list, and make it configurable
// in the rule.
const (
KernelDeadlockCondition = "KernelDeadlock"
KernelMonitorSource = "kernel-monitor"
)
// MonitorConfig is the configuration of kernel monitor.
type MonitorConfig struct {
// WatcherConfig is the configuration of kernel log watcher.
WatcherConfig
// BufferSize is the size (in lines) of the log buffer.
BufferSize int `json:"bufferSize"`
// Rules are the rules kernel monitor will follow to parse the log file.
Rules []kerntypes.Rule `json:"rules"`
}
// KernelMonitor monitors the kernel log and reports node problem conditions and events according to
// the rules.
type KernelMonitor interface {
// Start starts the kernel monitor.
Start() (<-chan *types.Status, error)
// Stop stops the kernel monitor.
Stop()
}
type kernelMonitor struct {
watcher KernelLogWatcher
buffer LogBuffer
config MonitorConfig
condition types.Condition
uptime time.Time
logCh <-chan *kerntypes.KernelLog
output chan *types.Status
tomb *util.Tomb
}
// NewKernelMonitorOrDie creates a new KernelMonitor, panicking if an error occurs.
func NewKernelMonitorOrDie(configPath string) KernelMonitor {
k := &kernelMonitor{
condition: defaultCondition(),
tomb: util.NewTomb(),
}
f, err := ioutil.ReadFile(configPath)
if err != nil {
panic(err)
}
err = json.Unmarshal(f, &k.config)
if err != nil {
panic(err)
}
err = validateRules(k.config.Rules)
if err != nil {
panic(err)
}
glog.Infof("Finish parsing log file: %+v", k.config)
var info syscall.Sysinfo_t
err = syscall.Sysinfo(&info)
if err != nil {
panic(err)
}
k.uptime = time.Now().Add(time.Duration(-info.Uptime * int64(time.Second)))
glog.Infof("Got system boot time: %v", k.uptime)
k.watcher = NewKernelLogWatcher(k.config.WatcherConfig)
k.buffer = NewLogBuffer(k.config.BufferSize)
// A 1000 size channel should be big enough.
k.output = make(chan *types.Status, 1000)
return k
}
func (k *kernelMonitor) Start() (<-chan *types.Status, error) {
glog.Info("Start kernel monitor")
var err error
k.logCh, err = k.watcher.Watch()
if err != nil {
return nil, err
}
go k.monitorLoop()
return k.output, nil
}
func (k *kernelMonitor) Stop() {
glog.Info("Stop kernel monitor")
k.tomb.Stop()
}
// monitorLoop is the main loop of kernel monitor.
func (k *kernelMonitor) monitorLoop() {
defer k.tomb.Done()
k.output <- defaultStatus() // Update the default status
for {
select {
case log := <-k.logCh:
// Whenever there is a new log line, the kernel monitor pushes it into the log buffer and tries
// to match each rule. If any rule matches, the kernel monitor reports a status.
k.buffer.Push(log)
for _, rule := range k.config.Rules {
matched := k.buffer.Match(rule.Pattern)
if len(matched) == 0 {
continue
}
status := k.generateStatus(matched, rule)
glog.Infof("New status generated: %+v", status)
k.output <- status
}
case <-k.tomb.Stopping():
k.watcher.Stop()
glog.Infof("Kernel monitor stopped")
return
}
}
}
// generateStatus generates status from the logs.
func (k *kernelMonitor) generateStatus(logs []*kerntypes.KernelLog, rule kerntypes.Rule) *types.Status {
// We use the timestamp of the first log line as the timestamp of the status.
timestamp := k.generateTimestamp(logs[0].Timestamp)
messages := []string{}
for _, log := range logs {
messages = append(messages, log.Message)
}
message := concatLogs(messages)
var event *types.Event
if rule.Type == kerntypes.Temp {
// For a temporary error, only generate an event.
event = &types.Event{
Severity: types.Warn,
Timestamp: timestamp,
Reason: rule.Reason,
Message: message,
}
} else {
// For a permanent error, change the condition.
k.condition.Type = KernelDeadlockCondition
k.condition.Status = true
k.condition.Transition = timestamp
k.condition.Reason = rule.Reason
k.condition.Message = message
}
return &types.Status{
Source: KernelMonitorSource,
Event: event,
Condition: k.condition,
}
}
// generateTimestamp converts the kernel log time to real time.
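// For example, with boot time t0, a kernel timestamp of 100000 microseconds maps to
// t0 + 100ms (see TestGenerateStatus below).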
func (k *kernelMonitor) generateTimestamp(timestamp int64) time.Time {
return k.uptime.Add(time.Duration(timestamp * int64(time.Microsecond)))
}
// defaultStatus returns the default status with default condition.
func defaultStatus() *types.Status {
return &types.Status{
Source: KernelMonitorSource,
Condition: defaultCondition(),
}
}
func defaultCondition() types.Condition {
return types.Condition{
Type: KernelDeadlockCondition,
Status: false,
Transition: time.Now(),
}
}
// validateRules verifies whether the regular expressions in the rules are valid.
func validateRules(rules []kerntypes.Rule) error {
for _, rule := range rules {
_, err := regexp.Compile(rule.Pattern)
if err != nil {
return err
}
}
return nil
}

88
pkg/kernelmonitor/kernel_monitor_test.go Normal file

@@ -0,0 +1,88 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernelmonitor
import (
"reflect"
"testing"
"time"
kerntypes "k8s.io/node-problem-detector/pkg/kernelmonitor/types"
"k8s.io/node-problem-detector/pkg/types"
)
func TestGenerateStatus(t *testing.T) {
uptime := time.Unix(1000, 0)
initCondition := defaultCondition()
logs := []*kerntypes.KernelLog{
{
Timestamp: 100000,
Message: "test message 1",
},
{
Timestamp: 200000,
Message: "test message 2",
},
}
for c, test := range []struct {
rule kerntypes.Rule
expected types.Status
}{
// Pattern is not needed because we don't do pattern matching in this test.
{
rule: kerntypes.Rule{
Type: kerntypes.Perm,
Reason: "test reason",
},
expected: types.Status{
Source: KernelMonitorSource,
Condition: types.Condition{
Type: KernelDeadlockCondition,
Status: true,
Transition: time.Unix(1000, 100000*1000),
Reason: "test reason",
Message: "test message 1\ntest message 2",
},
},
},
{
rule: kerntypes.Rule{
Type: kerntypes.Temp,
Reason: "test reason",
},
expected: types.Status{
Source: KernelMonitorSource,
Event: &types.Event{
Severity: types.Warn,
Timestamp: time.Unix(1000, 100000*1000),
Reason: "test reason",
Message: "test message 1\ntest message 2",
},
Condition: initCondition,
},
},
} {
k := &kernelMonitor{
condition: initCondition,
uptime: uptime,
}
got := k.generateStatus(logs, test.rule)
if !reflect.DeepEqual(&test.expected, got) {
t.Errorf("case %d: expected status %+v, got %+v", c+1, test.expected, got)
}
}
}

102
pkg/kernelmonitor/log_buffer.go Normal file

@@ -0,0 +1,102 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernelmonitor
import (
"regexp"
"strings"
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
// LogBuffer buffers the logs and supports regular-expression matching against the buffered logs.
type LogBuffer interface {
// Push pushes log into the log buffer.
Push(*types.KernelLog)
// Match matches a regular expression against the log buffer.
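// A match must include the last buffered line: with buffer ["a1", "b2"],
// Match("a1\nb2") returns both entries while Match("a1") returns nothing
// (see TestMatch below).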
Match(string) []*types.KernelLog
// String returns a concatenated string of the buffered logs.
String() string
}
type logBuffer struct {
// buffer is a simple ring buffer.
buffer []*types.KernelLog
msg []string
max int
current int
}
// NewLogBuffer creates a log buffer with a max line number limit. Because we only match logs
// in the log buffer, the max buffer line number is also the max pattern line number we
// support. A smaller buffer line number means less memory and CPU usage, but also fewer
// lines of patterns we can support.
func NewLogBuffer(maxLines int) *logBuffer {
return &logBuffer{
buffer: make([]*types.KernelLog, maxLines, maxLines),
msg: make([]string, maxLines, maxLines),
max: maxLines,
}
}
func (b *logBuffer) Push(log *types.KernelLog) {
b.buffer[b.current%b.max] = log
b.msg[b.current%b.max] = log.Message
b.current++
}
// TODO(random-liu): Cache regexp if garbage collection becomes a problem someday.
func (b *logBuffer) Match(expr string) []*types.KernelLog {
// The expression should be checked outside, and it must match to the end.
reg := regexp.MustCompile(expr + `\z`)
log := b.String()
loc := reg.FindStringIndex(log)
if loc == nil {
// No match
return nil
}
// reverse index
s := len(log) - loc[0] - 1
total := 0
matched := []*types.KernelLog{}
for i := b.tail(); i >= b.current && b.buffer[i%b.max] != nil; i-- {
matched = append(matched, b.buffer[i%b.max])
total += len(b.msg[i%b.max]) + 1 // Add '\n'
if total > s {
break
}
}
for i := 0; i < len(matched)/2; i++ {
matched[i], matched[len(matched)-i-1] = matched[len(matched)-i-1], matched[i]
}
return matched
}
func (b *logBuffer) String() string {
logs := append(b.msg[b.current%b.max:], b.msg[:b.current%b.max]...)
return concatLogs(logs)
}
// tail returns current tail index.
func (b *logBuffer) tail() int {
return b.current + b.max - 1
}
// concatLogs concatenates multiple lines of logs into one string.
func concatLogs(logs []string) string {
return strings.Join(logs, "\n")
}

110
pkg/kernelmonitor/log_buffer_test.go Normal file

@@ -0,0 +1,110 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kernelmonitor
import (
"reflect"
"testing"
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
func TestPush(t *testing.T) {
for c, test := range []struct {
max int
logs []string
expected string
}{
{
max: 1,
logs: []string{"a", "b"},
expected: "b",
},
{
max: 2,
logs: []string{"a", "b"},
expected: "a\nb",
},
{
max: 2,
logs: []string{"a", "b", "c"},
expected: "b\nc",
},
{
max: 2,
logs: []string{"a", "b", "c", "d"},
expected: "c\nd",
},
} {
b := NewLogBuffer(test.max)
for _, log := range test.logs {
b.Push(&types.KernelLog{Message: log})
}
got := b.String()
if test.expected != got {
t.Errorf("case %d: expected %q, got %q", c+1, test.expected, got)
}
}
}
func TestMatch(t *testing.T) {
max := 4
for c, test := range []struct {
logs []string
exprs []string
expected [][]string
}{
{
// Buffer not full
logs: []string{"a1", "b2"},
exprs: []string{
"a1", // Not including the last line, should not match
"b1", // Not match
"b2", // match
`\w{2}`, // Regexp should work
"a1\nb2", // Including the last line, should match
`a1b2`, // No new line, should not match
},
expected: [][]string{{}, {}, {"b2"}, {"b2"}, {"a1", "b2"}, {}},
},
{
// Buffer full
logs: []string{"a1", "b2", "c3", "d4", "e5"},
exprs: []string{
"(?s)a1.+", // Rotate out, should not match
`[a-z]\d\n[a-z]\d`, // New line should work, and only the one contains the last line should match
`[a-z]\d`, // Multiple match, only the one contains the last line should match
},
expected: [][]string{{}, {"d4", "e5"}, {"e5"}},
},
} {
b := NewLogBuffer(max)
for _, log := range test.logs {
b.Push(&types.KernelLog{Message: log})
}
for i, expr := range test.exprs {
kLogs := b.Match(expr)
got := []string{}
for _, kLog := range kLogs {
got = append(got, kLog.Message)
}
if !reflect.DeepEqual(test.expected[i], got) {
t.Errorf("case %d.%d: expected %v, got %v", c+1, i+1, test.expected[i], got)
}
}
}
}

86
pkg/kernelmonitor/translator/translator.go Normal file

@@ -0,0 +1,86 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package translator
import (
"fmt"
"strconv"
"strings"
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
)
// Translator translates a log line into types.KernelLog, so that the kernel monitor
// can consume it regardless of the original log format.
type Translator interface {
// Translate translates one log line into types.KernelLog.
Translate(string) (*types.KernelLog, error)
}
// defaultTranslator works well for Ubuntu and Debian, but may not work well with
// other OS distros. However, it is easy to add a new translator for a new distro.
type defaultTranslator struct{}
// NewDefaultTranslator creates a default translator.
func NewDefaultTranslator() Translator {
return &defaultTranslator{}
}
func (t *defaultTranslator) Translate(line string) (*types.KernelLog, error) {
timestr, message, err := parseLine(line)
if err != nil {
return nil, err
}
timestamp, err := parseTimestamp(timestr)
if err != nil {
return nil, err
}
return &types.KernelLog{
Timestamp: timestamp,
Message: message,
}, nil
}
func parseLine(line string) (string, string, error) {
// Example line: Jan 1 00:00:00 hostname kernel: [0.000000] component: log message
timestampPrefix := "kernel: ["
timestampSuffix := "]"
idx := strings.Index(line, timestampPrefix)
if idx == -1 {
return "", "", fmt.Errorf("can't find timestamp prefix %q in line %q", timestampPrefix, line)
}
line = line[idx+len(timestampPrefix):]
idx = strings.Index(line, timestampSuffix)
if idx == -1 {
return "", "", fmt.Errorf("can't find timestamp suffix %q in line %q", timestampSuffix, line)
}
timestamp := strings.Trim(line[:idx], " ")
message := strings.Trim(line[idx+1:], " ")
return timestamp, message, nil
}
func parseTimestamp(timestamp string) (int64, error) {
f, err := strconv.ParseFloat(timestamp, 64)
if err != nil {
return 0, err
}
// seconds to microseconds
return int64(f * 1000000), nil
}

64
pkg/kernelmonitor/translator/translator_test.go Normal file

@@ -0,0 +1,64 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package translator
import (
"testing"
)
func TestDefaultTranslator(t *testing.T) {
tr := NewDefaultTranslator()
testCases := []struct {
input string
err bool
timestamp int64
message string
}{
{
input: "Jan 1 00:00:00 hostname kernel: [9.999999] component: log message",
timestamp: 9999999,
message: "component: log message",
},
{
input: "Jan 1 00:00:00 hostname kernel: [9.999999]",
timestamp: 9999999,
message: "",
},
{
input: "Jan 1 00:00:00 hostname kernel: [9.999999 component: log message",
err: true,
},
{
input: "Jan 1 00:00:00 hostname user: [9.999999] component: log message",
err: true,
},
}
for c, test := range testCases {
log, err := tr.Translate(test.input)
if test.err {
if err == nil {
t.Errorf("case %d: expect error should occur, got %+v, %v", c+1, log, err)
}
continue
}
if test.timestamp != log.Timestamp || test.message != log.Message {
t.Errorf("case %d: expect timestamp: %d, message: %q; got %+v", c+1, test.timestamp, test.message, log)
}
}
}

45
pkg/kernelmonitor/types/types.go Normal file

@@ -0,0 +1,45 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
// KernelLog is the log item returned by the translator. It's easy to extend this
// to support other log monitoring, such as Docker log monitoring.
type KernelLog struct {
Timestamp int64 // microseconds since kernel boot
Message string
}
// Type is the type of the kernel problem.
type Type string
const (
// Temp means the kernel problem is temporary; only an event needs to be reported.
Temp Type = "temporary"
// Perm means the kernel problem is permanent; the node condition needs to be changed.
Perm Type = "permanent"
)
// Rule describes how kernel monitor should analyze the kernel log.
type Rule struct {
// Type is the type of matched kernel problem.
Type Type `json:"type"`
// Reason is the short reason of the kernel problem.
Reason string `json:"reason"`
// Pattern is the regular expression to match the kernel problem in kernel log.
// Notice that the pattern must match to the end of the line.
Pattern string `json:"pattern"`
}

47
pkg/kernelmonitor/util/tomb.go Normal file

@@ -0,0 +1,47 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
// Tomb is used to control the lifecycle of a goroutine.
type Tomb struct {
stop chan struct{}
done chan struct{}
}
// NewTomb creates a new tomb.
func NewTomb() *Tomb {
return &Tomb{
stop: make(chan struct{}),
done: make(chan struct{}),
}
}
// Stop is called from outside to stop the goroutine; it blocks until the goroutine is done.
func (t *Tomb) Stop() {
close(t.stop)
<-t.done
}
// Stopping is used by the goroutine to tell whether it should stop.
func (t *Tomb) Stopping() <-chan struct{} {
return t.stop
}
// Done is used by the goroutine to inform that it has stopped.
func (t *Tomb) Done() {
close(t.done)
}

39
pkg/kernelmonitor/util/tomb_test.go Normal file

@@ -0,0 +1,39 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"reflect"
"testing"
)
func TestTomb(t *testing.T) {
tomb := NewTomb()
workflow := []string{}
expected := []string{"stop", "stopping", "stopped"}
go func() {
defer tomb.Done()
<-tomb.Stopping()
workflow = append(workflow, "stopping")
}()
workflow = append(workflow, "stop")
tomb.Stop()
workflow = append(workflow, "stopped")
if !reflect.DeepEqual(workflow, expected) {
t.Errorf("expected workflow %v, got %v", expected, workflow)
}
}

100
pkg/problemclient/fake_problem_client.go Normal file

@@ -0,0 +1,100 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package problemclient
import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
)
// FakeProblemClient is a fake problem client for debugging.
type FakeProblemClient struct {
sync.Mutex
conditions map[api.NodeConditionType]api.NodeCondition
errors map[string]error
}
// NewFakeProblemClient creates a new fake problem client.
func NewFakeProblemClient() *FakeProblemClient {
return &FakeProblemClient{
conditions: make(map[api.NodeConditionType]api.NodeCondition),
errors: make(map[string]error),
}
}
// InjectError injects an error into a specific function.
func (f *FakeProblemClient) InjectError(fun string, err error) {
f.Lock()
defer f.Unlock()
f.errors[fun] = err
}
// AssertConditions asserts that the internal conditions in the fake problem client match
// the expected conditions.
func (f *FakeProblemClient) AssertConditions(expected []api.NodeCondition) error {
conditions := map[api.NodeConditionType]api.NodeCondition{}
for _, condition := range expected {
conditions[condition.Type] = condition
}
if !reflect.DeepEqual(conditions, f.conditions) {
return fmt.Errorf("expected %+v, got %+v", conditions, f.conditions)
}
return nil
}
// SetConditions is a fake mimic of SetConditions; it only updates the internal condition cache.
func (f *FakeProblemClient) SetConditions(conditions []api.NodeCondition, timeout time.Duration) error {
f.Lock()
defer f.Unlock()
if err, ok := f.errors["SetConditions"]; ok {
return err
}
for _, condition := range conditions {
t := condition.Type
if condition.Status == api.ConditionFalse {
delete(f.conditions, t)
} else {
f.conditions[t] = condition
}
}
return nil
}
// GetConditions is a fake mimic of GetConditions; it returns the conditions cached internally.
func (f *FakeProblemClient) GetConditions(types []api.NodeConditionType) ([]*api.NodeCondition, error) {
f.Lock()
defer f.Unlock()
if err, ok := f.errors["GetConditions"]; ok {
return nil, err
}
conditions := []*api.NodeCondition{}
for _, t := range types {
condition, ok := f.conditions[t]
if ok {
conditions = append(conditions, &condition)
}
}
return conditions, nil
}
// Eventf does nothing now.
func (f *FakeProblemClient) Eventf(eventType string, source, reason, messageFmt string, args ...interface{}) {
}

202
pkg/problemclient/problem_client.go Normal file

@@ -0,0 +1,202 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package problemclient
import (
"fmt"
"os"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"github.com/golang/glog"
)
// Client is the interface of the problem client.
type Client interface {
// GetConditions gets all specified conditions of the current node.
GetConditions(conditionTypes []api.NodeConditionType) ([]*api.NodeCondition, error)
// SetConditions sets or updates the conditions of the current node.
// Notice that conditions with status api.ConditionFalse will be removed from the condition list, so that
// we'll only have useful conditions in the condition list.
SetConditions(conditions []api.NodeCondition, timeout time.Duration) error
// Eventf reports the event.
Eventf(eventType string, source, reason, messageFmt string, args ...interface{})
}
type nodeProblemClient struct {
nodeName string
client clientset.Interface
clock util.Clock
recorders map[string]record.EventRecorder
nodeRef *api.ObjectReference
}
// NewClientOrDie creates a new problem client, panicking if an error occurs.
func NewClientOrDie() Client {
c := &nodeProblemClient{clock: util.RealClock{}}
cfg, err := restclient.InClusterConfig()
if err != nil {
panic(err)
}
// TODO(random-liu): Set QPS Limit
c.client, err = clientset.NewForConfig(cfg)
if err != nil {
panic(err)
}
// TODO(random-liu): Get node name from cloud provider
c.nodeName, err = os.Hostname()
if err != nil {
panic(err)
}
c.nodeRef = getNodeRef(c.nodeName)
c.recorders = make(map[string]record.EventRecorder)
return c
}
func (c *nodeProblemClient) GetConditions(conditionTypes []api.NodeConditionType) ([]*api.NodeCondition, error) {
node, err := c.client.Core().Nodes().Get(c.nodeName)
if err != nil {
return nil, err
}
conditions := []*api.NodeCondition{}
for _, conditionType := range conditionTypes {
for i := range node.Status.Conditions {
// Copy before taking the address: taking the address of the range variable
// would make every appended pointer reference the same variable.
condition := node.Status.Conditions[i]
if condition.Type == conditionType {
conditions = append(conditions, &condition)
}
}
}
return conditions, nil
}
func (c *nodeProblemClient) SetConditions(newConditions []api.NodeCondition, timeout time.Duration) error {
for i := range newConditions {
// Each time we update the conditions, we update the heartbeat time.
newConditions[i].LastHeartbeatTime = unversioned.NewTime(c.clock.Now())
}
return c.updateNodeCondition(func(conditions []api.NodeCondition) []api.NodeCondition {
for _, condition := range newConditions {
if condition.Status == api.ConditionFalse {
conditions = unsetCondition(condition.Type, conditions)
} else {
conditions = setCondition(condition, conditions)
}
}
return conditions
}, timeout)
}
func (c *nodeProblemClient) Eventf(eventType, source, reason, messageFmt string, args ...interface{}) {
recorder, found := c.recorders[source]
if !found {
// TODO(random-liu): If needed use separate client and QPS limit for event.
recorder = getEventRecorder(c.client, c.nodeName, source)
c.recorders[source] = recorder
}
recorder.Eventf(c.nodeRef, eventType, reason, messageFmt, args...)
}
func unsetCondition(conditionType api.NodeConditionType, conditions []api.NodeCondition) []api.NodeCondition {
result := []api.NodeCondition{}
for _, condition := range conditions {
if condition.Type != conditionType {
result = append(result, condition)
}
}
return result
}
func setCondition(condition api.NodeCondition, conditions []api.NodeCondition) []api.NodeCondition {
found := false
for i := range conditions {
if conditions[i].Type == condition.Type {
target := &conditions[i]
*target = condition
found = true
break
}
}
if !found {
conditions = append(conditions, condition)
}
return conditions
}
func (c *nodeProblemClient) updateNodeCondition(updateFunc func([]api.NodeCondition) []api.NodeCondition, timeout time.Duration) error {
updateTime := c.clock.Now()
for {
node, err := c.client.Core().Nodes().Get(c.nodeName)
if err != nil {
return err
}
node.Status.Conditions = updateFunc(node.Status.Conditions)
_, err = c.client.Core().Nodes().UpdateStatus(node)
if err != nil {
if errors.IsConflict(err) {
glog.Warningf("Conflicting update node status for node %q, will retry soon: %v", c.nodeName, err)
if c.clock.Now().Sub(updateTime) >= timeout {
return timeoutError{node: c.nodeName, timeout: timeout}
}
continue
}
return err
}
return nil
}
}
// getEventRecorder generates a recorder for a specific node name and source.
func getEventRecorder(c clientset.Interface, nodeName, source string) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: source, Host: nodeName})
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: c.Core().Events("")})
return recorder
}
func getNodeRef(nodeName string) *api.ObjectReference {
return &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
}
// timeoutError is the error returned by the problem client when a condition update times out.
type timeoutError struct {
node string
timeout time.Duration
}
func (e timeoutError) Error() string {
return fmt.Sprintf("update condition for node %q timeout %s", e.node, e.timeout)
}
// IsErrTimeout checks whether a given error is a timeout error.
func IsErrTimeout(err error) bool {
_, ok := err.(timeoutError)
return ok
}

259
pkg/problemclient/problem_client_test.go Normal file

@@ -0,0 +1,259 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package problemclient
import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
)
const (
testSource = "test"
testNode = "test-node"
)
func newFakeProblemClient(fakeClient *fake.Clientset) *nodeProblemClient {
return &nodeProblemClient{
nodeName: testNode,
client: fakeClient,
clock: &util.FakeClock{},
recorders: make(map[string]record.EventRecorder),
nodeRef: getNodeRef(testNode),
}
}
func newFakeNode(conditions []api.NodeCondition) *api.Node {
node := &api.Node{}
node.Name = testNode
node.Status = api.NodeStatus{Conditions: conditions}
return node
}
type action struct {
verb string
resource string
subresource string
}
func TestSetConditions(t *testing.T) {
now := time.Now()
expectedActions := []action{
{
verb: "get",
resource: "nodes",
},
{
verb: "update",
resource: "nodes",
subresource: "status",
},
}
for _, test := range []struct {
init []api.NodeCondition
update []api.NodeCondition
expected []api.NodeCondition
}{
// An initial condition with the same type should be overridden
{
init: []api.NodeCondition{
{
Type: "TestType",
Status: api.ConditionTrue,
},
},
update: []api.NodeCondition{
{
Type: "TestType",
Status: api.ConditionTrue,
LastTransitionTime: unversioned.NewTime(now),
Reason: "TestReason",
Message: "TestMessage",
},
},
expected: []api.NodeCondition{
{
// LastHeartbeatTime should be updated in SetConditions
Type: "TestType",
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.NewTime(now),
LastTransitionTime: unversioned.NewTime(now),
Reason: "TestReason",
Message: "TestMessage",
},
},
},
// An initial condition with a different type should be kept
{
init: []api.NodeCondition{
{
Type: "InitType",
Status: api.ConditionTrue,
LastTransitionTime: unversioned.NewTime(now),
Reason: "InitReason",
Message: "InitMessage",
},
},
update: []api.NodeCondition{
{
Type: "TestType",
Status: api.ConditionTrue,
LastTransitionTime: unversioned.NewTime(now),
Reason: "TestReason",
Message: "TestMessage",
},
},
expected: []api.NodeCondition{
{
Type: "InitType",
Status: api.ConditionTrue,
LastTransitionTime: unversioned.NewTime(now),
Reason: "InitReason",
Message: "InitMessage",
},
{
// LastHeartbeatTime should be updated in SetConditions
Type: "TestType",
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.NewTime(now),
LastTransitionTime: unversioned.NewTime(now),
Reason: "TestReason",
Message: "TestMessage",
},
},
},
// A condition with false status should be removed
{
init: []api.NodeCondition{
{
Type: "TestType",
Status: api.ConditionTrue,
LastHeartbeatTime: unversioned.NewTime(now),
LastTransitionTime: unversioned.NewTime(now),
Reason: "TestReason",
Message: "TestMessage",
},
},
update: []api.NodeCondition{
{
Type: "TestType",
Status: api.ConditionFalse,
},
},
expected: []api.NodeCondition{},
},
} {
fakeClient := fake.NewSimpleClientset(newFakeNode(test.init))
client := newFakeProblemClient(fakeClient)
clock := client.clock.(*util.FakeClock)
clock.SetTime(now)
client.SetConditions(test.update, 10*time.Second)
// The actions should match the expected actions
actions := fakeClient.Actions()
if len(expectedActions) != len(actions) {
t.Errorf("expected actions %+v, got %+v", expectedActions, fakeClient.Actions())
continue
}
for i, a := range actions {
if !a.Matches(expectedActions[i].verb, expectedActions[i].resource) || a.GetSubresource() != expectedActions[i].subresource {
t.Errorf("expected action %+v, got %+v", expectedActions[i], a)
}
}
// The last action should be an update
a, ok := actions[len(actions)-1].(core.UpdateAction)
if !ok {
t.Errorf("expected the last action to be update, got %+v", actions[len(actions)-1])
}
// The updated node conditions should match the expected conditions
node, ok := a.GetObject().(*api.Node)
if !ok {
t.Errorf("expected the update object to be node, got %+v", a.GetObject())
}
if !api.Semantic.DeepEqual(test.expected, node.Status.Conditions) {
t.Errorf("expected conditions %+v, got %+v", test.expected, node.Status.Conditions)
}
}
}
func TestSetConditionsError(t *testing.T) {
timeout := time.Duration(0)
node := newFakeNode([]api.NodeCondition{})
for c, test := range []struct {
errMap map[string]error
expectedErr error
}{
{
// Get error
errMap: map[string]error{"get": fmt.Errorf("get error")},
expectedErr: fmt.Errorf("get error"),
},
{
// Update error
errMap: map[string]error{"update": fmt.Errorf("update error")},
expectedErr: fmt.Errorf("update error"),
},
{
// Timeout error
errMap: map[string]error{
"update": &errors.StatusError{ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonConflict}},
},
expectedErr: timeoutError{node: testNode, timeout: timeout},
},
{
// No error
errMap: map[string]error{},
expectedErr: nil,
},
} {
fakeClient := &fake.Clientset{}
client := newFakeProblemClient(fakeClient)
fakeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, node, test.errMap["get"]
})
fakeClient.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
return true, node, test.errMap["update"]
})
err := client.SetConditions([]api.NodeCondition{}, timeout)
if !reflect.DeepEqual(err, test.expectedErr) {
t.Errorf("case %d: expected error %v, got %v", c+1, test.expectedErr, err)
}
}
}
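These cases imply that SetConditions surfaces get and update failures directly, but retries on a version conflict until the timeout elapses and then fails with a timeoutError. A hypothetical sketch of that loop; the client, nodeName, and clock fields and the Nodes() accessor are assumptions, and mergeConditions is the sketch above:

// setConditionsSketch illustrates the retry behavior these cases imply; it is
// not the implementation in this commit.
func setConditionsSketch(c *problemClient, conditions []api.NodeCondition, timeout time.Duration) error {
	deadline := c.clock.Now().Add(timeout)
	for {
		node, err := c.client.Nodes().Get(c.nodeName)
		if err != nil {
			return err // get failures surface directly
		}
		node.Status.Conditions = mergeConditions(node.Status.Conditions, conditions, unversioned.NewTime(c.clock.Now()))
		if _, err = c.client.Nodes().Update(node); err == nil {
			return nil
		}
		if !errors.IsConflict(err) {
			return err // non-conflict update failures surface directly
		}
		// Keep retrying on conflict until the deadline passes.
		if !c.clock.Now().Before(deadline) {
			return timeoutError{node: c.nodeName, timeout: timeout}
		}
	}
}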
func TestEvent(t *testing.T) {
fakeRecorder := record.NewFakeRecorder(1)
client := newFakeProblemClient(&fake.Clientset{})
client.recorders[testSource] = fakeRecorder
client.Eventf(api.EventTypeWarning, testSource, "test reason", "test message")
expected := fmt.Sprintf("%s %s %s", api.EventTypeWarning, "test reason", "test message")
got := <-fakeRecorder.Events
if expected != got {
t.Errorf("expected event %q, got %q", expected, got)
}
}

View File

@@ -0,0 +1,74 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package problemdetector
import (
	"fmt"

	"github.com/golang/glog"

	kubeutil "k8s.io/kubernetes/pkg/util"

	"k8s.io/node-problem-detector/pkg/condition"
	"k8s.io/node-problem-detector/pkg/kernelmonitor"
	"k8s.io/node-problem-detector/pkg/problemclient"
	"k8s.io/node-problem-detector/pkg/util"
)
// ProblemDetector collects statuses from all problem daemons, updates the node conditions and sends node events.
type ProblemDetector interface {
Run() error
}
type problemDetector struct {
client problemclient.Client
conditionManager condition.ConditionManager
	// TODO(random-liu): Use a slice of problem daemons if multiple monitors are needed in the future.
monitor kernelmonitor.KernelMonitor
}
// NewProblemDetector creates the problem detector. Currently we just pass in the problem daemons
// directly, but in the future we may want to let the problem daemons register themselves.
func NewProblemDetector(monitor kernelmonitor.KernelMonitor) ProblemDetector {
client := problemclient.NewClientOrDie()
return &problemDetector{
client: client,
conditionManager: condition.NewConditionManager(client, kubeutil.RealClock{}),
monitor: monitor,
}
}
// Run starts the problem detector.
func (p *problemDetector) Run() error {
p.conditionManager.Start()
ch, err := p.monitor.Start()
if err != nil {
return err
}
glog.Info("Problem detector started")
for {
select {
case status, ok := <-ch:
			if !ok {
				// A bare break here would only exit the select and leave the loop
				// spinning on the closed channel, so return an error instead.
				return fmt.Errorf("monitor stopped unexpectedly")
			}
if status.Event != nil {
p.client.Eventf(util.ConvertToAPIEventType(status.Event.Severity), status.Source, status.Event.Reason, status.Event.Message)
}
p.conditionManager.UpdateCondition(status.Condition)
}
}
}
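For reference, a minimal sketch of how a caller might wire the detector up; the kernelmonitor constructor name and config path here are assumptions, not something defined in this file:

// Hypothetical wiring from a main package (constructor name and path are illustrative).
func runDetectorSketch() error {
	monitor := kernelmonitor.NewKernelMonitorOrDie("/config/kernel-monitor.json")
	detector := NewProblemDetector(monitor)
	// Run blocks, forwarding monitor statuses as events and condition updates.
	return detector.Run()
}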

78
pkg/types/types.go Normal file
View File

@@ -0,0 +1,78 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"time"
)
// The following types are used internally in problem detector. In the future this could be the
// interface between node problem detector and other problem daemons.
// We added these types because:
// 1) The kubernetes api packages are too heavy.
// 2) We want to make the interface independent of kubernetes api changes.
// Severity is the severity of the problem event. For now we only have two severity levels, Info and
// Warn, which correspond to the current kubernetes event types. We may want to add more severity
// levels in the future.
type Severity string
const (
// Info is translated to a normal event.
Info Severity = "info"
// Warn is translated to a warning event.
Warn Severity = "warn"
)
// Condition is the node condition used internally by problem detector.
type Condition struct {
	// Type is the condition type. It should describe the problem condition of the node, for example
	// KernelDeadlock, OutOfResource etc.
Type string `json:"type"`
// Status indicates whether the node is in the condition or not.
Status bool `json:"status"`
	// Transition is the time when the node transitions into this condition.
Transition time.Time `json:"transition"`
	// Reason is a short explanation of why the node entered this condition.
Reason string `json:"reason"`
	// Message is a human readable message explaining why the node entered this condition.
Message string `json:"message"`
}
// Event is the event used internally by node problem detector.
type Event struct {
// Severity is the severity level of the event.
Severity Severity `json:"severity"`
// Timestamp is the time when the event is generated.
Timestamp time.Time `json:"timestamp"`
	// Reason is a short explanation of why the event was generated.
Reason string `json:"reason"`
	// Message is a human readable message explaining why the event was generated.
Message string `json:"message"`
}
// Status is the status other problem daemons should report to node problem detector.
type Status struct {
// Source is the name of the problem daemon.
Source string `json:"source"`
// Event is the temporary node problem event. If the status is only a condition update,
// this field could be nil.
Event *Event `json:"event"`
// Condition is the permanent node condition. The problem daemon should always report the
// newest node condition in this field.
Condition Condition `json:"condition"`
}
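As an illustration, a problem daemon that detects a hung docker daemon might report a status like the one below; all values are made up:

// Illustrative only: a status a problem daemon might report.
func exampleStatus(now time.Time) Status {
	return Status{
		Source: "kernel-monitor",
		Event: &Event{
			Severity:  Warn,
			Timestamp: now,
			Reason:    "DockerHung",
			Message:   "task docker:1234 blocked for more than 120 seconds.",
		},
		Condition: Condition{
			Type:       "KernelDeadlock",
			Status:     true,
			Transition: now,
			Reason:     "DockerHung",
			Message:    "task docker:1234 blocked for more than 120 seconds.",
		},
	}
}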

63
pkg/util/convert.go Normal file
View File

@@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/node-problem-detector/pkg/types"
)
// ConvertToAPICondition converts the internal node condition to api.NodeCondition.
func ConvertToAPICondition(condition types.Condition) api.NodeCondition {
return api.NodeCondition{
Type: api.NodeConditionType(condition.Type),
Status: ConvertToAPIConditionStatus(condition.Status),
LastTransitionTime: ConvertToAPITimestamp(condition.Transition),
Reason: condition.Reason,
Message: condition.Message,
}
}
// ConvertToAPIConditionStatus converts the internal node condition status to api.ConditionStatus.
func ConvertToAPIConditionStatus(status bool) api.ConditionStatus {
if status {
return api.ConditionTrue
}
return api.ConditionFalse
}
// ConvertToAPIEventType converts the internal severity to an api event type.
func ConvertToAPIEventType(severity types.Severity) string {
switch severity {
case types.Info:
return api.EventTypeNormal
case types.Warn:
return api.EventTypeWarning
default:
// Should never get here, just in case
return api.EventTypeNormal
}
}
// ConvertToAPITimestamp converts a time.Time timestamp to unversioned.Time.
func ConvertToAPITimestamp(timestamp time.Time) unversioned.Time {
return unversioned.NewTime(timestamp)
}

49
pkg/util/convert_test.go Normal file
View File

@@ -0,0 +1,49 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/node-problem-detector/pkg/types"
)
func TestConvertToAPICondition(t *testing.T) {
now := time.Now()
condition := types.Condition{
Type: "TestCondition",
Status: true,
Transition: now,
Reason: "test reason",
Message: "test message",
}
expected := api.NodeCondition{
Type: "TestCondition",
Status: api.ConditionTrue,
LastTransitionTime: unversioned.NewTime(now),
Reason: "test reason",
Message: "test message",
}
apiCondition := ConvertToAPICondition(condition)
if apiCondition != expected {
t.Errorf("expected %+v, got %+v", expected, apiCondition)
}
}
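The test above covers condition conversion, but not severity conversion. A possible companion test, not part of this commit:

// A sketch of a test for ConvertToAPIEventType, covering both severities
// and the default branch.
func TestConvertToAPIEventType(t *testing.T) {
	for _, test := range []struct {
		severity types.Severity
		expected string
	}{
		{types.Info, api.EventTypeNormal},
		{types.Warn, api.EventTypeWarning},
		{types.Severity("unknown"), api.EventTypeNormal}, // falls through to the default branch
	} {
		if got := ConvertToAPIEventType(test.severity); got != test.expected {
			t.Errorf("expected %q for severity %q, got %q", test.expected, test.severity, got)
		}
	}
}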