fix(logwatchers/kmsg): prevent duplicate message replay after restart

This commit is contained in:
Ciprian Hacman
2026-04-10 09:01:55 +03:00
parent 6411bde750
commit b3379b0d23

View File

@@ -113,7 +113,8 @@ func (k *kernelLogWatcher) watchLoop() {
klog.Errorf("Failed to close kmsg parser: %v", err)
}
// Try to restart
// Try to restart immediately. retryCreateParser() applies backoff only
// after a failed NewParser() or SeekEnd() attempt.
var restarted bool
kmsgs, restarted = k.retryCreateParser()
if !restarted {
@@ -143,18 +144,29 @@ func (k *kernelLogWatcher) watchLoop() {
// retryCreateParser attempts to create a new kmsg parser.
// It tries immediately first, then waits retryDelay between subsequent failures.
// On success, it seeks the new parser to the end of the kmsg ring buffer to
// avoid replaying messages that were already processed before the restart.
// Any messages written to kmsg between the old parser closing and the new
// parser being seeked are not delivered; this is preferable to replaying an
// entire ring buffer the watcher has already processed, especially when the
// restart was triggered by a kmsg flood.
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
for {
parser, err := kmsgparser.NewParser()
if err == nil {
if err != nil {
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
} else if seekErr := parser.SeekEnd(); seekErr != nil {
klog.Errorf("Failed to seek new kmsg parser to end, retrying in %v: %v", retryDelay, seekErr)
if closeErr := parser.Close(); closeErr != nil {
klog.Errorf("Failed to close kmsg parser after seek failure: %v", closeErr)
}
} else {
k.kmsgParser = parser
klog.Infof("Successfully restarted kmsg parser")
return parser.Parse(), true
}
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
select {
case <-k.tomb.Stopping():
klog.Infof("Stop watching kernel log during restart attempt")