Merge pull request #1259 from hakman/kmsg-duplicate-message

fix(logwatchers/kmsg): prevent duplicate message replay after restart
This commit is contained in:
Kubernetes Prow Robot
2026-04-11 00:18:18 +05:30
committed by GitHub

View File

@@ -110,7 +110,8 @@ func (k *kernelLogWatcher) watchLoop() {
klog.Errorf("Failed to close kmsg parser: %v", err)
}
// Try to restart
// Try to restart immediately. retryCreateParser() applies backoff only
// after a failed NewParser() or SeekEnd() attempt.
var restarted bool
kmsgs, restarted = k.retryCreateParser()
if !restarted {
@@ -140,18 +141,29 @@ func (k *kernelLogWatcher) watchLoop() {
// retryCreateParser attempts to create a new kmsg parser.
// It tries immediately first, then waits retryDelay between subsequent failures.
// On success, it seeks the new parser to the end of the kmsg ring buffer to
// avoid replaying messages that were already processed before the restart.
// Any messages written to kmsg between the old parser closing and the new
// parser being seeked are not delivered; this is preferable to replaying an
// entire ring buffer the watcher has already processed, especially when the
// restart was triggered by a kmsg flood.
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
for {
parser, err := kmsgparser.NewParser()
if err == nil {
if err != nil {
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
} else if seekErr := parser.SeekEnd(); seekErr != nil {
klog.Errorf("Failed to seek new kmsg parser to end, retrying in %v: %v", retryDelay, seekErr)
if closeErr := parser.Close(); closeErr != nil {
klog.Errorf("Failed to close kmsg parser after seek failure: %v", closeErr)
}
} else {
k.kmsgParser = parser
klog.Infof("Successfully restarted kmsg parser")
return parser.Parse(), true
}
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
select {
case <-k.tomb.Stopping():
klog.Infof("Stop watching kernel log during restart attempt")