From b3379b0d236227067148dd1e2d5c9f05024b9ac2 Mon Sep 17 00:00:00 2001 From: Ciprian Hacman Date: Fri, 10 Apr 2026 09:01:55 +0300 Subject: [PATCH] fix(logwatchers/kmsg): prevent duplicate message replay after restart --- .../logwatchers/kmsg/log_watcher_linux.go | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index 2728ba83..bd25d50a 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -113,7 +113,8 @@ func (k *kernelLogWatcher) watchLoop() { klog.Errorf("Failed to close kmsg parser: %v", err) } - // Try to restart + // Try to restart immediately. retryCreateParser() applies backoff only + // after a failed NewParser() or SeekEnd() attempt. var restarted bool kmsgs, restarted = k.retryCreateParser() if !restarted { @@ -143,18 +144,29 @@ func (k *kernelLogWatcher) watchLoop() { // retryCreateParser attempts to create a new kmsg parser. // It tries immediately first, then waits retryDelay between subsequent failures. +// On success, it seeks the new parser to the end of the kmsg ring buffer to +// avoid replaying messages that were already processed before the restart. +// Any messages written to kmsg between the old parser closing and the new +// parser being seeked are not delivered; this is preferable to replaying an +// entire ring buffer the watcher has already processed, especially when the +// restart was triggered by a kmsg flood. // It returns the new message channel and true on success, or nil and false if stopping was signaled. func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) { for { parser, err := kmsgparser.NewParser() - if err == nil { + if err != nil { + klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) + } else if seekErr := parser.SeekEnd(); seekErr != nil { + klog.Errorf("Failed to seek new kmsg parser to end, retrying in %v: %v", retryDelay, seekErr) + if closeErr := parser.Close(); closeErr != nil { + klog.Errorf("Failed to close kmsg parser after seek failure: %v", closeErr) + } + } else { k.kmsgParser = parser klog.Infof("Successfully restarted kmsg parser") return parser.Parse(), true } - klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) - select { case <-k.tomb.Stopping(): klog.Infof("Stop watching kernel log during restart attempt")