Merge pull request #1192 from arjraman/master

feat(logwatchers): Restart kmsg parser if channel closes
This commit is contained in:
Kubernetes Prow Robot
2026-04-10 11:40:20 +05:30
committed by GitHub
3 changed files with 200 additions and 4 deletions

2
.gitignore vendored
View File

@@ -9,3 +9,5 @@ debug.test
/output/
coverage.out
.idea/
*.DS_Store
*.iml

View File

@@ -30,6 +30,11 @@ import (
"k8s.io/node-problem-detector/pkg/util/tomb"
)
const (
// retryDelay is the time to wait before attempting to restart the kmsg parser.
retryDelay = 5 * time.Second
)
type kernelLogWatcher struct {
cfg types.WatcherConfig
startTime time.Time
@@ -101,8 +106,21 @@ func (k *kernelLogWatcher) watchLoop() {
return
case msg, ok := <-kmsgs:
if !ok {
klog.Error("Kmsg channel closed")
return
klog.Error("Kmsg channel closed, attempting to restart kmsg parser")
// Close the old parser
if err := k.kmsgParser.Close(); err != nil {
klog.Errorf("Failed to close kmsg parser: %v", err)
}
// Try to restart
var restarted bool
kmsgs, restarted = k.retryCreateParser()
if !restarted {
// Stopping was signaled
return
}
continue
}
klog.V(5).Infof("got kernel message: %+v", msg)
if msg.Message == "" {
@@ -122,3 +140,26 @@ func (k *kernelLogWatcher) watchLoop() {
}
}
}
// retryCreateParser attempts to create a new kmsg parser.
// It tries immediately first, then waits retryDelay between subsequent failures.
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
for {
parser, err := kmsgparser.NewParser()
if err == nil {
k.kmsgParser = parser
klog.Infof("Successfully restarted kmsg parser")
return parser.Parse(), true
}
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
select {
case <-k.tomb.Stopping():
klog.Infof("Stop watching kernel log during restart attempt")
return nil, false
case <-time.After(retryDelay):
}
}
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package kmsg
import (
"sync"
"testing"
"time"
@@ -27,23 +28,44 @@ import (
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
"k8s.io/node-problem-detector/pkg/util"
"k8s.io/node-problem-detector/pkg/util/tomb"
)
type mockKmsgParser struct {
kmsgs []kmsgparser.Message
kmsgs []kmsgparser.Message
closeAfterSend bool
closeCalled bool
mu sync.Mutex
}
func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {}
func (m *mockKmsgParser) Close() error { return nil }
func (m *mockKmsgParser) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
m.closeCalled = true
return nil
}
func (m *mockKmsgParser) WasCloseCalled() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.closeCalled
}
func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
c := make(chan kmsgparser.Message)
go func() {
for _, msg := range m.kmsgs {
c <- msg
}
if m.closeAfterSend {
close(c)
}
}()
return c
}
func (m *mockKmsgParser) SeekEnd() error { return nil }
func TestWatch(t *testing.T) {
@@ -169,3 +191,134 @@ func TestWatch(t *testing.T) {
}
}
}
func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
now := time.Now()
mock := &mockKmsgParser{
kmsgs: []kmsgparser.Message{
{Message: "test message", Timestamp: now},
},
closeAfterSend: false, // Don't close, let tomb stop it
}
w := &kernelLogWatcher{
cfg: types.WatcherConfig{},
startTime: now.Add(-time.Second),
tomb: tomb.NewTomb(),
logCh: make(chan *logtypes.Log, 100),
kmsgParser: mock,
}
logCh, err := w.Watch()
assert.NoError(t, err)
// Should receive the message
select {
case log := <-logCh:
assert.Equal(t, "test message", log.Message)
case <-time.After(time.Second):
t.Fatal("timeout waiting for log message")
}
// Stop the watcher
w.Stop()
// Log channel should be closed after stop
select {
case _, ok := <-logCh:
assert.False(t, ok, "log channel should be closed after Stop()")
case <-time.After(time.Second):
t.Fatal("timeout waiting for log channel to close after Stop()")
}
// Verify parser was closed
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
}
func TestWatcherProcessesEmptyMessages(t *testing.T) {
now := time.Now()
mock := &mockKmsgParser{
kmsgs: []kmsgparser.Message{
{Message: "", Timestamp: now},
{Message: "valid message", Timestamp: now.Add(time.Second)},
{Message: "", Timestamp: now.Add(2 * time.Second)},
},
closeAfterSend: false,
}
w := &kernelLogWatcher{
cfg: types.WatcherConfig{},
startTime: now.Add(-time.Second),
tomb: tomb.NewTomb(),
logCh: make(chan *logtypes.Log, 100),
kmsgParser: mock,
}
logCh, err := w.Watch()
assert.NoError(t, err)
// Should only receive the non-empty message
select {
case log := <-logCh:
assert.Equal(t, "valid message", log.Message)
case <-time.After(time.Second):
t.Fatal("timeout waiting for log message")
}
// Stop the watcher and verify channel closes
w.Stop()
select {
case _, ok := <-logCh:
assert.False(t, ok, "log channel should be closed after Stop()")
case <-time.After(time.Second):
t.Fatal("timeout waiting for log channel to close")
}
}
func TestWatcherTrimsMessageWhitespace(t *testing.T) {
now := time.Now()
mock := &mockKmsgParser{
kmsgs: []kmsgparser.Message{
{Message: " message with spaces ", Timestamp: now},
{Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)},
{Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)},
},
closeAfterSend: false,
}
w := &kernelLogWatcher{
cfg: types.WatcherConfig{},
startTime: now.Add(-time.Second),
tomb: tomb.NewTomb(),
logCh: make(chan *logtypes.Log, 100),
kmsgParser: mock,
}
logCh, err := w.Watch()
assert.NoError(t, err)
expectedMessages := []string{"message with spaces", "tabbed message", "newlines"}
for _, expected := range expectedMessages {
select {
case log := <-logCh:
assert.Equal(t, expected, log.Message)
case <-time.After(time.Second):
t.Fatalf("timeout waiting for message: %s", expected)
}
}
// Stop the watcher and verify channel closes
w.Stop()
select {
case _, ok := <-logCh:
assert.False(t, ok, "log channel should be closed after Stop()")
case <-time.After(time.Second):
t.Fatal("timeout waiting for log channel to close")
}
}