Mirror of https://github.com/prymitive/karma, synced 2026-05-05 03:16:51 +00:00
fix(backend): fix a deadlock in ClusterMemberNames()
ClusterMemberNames() can acquire the read lock twice, which could block write locks. Fixes #2888
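Background: Go's sync.RWMutex blocks new readers as soon as a writer is waiting, so a goroutine that takes the same read lock a second time while still holding it can stall behind a queued write lock, which in turn waits on the first read lock. A minimal, self-contained sketch of that hazard (illustrative names only, not karma's code):

package main

import (
	"fmt"
	"sync"
	"time"
)

var mu sync.RWMutex

func main() {
	mu.RLock() // first read lock, held while more work happens

	// A writer arrives while the read lock is held; Lock() waits for
	// the existing reader and blocks any readers that come later.
	go func() {
		mu.Lock()
		defer mu.Unlock()
		fmt.Println("writer ran")
	}()
	time.Sleep(100 * time.Millisecond) // let the writer queue up

	// Second RLock on the same goroutine: it queues behind the pending
	// writer, the writer waits for the first RLock to be released, and
	// the runtime reports "all goroutines are asleep - deadlock!".
	mu.RLock()

	fmt.Println("never reached")
}

This is the situation the sync package documentation warns about: a goroutine holding a read lock should not expect to acquire another one if some other goroutine may call Lock in the meantime.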
Committed by Łukasz Mierzwa
parent c6b091f7b0
commit c64b900c43
@@ -1,6 +1,10 @@
# Changelog

## [unreleased]

## v0.81

### Fixed

- Fixed a deadlock issue that could cause karma to hang #2888.

### Added
@@ -6,7 +6,6 @@ import (
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/prymitive/karma/internal/config"
@@ -122,36 +121,3 @@ func TestGetViewURL(t *testing.T) {
		})
	}
}

func TestPullFromAlertmanager(t *testing.T) {
	zerolog.SetGlobalLevel(zerolog.FatalLevel)
	mockConfig()
	mockCache()
	lastPull = time.Time{}

	start := time.Now()
	pullFromAlertmanager()
	dur := time.Since(start)
	if dur > time.Second {
		t.Errorf("First pullFromAlertmanager took %s, expected <= 1s", dur)
		return
	}

	start = time.Now()
	pullFromAlertmanager()
	dur = time.Since(start)
	if dur < time.Second*5 {
		t.Errorf("Second pullFromAlertmanager took %s, expected >= 5s", dur)
		return
	}

	time.Sleep(time.Second * 6)

	start = time.Now()
	pullFromAlertmanager()
	dur = time.Since(start)
	if dur > time.Second {
		t.Errorf("Third pullFromAlertmanager took %s, expected <= 1s", dur)
		return
	}
}
@@ -3,30 +3,16 @@ package main
import (
	"runtime"
	"sync"
	"time"

	"github.com/prymitive/karma/internal/alertmanager"

	"github.com/rs/zerolog/log"
)

var (
	lastPull time.Time
)

func pullFromAlertmanager() {
	// always flush cache once we're done
	defer apiCache.Flush()

	// Ensure that we're not putting write locks in a tight loop
	// We need at least 5s since last pull
	nextPull := lastPull.Add(time.Second * 5)
	waitNeeded := time.Until(nextPull)
	if waitNeeded > 0 {
		log.Warn().Dur("wait", waitNeeded).Msg("Less than 5s since the last pull, will wait before next cycle to process client requests, try increasing alertmanager.interval option if you see this warning too often")
		time.Sleep(waitNeeded)
	}

	log.Info().Msg("Pulling latest alerts and silences from Alertmanager")

	upstreams := alertmanager.GetAlertmanagers()
@@ -48,8 +34,6 @@ func pullFromAlertmanager() {

	log.Info().Msg("Collection completed")
	runtime.GC()

	lastPull = time.Now()
}

// Tick is the background timer used to call PullFromAlertmanager
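The trailing context above mentions Tick, the background timer that keeps calling pullFromAlertmanager; its body is not part of this diff. A minimal sketch of what such a ticker loop can look like, with the signature, stop channel and interval assumed rather than taken from karma:

package main

import "time"

// Hypothetical sketch only; karma's real Tick may differ.
func Tick(interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pullFromAlertmanager() // the function shown in the hunk above
		case <-stop:
			return
		}
	}
}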
@@ -147,7 +147,6 @@ func mockAlerts(version string) {
	mock.RegisterURL("http://localhost/api/v2/silences", version, "api/v2/silences")
	mock.RegisterURL("http://localhost/api/v2/alerts/groups", version, "api/v2/alerts/groups")

	lastPull = time.Time{}
	pullFromAlertmanager()
}
@@ -2309,7 +2308,6 @@ func TestUpstreamStatus(t *testing.T) {
			for _, m := range testCase.mocks {
				httpmock.RegisterResponder("GET", m.uri, httpmock.NewStringResponder(m.code, m.body))
			}
			lastPull = time.Time{}
			pullFromAlertmanager()

			req := httptest.NewRequest("GET", "/alerts.json?q=@receiver=by-cluster-service&q=alertname=HTTP_Probe_Failed&q=instance=web1", nil)
@@ -2552,7 +2550,6 @@ func TestAlertFilters(t *testing.T) {
				t.Fatal(err)
			}

			lastPull = time.Time{}
			pullFromAlertmanager()

			r := testRouter()
@@ -546,19 +546,27 @@ func (am *Alertmanager) ClusterPeers() []string {
// that are in the same cluster as this instance (including self).
// Names are the same as in karma configuration.
func (am *Alertmanager) ClusterMemberNames() []string {
	// copy status so we don't need to hold RLock until return
	status := models.AlertmanagerStatus{}
	am.lock.RLock()
	defer am.lock.RUnlock()
	status.ID = am.status.ID
	copy(am.status.PeerIDs, status.PeerIDs)
	am.lock.RUnlock()

	members := []string{am.Name}

	upstreams := GetAlertmanagers()
	for _, upstream := range upstreams {
		// skip self, it's already part of members slice
		if upstream.Name == am.Name {
			continue
		}
		for _, peerID := range upstream.ClusterPeers() {
			// IF
			// other alertmanagers peerID is in this alertmanager cluster status
			// OR
			// this alertmanager peerID is in other alertmanagers cluster status
			if slices.StringInSlice(am.status.PeerIDs, peerID) || peerID == am.status.ID {
			if slices.StringInSlice(status.PeerIDs, peerID) || peerID == status.ID {
				if !slices.StringInSlice(members, upstream.Name) {
					members = append(members, upstream.Name)
				}
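The fix follows a common pattern: copy the fields guarded by the mutex while briefly holding the read lock, release it, and only then do the slower work, here looping over other upstreams whose ClusterPeers() take their own read locks, against the local copy. A stripped-down sketch of that pattern with generic names rather than karma's types; note that Go's copy(dst, src) fills its first argument, which must already be sized to hold the data:

package main

import (
	"fmt"
	"sync"
)

type node struct {
	mu      sync.RWMutex
	id      string
	peerIDs []string
}

// snapshot copies the guarded fields while briefly holding the read
// lock, then releases it so no lock is held during later work.
func (n *node) snapshot() (string, []string) {
	n.mu.RLock()
	id := n.id
	peers := make([]string, len(n.peerIDs))
	copy(peers, n.peerIDs) // copy(dst, src): dst must already have room
	n.mu.RUnlock()
	return id, peers
}

func main() {
	n := &node{id: "self", peerIDs: []string{"peer-1", "peer-2"}}
	id, peers := n.snapshot()
	// Safe to use without n.mu held, even if a writer updates n now.
	fmt.Println(id, peers)
}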