diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index 04d03940..db34efa6 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -110,7 +110,8 @@ func (k *kernelLogWatcher) watchLoop() { klog.Errorf("Failed to close kmsg parser: %v", err) } - // Try to restart + // Try to restart immediately. retryCreateParser() applies backoff only + // after a failed NewParser() or SeekEnd() attempt. var restarted bool kmsgs, restarted = k.retryCreateParser() if !restarted { @@ -140,18 +141,29 @@ func (k *kernelLogWatcher) watchLoop() { // retryCreateParser attempts to create a new kmsg parser. // It tries immediately first, then waits retryDelay between subsequent failures. +// On success, it seeks the new parser to the end of the kmsg ring buffer to +// avoid replaying messages that were already processed before the restart. +// Any messages written to kmsg between the old parser closing and the new +// parser being seeked are not delivered; this is preferable to replaying an +// entire ring buffer the watcher has already processed, especially when the +// restart was triggered by a kmsg flood. // It returns the new message channel and true on success, or nil and false if stopping was signaled. func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) { for { parser, err := kmsgparser.NewParser() - if err == nil { + if err != nil { + klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) + } else if seekErr := parser.SeekEnd(); seekErr != nil { + klog.Errorf("Failed to seek new kmsg parser to end, retrying in %v: %v", retryDelay, seekErr) + if closeErr := parser.Close(); closeErr != nil { + klog.Errorf("Failed to close kmsg parser after seek failure: %v", closeErr) + } + } else { k.kmsgParser = parser klog.Infof("Successfully restarted kmsg parser") return parser.Parse(), true } - klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) - select { case <-k.tomb.Stopping(): klog.Infof("Stop watching kernel log during restart attempt")