From 748fecd95df00013e9c8be852277a4db0d704056 Mon Sep 17 00:00:00 2001 From: Arjun Raman Date: Tue, 24 Mar 2026 11:20:14 -0700 Subject: [PATCH] Remove opt-in knob for restarting kmsg parser and simplify retry loop --- .gitignore | 2 + .../logwatchers/kmsg/log_watcher_linux.go | 41 ++--- .../kmsg/log_watcher_linux_test.go | 169 ++---------------- 3 files changed, 31 insertions(+), 181 deletions(-) diff --git a/.gitignore b/.gitignore index fd790a32..d0d9e335 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ debug.test /output/ coverage.out .idea/ +*.DS_Store +*.iml diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index cc038c43..2728ba83 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -22,7 +22,7 @@ import ( "time" "github.com/euank/go-kmsg-parser/kmsgparser" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" @@ -33,10 +33,6 @@ import ( const ( // retryDelay is the time to wait before attempting to restart the kmsg parser. retryDelay = 5 * time.Second - - // RestartOnErrorKey is the configuration key to enable restarting - // the kmsg parser when the channel closes due to an error. - RestartOnErrorKey = "restartOnError" ) type kernelLogWatcher struct { @@ -92,12 +88,6 @@ func (k *kernelLogWatcher) Stop() { k.tomb.Stop() } -// restartOnError checks if the restart on error configuration is enabled. -func (k *kernelLogWatcher) restartOnError() bool { - value, exists := k.cfg.PluginConfig[RestartOnErrorKey] - return exists && value == "true" -} - // watchLoop is the main watch loop of kernel log watcher. func (k *kernelLogWatcher) watchLoop() { kmsgs := k.kmsgParser.Parse() @@ -116,14 +106,7 @@ func (k *kernelLogWatcher) watchLoop() { return case msg, ok := <-kmsgs: if !ok { - klog.Error("Kmsg channel closed") - - // Only attempt to restart if configured to do so - if !k.restartOnError() { - return - } - - klog.Infof("Attempting to restart kmsg parser") + klog.Error("Kmsg channel closed, attempting to restart kmsg parser") // Close the old parser if err := k.kmsgParser.Close(); err != nil { @@ -159,24 +142,24 @@ func (k *kernelLogWatcher) watchLoop() { } // retryCreateParser attempts to create a new kmsg parser. +// It tries immediately first, then waits retryDelay between subsequent failures. // It returns the new message channel and true on success, or nil and false if stopping was signaled. func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) { for { + parser, err := kmsgparser.NewParser() + if err == nil { + k.kmsgParser = parser + klog.Infof("Successfully restarted kmsg parser") + return parser.Parse(), true + } + + klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) + select { case <-k.tomb.Stopping(): klog.Infof("Stop watching kernel log during restart attempt") return nil, false case <-time.After(retryDelay): } - - parser, err := kmsgparser.NewParser() - if err != nil { - klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) - continue - } - - k.kmsgParser = parser - klog.Infof("Successfully restarted kmsg parser") - return parser.Parse(), true } } diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go index 4ddbcaf0..b29a0431 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go @@ -192,149 +192,6 @@ func TestWatch(t *testing.T) { } } -func TestRestartOnErrorConfig(t *testing.T) { - testCases := []struct { - name string - pluginConfig map[string]string - expected bool - }{ - { - name: "nil config returns false", - pluginConfig: nil, - expected: false, - }, - { - name: "empty config returns false", - pluginConfig: map[string]string{}, - expected: false, - }, - { - name: "key not present returns false", - pluginConfig: map[string]string{"otherKey": "true"}, - expected: false, - }, - { - name: "key present but set to false returns false", - pluginConfig: map[string]string{RestartOnErrorKey: "false"}, - expected: false, - }, - { - name: "key present and set to true returns true", - pluginConfig: map[string]string{RestartOnErrorKey: "true"}, - expected: true, - }, - { - name: "key present but uppercase TRUE returns false", - pluginConfig: map[string]string{RestartOnErrorKey: "TRUE"}, - expected: false, - }, - { - name: "key present but mixed case True returns false", - pluginConfig: map[string]string{RestartOnErrorKey: "True"}, - expected: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - w := &kernelLogWatcher{ - cfg: types.WatcherConfig{ - PluginConfig: tc.pluginConfig, - }, - } - assert.Equal(t, tc.expected, w.restartOnError()) - }) - } -} - -func TestWatcherStopsOnChannelCloseWhenRestartDisabled(t *testing.T) { - now := time.Now() - - mock := &mockKmsgParser{ - kmsgs: []kmsgparser.Message{ - {Message: "test message", Timestamp: now}, - }, - closeAfterSend: true, - } - - w := &kernelLogWatcher{ - cfg: types.WatcherConfig{ - PluginConfig: map[string]string{ - RestartOnErrorKey: "false", - }, - }, - startTime: now.Add(-time.Second), - tomb: tomb.NewTomb(), - logCh: make(chan *logtypes.Log, 100), - kmsgParser: mock, - } - - logCh, err := w.Watch() - assert.NoError(t, err) - - // Should receive the message - select { - case log := <-logCh: - assert.Equal(t, "test message", log.Message) - case <-time.After(time.Second): - t.Fatal("timeout waiting for log message") - } - - // Log channel should be closed since restart is disabled - select { - case _, ok := <-logCh: - assert.False(t, ok, "log channel should be closed") - case <-time.After(time.Second): - t.Fatal("timeout waiting for log channel to close") - } - - // Verify parser was closed - assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") -} - -func TestWatcherStopsOnChannelCloseWhenRestartNotConfigured(t *testing.T) { - now := time.Now() - - mock := &mockKmsgParser{ - kmsgs: []kmsgparser.Message{ - {Message: "test message", Timestamp: now}, - }, - closeAfterSend: true, - } - - w := &kernelLogWatcher{ - cfg: types.WatcherConfig{ - // No PluginConfig set - }, - startTime: now.Add(-time.Second), - tomb: tomb.NewTomb(), - logCh: make(chan *logtypes.Log, 100), - kmsgParser: mock, - } - - logCh, err := w.Watch() - assert.NoError(t, err) - - // Should receive the message - select { - case log := <-logCh: - assert.Equal(t, "test message", log.Message) - case <-time.After(time.Second): - t.Fatal("timeout waiting for log message") - } - - // Log channel should be closed since restart is not configured - select { - case _, ok := <-logCh: - assert.False(t, ok, "log channel should be closed") - case <-time.After(time.Second): - t.Fatal("timeout waiting for log channel to close") - } - - // Verify parser was closed - assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") -} - func TestWatcherStopsGracefullyOnTombStop(t *testing.T) { now := time.Now() @@ -346,11 +203,7 @@ func TestWatcherStopsGracefullyOnTombStop(t *testing.T) { } w := &kernelLogWatcher{ - cfg: types.WatcherConfig{ - PluginConfig: map[string]string{ - RestartOnErrorKey: "true", - }, - }, + cfg: types.WatcherConfig{}, startTime: now.Add(-time.Second), tomb: tomb.NewTomb(), logCh: make(chan *logtypes.Log, 100), @@ -392,7 +245,7 @@ func TestWatcherProcessesEmptyMessages(t *testing.T) { {Message: "valid message", Timestamp: now.Add(time.Second)}, {Message: "", Timestamp: now.Add(2 * time.Second)}, }, - closeAfterSend: true, + closeAfterSend: false, } w := &kernelLogWatcher{ @@ -414,10 +267,12 @@ func TestWatcherProcessesEmptyMessages(t *testing.T) { t.Fatal("timeout waiting for log message") } - // Channel should close, no more messages + // Stop the watcher and verify channel closes + w.Stop() + select { case _, ok := <-logCh: - assert.False(t, ok, "log channel should be closed") + assert.False(t, ok, "log channel should be closed after Stop()") case <-time.After(time.Second): t.Fatal("timeout waiting for log channel to close") } @@ -432,7 +287,7 @@ func TestWatcherTrimsMessageWhitespace(t *testing.T) { {Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)}, {Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)}, }, - closeAfterSend: true, + closeAfterSend: false, } w := &kernelLogWatcher{ @@ -456,4 +311,14 @@ func TestWatcherTrimsMessageWhitespace(t *testing.T) { t.Fatalf("timeout waiting for message: %s", expected) } } + + // Stop the watcher and verify channel closes + w.Stop() + + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed after Stop()") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close") + } }