Remove opt-in knob for restarting kmsg parser and simplify retry loop

Arjun Raman
2026-03-24 11:20:14 -07:00
parent c530d1f701
commit 748fecd95d
3 changed files with 31 additions and 181 deletions

.gitignore

@@ -9,3 +9,5 @@ debug.test
 /output/
 coverage.out
 .idea/
+*.DS_Store
+*.iml


@@ -22,7 +22,7 @@ import (
"time"
"github.com/euank/go-kmsg-parser/kmsgparser"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
@@ -33,10 +33,6 @@ import (
 const (
     // retryDelay is the time to wait before attempting to restart the kmsg parser.
     retryDelay = 5 * time.Second
-
-   // RestartOnErrorKey is the configuration key to enable restarting
-   // the kmsg parser when the channel closes due to an error.
-   RestartOnErrorKey = "restartOnError"
 )
 
 type kernelLogWatcher struct {
@@ -92,12 +88,6 @@ func (k *kernelLogWatcher) Stop() {
     k.tomb.Stop()
 }
 
-// restartOnError checks if the restart on error configuration is enabled.
-func (k *kernelLogWatcher) restartOnError() bool {
-   value, exists := k.cfg.PluginConfig[RestartOnErrorKey]
-   return exists && value == "true"
-}
-
 // watchLoop is the main watch loop of kernel log watcher.
 func (k *kernelLogWatcher) watchLoop() {
     kmsgs := k.kmsgParser.Parse()
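
Note: the deleted restartOnError helper accepted only the exact lowercase string "true"; the removed TestRestartOnErrorConfig cases further down confirm that "TRUE" and "True" were treated as disabled. Had the knob survived, a more forgiving variant could have parsed the value with strconv.ParseBool. A hypothetical sketch, not code from this commit:

// Hypothetical case-insensitive variant (would require importing "strconv").
// strconv.ParseBool accepts "1", "t", "T", "TRUE", "true", "True" and the
// corresponding false spellings, so mixed-case values would no longer be
// silently treated as false.
func (k *kernelLogWatcher) restartOnError() bool {
    value, exists := k.cfg.PluginConfig[RestartOnErrorKey]
    if !exists {
        return false
    }
    enabled, err := strconv.ParseBool(value)
    return err == nil && enabled
}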
@@ -116,14 +106,7 @@ func (k *kernelLogWatcher) watchLoop() {
            return
        case msg, ok := <-kmsgs:
            if !ok {
-               klog.Error("Kmsg channel closed")
-
-               // Only attempt to restart if configured to do so
-               if !k.restartOnError() {
-                   return
-               }
-
-               klog.Infof("Attempting to restart kmsg parser")
+               klog.Error("Kmsg channel closed, attempting to restart kmsg parser")
 
                // Close the old parser
                if err := k.kmsgParser.Close(); err != nil {
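
For readability, the channel-close branch after this change reads roughly as follows. The hunk is truncated at the Close error check, so the tail (marked below) is inferred from retryCreateParser's signature rather than copied from the file:

case msg, ok := <-kmsgs:
    if !ok {
        klog.Error("Kmsg channel closed, attempting to restart kmsg parser")

        // Close the old parser
        if err := k.kmsgParser.Close(); err != nil {
            // ... truncated in this view ...
        }

        // Inferred, not shown in the hunk: swap in the channel from the
        // new parser, or exit the loop if a stop was signaled.
        newKmsgs, ok := k.retryCreateParser()
        if !ok {
            return
        }
        kmsgs = newKmsgs
        continue
    }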
@@ -159,24 +142,24 @@
 }
 
 // retryCreateParser attempts to create a new kmsg parser.
+// It tries immediately first, then waits retryDelay between subsequent failures.
 // It returns the new message channel and true on success, or nil and false if stopping was signaled.
 func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
     for {
+       parser, err := kmsgparser.NewParser()
+       if err == nil {
+           k.kmsgParser = parser
+           klog.Infof("Successfully restarted kmsg parser")
+           return parser.Parse(), true
+       }
+
+       klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
        select {
        case <-k.tomb.Stopping():
            klog.Infof("Stop watching kernel log during restart attempt")
            return nil, false
        case <-time.After(retryDelay):
        }
-
-       parser, err := kmsgparser.NewParser()
-       if err != nil {
-           klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
-           continue
-       }
-       k.kmsgParser = parser
-       klog.Infof("Successfully restarted kmsg parser")
-       return parser.Parse(), true
    }
 }
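
The substantive change here is ordering: the old loop always slept for retryDelay before its first attempt, while the new loop attempts parser creation immediately and only sleeps between failed attempts, so a restart after a transient error no longer incurs a mandatory five-second delay. A minimal, self-contained sketch of the pattern with hypothetical names, not code from this repository:

package main

import (
    "fmt"
    "time"
)

// retryImmediate calls create right away, then waits delay between failed
// attempts, aborting as soon as stop is closed.
func retryImmediate(create func() error, delay time.Duration, stop <-chan struct{}) bool {
    for {
        if err := create(); err == nil {
            return true // success, possibly on the very first try
        }
        select {
        case <-stop:
            return false // shutdown requested while backing off
        case <-time.After(delay):
            // fall through to the next attempt
        }
    }
}

func main() {
    attempts := 0
    ok := retryImmediate(func() error {
        attempts++
        if attempts < 3 {
            return fmt.Errorf("attempt %d failed", attempts)
        }
        return nil
    }, 10*time.Millisecond, make(chan struct{}))
    fmt.Println(ok, attempts) // prints: true 3
}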


@@ -192,149 +192,6 @@ func TestWatch(t *testing.T) {
        }
    }
 }
-
-func TestRestartOnErrorConfig(t *testing.T) {
-   testCases := []struct {
-       name         string
-       pluginConfig map[string]string
-       expected     bool
-   }{
-       {
-           name:         "nil config returns false",
-           pluginConfig: nil,
-           expected:     false,
-       },
-       {
-           name:         "empty config returns false",
-           pluginConfig: map[string]string{},
-           expected:     false,
-       },
-       {
-           name:         "key not present returns false",
-           pluginConfig: map[string]string{"otherKey": "true"},
-           expected:     false,
-       },
-       {
-           name:         "key present but set to false returns false",
-           pluginConfig: map[string]string{RestartOnErrorKey: "false"},
-           expected:     false,
-       },
-       {
-           name:         "key present and set to true returns true",
-           pluginConfig: map[string]string{RestartOnErrorKey: "true"},
-           expected:     true,
-       },
-       {
-           name:         "key present but uppercase TRUE returns false",
-           pluginConfig: map[string]string{RestartOnErrorKey: "TRUE"},
-           expected:     false,
-       },
-       {
-           name:         "key present but mixed case True returns false",
-           pluginConfig: map[string]string{RestartOnErrorKey: "True"},
-           expected:     false,
-       },
-   }
-
-   for _, tc := range testCases {
-       t.Run(tc.name, func(t *testing.T) {
-           w := &kernelLogWatcher{
-               cfg: types.WatcherConfig{
-                   PluginConfig: tc.pluginConfig,
-               },
-           }
-           assert.Equal(t, tc.expected, w.restartOnError())
-       })
-   }
-}
-
-func TestWatcherStopsOnChannelCloseWhenRestartDisabled(t *testing.T) {
-   now := time.Now()
-   mock := &mockKmsgParser{
-       kmsgs: []kmsgparser.Message{
-           {Message: "test message", Timestamp: now},
-       },
-       closeAfterSend: true,
-   }
-   w := &kernelLogWatcher{
-       cfg: types.WatcherConfig{
-           PluginConfig: map[string]string{
-               RestartOnErrorKey: "false",
-           },
-       },
-       startTime:  now.Add(-time.Second),
-       tomb:       tomb.NewTomb(),
-       logCh:      make(chan *logtypes.Log, 100),
-       kmsgParser: mock,
-   }
-
-   logCh, err := w.Watch()
-   assert.NoError(t, err)
-
-   // Should receive the message
-   select {
-   case log := <-logCh:
-       assert.Equal(t, "test message", log.Message)
-   case <-time.After(time.Second):
-       t.Fatal("timeout waiting for log message")
-   }
-
-   // Log channel should be closed since restart is disabled
-   select {
-   case _, ok := <-logCh:
-       assert.False(t, ok, "log channel should be closed")
-   case <-time.After(time.Second):
-       t.Fatal("timeout waiting for log channel to close")
-   }
-
-   // Verify parser was closed
-   assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
-}
-
-func TestWatcherStopsOnChannelCloseWhenRestartNotConfigured(t *testing.T) {
-   now := time.Now()
-   mock := &mockKmsgParser{
-       kmsgs: []kmsgparser.Message{
-           {Message: "test message", Timestamp: now},
-       },
-       closeAfterSend: true,
-   }
-   w := &kernelLogWatcher{
-       cfg: types.WatcherConfig{
-           // No PluginConfig set
-       },
-       startTime:  now.Add(-time.Second),
-       tomb:       tomb.NewTomb(),
-       logCh:      make(chan *logtypes.Log, 100),
-       kmsgParser: mock,
-   }
-
-   logCh, err := w.Watch()
-   assert.NoError(t, err)
-
-   // Should receive the message
-   select {
-   case log := <-logCh:
-       assert.Equal(t, "test message", log.Message)
-   case <-time.After(time.Second):
-       t.Fatal("timeout waiting for log message")
-   }
-
-   // Log channel should be closed since restart is not configured
-   select {
-   case _, ok := <-logCh:
-       assert.False(t, ok, "log channel should be closed")
-   case <-time.After(time.Second):
-       t.Fatal("timeout waiting for log channel to close")
-   }
-
-   // Verify parser was closed
-   assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
-}
 
 func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
    now := time.Now()
@@ -346,11 +203,7 @@ func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
    }
 
    w := &kernelLogWatcher{
-       cfg: types.WatcherConfig{
-           PluginConfig: map[string]string{
-               RestartOnErrorKey: "true",
-           },
-       },
+       cfg:        types.WatcherConfig{},
        startTime:  now.Add(-time.Second),
        tomb:       tomb.NewTomb(),
        logCh:      make(chan *logtypes.Log, 100),
@@ -392,7 +245,7 @@ func TestWatcherProcessesEmptyMessages(t *testing.T) {
            {Message: "valid message", Timestamp: now.Add(time.Second)},
            {Message: "", Timestamp: now.Add(2 * time.Second)},
        },
-       closeAfterSend: true,
+       closeAfterSend: false,
    }
 
    w := &kernelLogWatcher{
@@ -414,10 +267,12 @@
        t.Fatal("timeout waiting for log message")
    }
 
-   // Channel should close, no more messages
+   // Stop the watcher and verify channel closes
+   w.Stop()
    select {
    case _, ok := <-logCh:
-       assert.False(t, ok, "log channel should be closed")
+       assert.False(t, ok, "log channel should be closed after Stop()")
    case <-time.After(time.Second):
        t.Fatal("timeout waiting for log channel to close")
    }
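
The closeAfterSend flip (here and in TestWatcherTrimsMessageWhitespace below) follows from the knob's removal: a closed kmsg channel now unconditionally triggers a restart, which in a test would mean calling the real kmsgparser.NewParser. Keeping the mock's channel open and shutting down via w.Stop() keeps these tests hermetic; presumably that is the motivation, though the commit message does not state it.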
@@ -432,7 +287,7 @@ func TestWatcherTrimsMessageWhitespace(t *testing.T) {
            {Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)},
            {Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)},
        },
-       closeAfterSend: true,
+       closeAfterSend: false,
    }
 
    w := &kernelLogWatcher{
@@ -456,4 +311,14 @@
            t.Fatalf("timeout waiting for message: %s", expected)
        }
    }
+
+   // Stop the watcher and verify channel closes
+   w.Stop()
+   select {
+   case _, ok := <-logCh:
+       assert.False(t, ok, "log channel should be closed after Stop()")
+   case <-time.After(time.Second):
+       t.Fatal("timeout waiting for log channel to close")
+   }
 }
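
These tests construct a mockKmsgParser, which is defined elsewhere in the test file and does not appear in this diff. A plausible minimal sketch follows, assuming the kmsgparser.Parser interface consists of SeekEnd, Parse, SetLogger, and Close; the kmsgs and closeAfterSend fields and the WasCloseCalled method are taken from the tests above, and everything else (including the sync.Mutex guard) is assumed:

// Sketch only: the real mock lives in the test file and is not part of this
// diff. Assumes imports "sync" and the kmsgparser package.
type mockKmsgParser struct {
    kmsgs          []kmsgparser.Message
    closeAfterSend bool

    mu     sync.Mutex
    closed bool
}

func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
    ch := make(chan kmsgparser.Message)
    go func() {
        for _, msg := range m.kmsgs {
            ch <- msg
        }
        if m.closeAfterSend {
            close(ch) // simulate the kernel log stream ending
        }
    }()
    return ch
}

func (m *mockKmsgParser) Close() error {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.closed = true
    return nil
}

// WasCloseCalled reports whether Close was invoked, for test assertions.
func (m *mockKmsgParser) WasCloseCalled() bool {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.closed
}

// Remaining interface methods, stubbed (assuming this is the full interface).
func (m *mockKmsgParser) SeekEnd() error                { return nil }
func (m *mockKmsgParser) SetLogger(_ kmsgparser.Logger) {}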