mirror of
https://github.com/kubernetes/node-problem-detector.git
synced 2026-05-07 01:37:06 +00:00
Restart kmsg on error
This commit is contained in:
@@ -22,7 +22,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/euank/go-kmsg-parser/kmsgparser"
|
||||
"k8s.io/klog/v2"
|
||||
klog "k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
|
||||
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
|
||||
@@ -30,6 +30,15 @@ import (
|
||||
"k8s.io/node-problem-detector/pkg/util/tomb"
|
||||
)
|
||||
|
||||
const (
|
||||
// retryDelay is the time to wait before attempting to restart the kmsg parser.
|
||||
retryDelay = 5 * time.Second
|
||||
|
||||
// RestartOnErrorKey is the configuration key to enable restarting
|
||||
// the kmsg parser when the channel closes due to an error.
|
||||
RestartOnErrorKey = "restartOnError"
|
||||
)
|
||||
|
||||
type kernelLogWatcher struct {
|
||||
cfg types.WatcherConfig
|
||||
startTime time.Time
|
||||
@@ -83,6 +92,12 @@ func (k *kernelLogWatcher) Stop() {
|
||||
k.tomb.Stop()
|
||||
}
|
||||
|
||||
// restartOnError checks if the restart on error configuration is enabled.
|
||||
func (k *kernelLogWatcher) restartOnError() bool {
|
||||
value, exists := k.cfg.PluginConfig[RestartOnErrorKey]
|
||||
return exists && value == "true"
|
||||
}
|
||||
|
||||
// watchLoop is the main watch loop of kernel log watcher.
|
||||
func (k *kernelLogWatcher) watchLoop() {
|
||||
kmsgs := k.kmsgParser.Parse()
|
||||
@@ -102,7 +117,28 @@ func (k *kernelLogWatcher) watchLoop() {
|
||||
case msg, ok := <-kmsgs:
|
||||
if !ok {
|
||||
klog.Error("Kmsg channel closed")
|
||||
return
|
||||
|
||||
// Only attempt to restart if configured to do so
|
||||
if !k.restartOnError() {
|
||||
klog.Infof("Restart on error not enabled, stopping watcher")
|
||||
return
|
||||
}
|
||||
|
||||
klog.Infof("Attempting to restart kmsg parser")
|
||||
|
||||
// Close the old parser
|
||||
if err := k.kmsgParser.Close(); err != nil {
|
||||
klog.Errorf("Failed to close kmsg parser: %v", err)
|
||||
}
|
||||
|
||||
// Try to restart with backoff
|
||||
var restarted bool
|
||||
kmsgs, restarted = k.retryCreateParser()
|
||||
if !restarted {
|
||||
// Stopping was signaled
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
klog.V(5).Infof("got kernel message: %+v", msg)
|
||||
if msg.Message == "" {
|
||||
@@ -122,3 +158,26 @@ func (k *kernelLogWatcher) watchLoop() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// retryCreateParser attempts to create a new kmsg parser.
|
||||
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
|
||||
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
|
||||
for {
|
||||
select {
|
||||
case <-k.tomb.Stopping():
|
||||
klog.Infof("Stop watching kernel log during restart attempt")
|
||||
return nil, false
|
||||
case <-time.After(retryDelay):
|
||||
}
|
||||
|
||||
parser, err := kmsgparser.NewParser()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
|
||||
continue
|
||||
}
|
||||
|
||||
k.kmsgParser = parser
|
||||
klog.Infof("Successfully restarted kmsg parser")
|
||||
return parser.Parse(), true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package kmsg
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -27,23 +28,44 @@ import (
|
||||
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
|
||||
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
|
||||
"k8s.io/node-problem-detector/pkg/util"
|
||||
"k8s.io/node-problem-detector/pkg/util/tomb"
|
||||
)
|
||||
|
||||
type mockKmsgParser struct {
|
||||
kmsgs []kmsgparser.Message
|
||||
kmsgs []kmsgparser.Message
|
||||
closeAfterSend bool
|
||||
closeCalled bool
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {}
|
||||
func (m *mockKmsgParser) Close() error { return nil }
|
||||
|
||||
func (m *mockKmsgParser) Close() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.closeCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockKmsgParser) WasCloseCalled() bool {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.closeCalled
|
||||
}
|
||||
|
||||
func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
|
||||
c := make(chan kmsgparser.Message)
|
||||
go func() {
|
||||
for _, msg := range m.kmsgs {
|
||||
c <- msg
|
||||
}
|
||||
if m.closeAfterSend {
|
||||
close(c)
|
||||
}
|
||||
}()
|
||||
return c
|
||||
}
|
||||
|
||||
func (m *mockKmsgParser) SeekEnd() error { return nil }
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
@@ -169,3 +191,269 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestartOnErrorConfig(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
pluginConfig map[string]string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "nil config returns false",
|
||||
pluginConfig: nil,
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "empty config returns false",
|
||||
pluginConfig: map[string]string{},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "key not present returns false",
|
||||
pluginConfig: map[string]string{"otherKey": "true"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "key present but set to false returns false",
|
||||
pluginConfig: map[string]string{RestartOnErrorKey: "false"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "key present and set to true returns true",
|
||||
pluginConfig: map[string]string{RestartOnErrorKey: "true"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "key present but uppercase TRUE returns false",
|
||||
pluginConfig: map[string]string{RestartOnErrorKey: "TRUE"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "key present but mixed case True returns false",
|
||||
pluginConfig: map[string]string{RestartOnErrorKey: "True"},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
w := &kernelLogWatcher{
|
||||
cfg: types.WatcherConfig{
|
||||
PluginConfig: tc.pluginConfig,
|
||||
},
|
||||
}
|
||||
assert.Equal(t, tc.expected, w.restartOnError())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcherStopsOnChannelCloseWhenRestartDisabled(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
mock := &mockKmsgParser{
|
||||
kmsgs: []kmsgparser.Message{
|
||||
{Message: "test message", Timestamp: now},
|
||||
},
|
||||
closeAfterSend: true,
|
||||
}
|
||||
|
||||
w := &kernelLogWatcher{
|
||||
cfg: types.WatcherConfig{
|
||||
PluginConfig: map[string]string{
|
||||
RestartOnErrorKey: "false",
|
||||
},
|
||||
},
|
||||
startTime: now.Add(-time.Second),
|
||||
tomb: tomb.NewTomb(),
|
||||
logCh: make(chan *logtypes.Log, 100),
|
||||
kmsgParser: mock,
|
||||
}
|
||||
|
||||
logCh, err := w.Watch()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Should receive the message
|
||||
select {
|
||||
case log := <-logCh:
|
||||
assert.Equal(t, "test message", log.Message)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
|
||||
// Log channel should be closed since restart is disabled
|
||||
select {
|
||||
case _, ok := <-logCh:
|
||||
assert.False(t, ok, "log channel should be closed")
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log channel to close")
|
||||
}
|
||||
|
||||
// Verify parser was closed
|
||||
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
|
||||
}
|
||||
|
||||
func TestWatcherStopsOnChannelCloseWhenRestartNotConfigured(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
mock := &mockKmsgParser{
|
||||
kmsgs: []kmsgparser.Message{
|
||||
{Message: "test message", Timestamp: now},
|
||||
},
|
||||
closeAfterSend: true,
|
||||
}
|
||||
|
||||
w := &kernelLogWatcher{
|
||||
cfg: types.WatcherConfig{
|
||||
// No PluginConfig set
|
||||
},
|
||||
startTime: now.Add(-time.Second),
|
||||
tomb: tomb.NewTomb(),
|
||||
logCh: make(chan *logtypes.Log, 100),
|
||||
kmsgParser: mock,
|
||||
}
|
||||
|
||||
logCh, err := w.Watch()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Should receive the message
|
||||
select {
|
||||
case log := <-logCh:
|
||||
assert.Equal(t, "test message", log.Message)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
|
||||
// Log channel should be closed since restart is not configured
|
||||
select {
|
||||
case _, ok := <-logCh:
|
||||
assert.False(t, ok, "log channel should be closed")
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log channel to close")
|
||||
}
|
||||
|
||||
// Verify parser was closed
|
||||
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
|
||||
}
|
||||
|
||||
func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
mock := &mockKmsgParser{
|
||||
kmsgs: []kmsgparser.Message{
|
||||
{Message: "test message", Timestamp: now},
|
||||
},
|
||||
closeAfterSend: false, // Don't close, let tomb stop it
|
||||
}
|
||||
|
||||
w := &kernelLogWatcher{
|
||||
cfg: types.WatcherConfig{
|
||||
PluginConfig: map[string]string{
|
||||
RestartOnErrorKey: "true",
|
||||
},
|
||||
},
|
||||
startTime: now.Add(-time.Second),
|
||||
tomb: tomb.NewTomb(),
|
||||
logCh: make(chan *logtypes.Log, 100),
|
||||
kmsgParser: mock,
|
||||
}
|
||||
|
||||
logCh, err := w.Watch()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Should receive the message
|
||||
select {
|
||||
case log := <-logCh:
|
||||
assert.Equal(t, "test message", log.Message)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
|
||||
// Stop the watcher
|
||||
w.Stop()
|
||||
|
||||
// Log channel should be closed after stop
|
||||
select {
|
||||
case _, ok := <-logCh:
|
||||
assert.False(t, ok, "log channel should be closed after Stop()")
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log channel to close after Stop()")
|
||||
}
|
||||
|
||||
// Verify parser was closed
|
||||
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
|
||||
}
|
||||
|
||||
func TestWatcherProcessesEmptyMessages(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
mock := &mockKmsgParser{
|
||||
kmsgs: []kmsgparser.Message{
|
||||
{Message: "", Timestamp: now},
|
||||
{Message: "valid message", Timestamp: now.Add(time.Second)},
|
||||
{Message: "", Timestamp: now.Add(2 * time.Second)},
|
||||
},
|
||||
closeAfterSend: true,
|
||||
}
|
||||
|
||||
w := &kernelLogWatcher{
|
||||
cfg: types.WatcherConfig{},
|
||||
startTime: now.Add(-time.Second),
|
||||
tomb: tomb.NewTomb(),
|
||||
logCh: make(chan *logtypes.Log, 100),
|
||||
kmsgParser: mock,
|
||||
}
|
||||
|
||||
logCh, err := w.Watch()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Should only receive the non-empty message
|
||||
select {
|
||||
case log := <-logCh:
|
||||
assert.Equal(t, "valid message", log.Message)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
|
||||
// Channel should close, no more messages
|
||||
select {
|
||||
case _, ok := <-logCh:
|
||||
assert.False(t, ok, "log channel should be closed")
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timeout waiting for log channel to close")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcherTrimsMessageWhitespace(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
mock := &mockKmsgParser{
|
||||
kmsgs: []kmsgparser.Message{
|
||||
{Message: " message with spaces ", Timestamp: now},
|
||||
{Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)},
|
||||
{Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)},
|
||||
},
|
||||
closeAfterSend: true,
|
||||
}
|
||||
|
||||
w := &kernelLogWatcher{
|
||||
cfg: types.WatcherConfig{},
|
||||
startTime: now.Add(-time.Second),
|
||||
tomb: tomb.NewTomb(),
|
||||
logCh: make(chan *logtypes.Log, 100),
|
||||
kmsgParser: mock,
|
||||
}
|
||||
|
||||
logCh, err := w.Watch()
|
||||
assert.NoError(t, err)
|
||||
|
||||
expectedMessages := []string{"message with spaces", "tabbed message", "newlines"}
|
||||
|
||||
for _, expected := range expectedMessages {
|
||||
select {
|
||||
case log := <-logCh:
|
||||
assert.Equal(t, expected, log.Message)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout waiting for message: %s", expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user