Files
karma/internal/alertmanager/models.go
2026-03-11 13:10:15 +00:00

561 lines
14 KiB
Go

package alertmanager
import (
"fmt"
"maps"
"net/http"
"net/url"
"slices"
"sort"
"sync"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/prymitive/karma/internal/config"
"github.com/prymitive/karma/internal/filters"
"github.com/prymitive/karma/internal/mapper"
"github.com/prymitive/karma/internal/models"
"github.com/prymitive/karma/internal/transform"
"github.com/prymitive/karma/internal/uri"
"github.com/prymitive/karma/internal/verprobe"
"github.com/rs/zerolog/log"
)
// Metric label values used to categorize errors per collection phase
// (see Pull(), which increments Metrics.Errors under these keys).
const (
	labelValueErrorsAlerts   = "alerts"
	labelValueErrorsSilences = "silences"
)
// alertmanagerMetrics holds counters tracked per Alertmanager instance.
type alertmanagerMetrics struct {
	// Errors counts collection errors, keyed by phase
	// (labelValueErrorsAlerts / labelValueErrorsSilences).
	Errors map[string]float64
	// Cycles counts how many times Pull() was called for this instance.
	Cycles float64
}
// HealthCheck is a named set of filters that is expected to match at least
// one upstream alert during each pull; if it doesn't, the instance is
// flagged with an error (see Pull()).
type HealthCheck struct {
	// filters an alert must match (all of them, see IsHealthCheckAlert)
	filters []filters.Filter
	// wasFound is set to true when a matching alert was seen during a pull
	wasFound bool
}
// Alertmanager represents Alertmanager upstream instance
// nolint: maligned
type Alertmanager struct {
	// reader instances are specific to URI scheme we collect from
	reader uri.Reader
	// implements how we fetch requests from the Alertmanager, we don't set it
	// by default so it's nil and http.DefaultTransport is used
	HTTPTransport http.RoundTripper `json:"-"`
	// metrics tracked per alertmanager instance
	Metrics alertmanagerMetrics
	// silences cache keyed by silence ID, replaced wholesale on each pull
	silences map[string]models.Silence
	// colors computed for labels seen during the last pull
	colors models.LabelsColorMap
	// headers to send with each AlertManager request
	HTTPHeaders map[string]string
	// healthcheck definitions keyed by name; wasFound is refreshed per pull
	healthchecks map[string]HealthCheck
	// URI used for collecting data from this instance
	URI string `json:"uri"`
	// ExternalURI, when non-empty, takes precedence over URI for
	// browser-facing links (see PublicURI)
	ExternalURI string `json:"-"`
	// Cluster is the name of the cluster this instance belongs to
	Cluster string `json:"cluster"`
	// Name of this instance, as used in karma configuration
	Name string `json:"name"`
	// lastError holds the most recent pull/healthcheck error message,
	// empty when healthy
	lastError string
	// lastVersionProbe holds the version string detected by probeVersion()
	lastVersionProbe string
	// CORS credentials
	CORSCredentials string `json:"corsCredentials"`
	// fields for storing pulled data
	alertGroups     []models.AlertGroup
	autocomplete    []models.Autocomplete
	autocompleteMap map[string]models.Autocomplete
	knownLabels     []string
	// RequestTimeout applied to requests made against this instance
	RequestTimeout time.Duration `json:"timeout"`
	// lock protects data access while updating
	lock sync.RWMutex
	// whenever this instance should be proxied
	ProxyRequests bool `json:"proxyRequests"`
	// ReadOnly flag exposed via the "readonly" JSON field
	// (presumably disables modifying operations; confirm against callers)
	ReadOnly bool `json:"readonly"`
	// healthchecksVisible — UI visibility toggle for healthcheck alerts,
	// not referenced in this chunk; confirm usage elsewhere
	healthchecksVisible bool
}
// probeVersion tries to detect the version of the upstream Alertmanager
// instance by scraping its /metrics endpoint. It returns an empty string
// on any failure; errors are logged, never returned.
func (am *Alertmanager) probeVersion() string {
	// metricsURI: renamed from "url" to avoid shadowing the net/url package.
	metricsURI, err := uri.JoinURL(am.URI, "metrics")
	if err != nil {
		log.Error().
			Err(err).
			Str("uri", am.SanitizedURI()).
			Msg("Failed to join url with /metrics path")
		return ""
	}

	source, err := am.reader.Read(metricsURI, am.HTTPHeaders)
	if err != nil {
		log.Error().
			Err(err).
			Str("alertmanager", am.Name).
			Str("uri", am.SanitizedURI()).
			Msg("Request failed")
		return ""
	}
	defer source.Close()

	version, err := verprobe.Detect(source)
	if err != nil {
		log.Error().Err(err).Str("alertmanager", am.Name).Msg("Error while discovering version")
		return ""
	}

	log.Info().
		Str("version", version).
		Str("alertmanager", am.Name).
		Msg("Upstream version")
	return version
}
// clearData resets every cached collection result to an empty value,
// holding the write lock for the whole swap.
func (am *Alertmanager) clearData() {
	am.lock.Lock()
	defer am.lock.Unlock()
	am.alertGroups = []models.AlertGroup{}
	am.silences = map[string]models.Silence{}
	am.colors = models.LabelsColorMap{}
	am.autocomplete = []models.Autocomplete{}
	am.knownLabels = []string{}
}
// pullSilences fetches all silences from the upstream Alertmanager API
// (using a mapper compatible with the probed version), detects ticket links
// in each silence and atomically replaces the local silence cache.
func (am *Alertmanager) pullSilences(version string) error {
	// m: renamed from "mapper" to avoid shadowing the mapper package.
	m, err := mapper.GetSilenceMapper(version)
	if err != nil {
		return err
	}

	start := time.Now()
	silences, err := m.Collect(am.URI, am.HTTPHeaders, am.RequestTimeout, am.HTTPTransport)
	if err != nil {
		return err
	}
	log.Info().
		Str("alertmanager", am.Name).
		Int("silences", len(silences)).
		Dur("duration", time.Since(start)).
		Msg("Got silences")

	log.Info().
		Str("alertmanager", am.Name).
		Int("silences", len(silences)).
		Msg("Detecting ticket links in silences")
	silenceMap := make(map[string]models.Silence, len(silences))
	for _, silence := range silences {
		silence.TicketID, silence.TicketURL = transform.DetectLinks(&silence)
		silenceMap[silence.ID] = silence
	}

	// Swap the cache under the write lock.
	am.lock.Lock()
	am.silences = silenceMap
	am.lock.Unlock()

	return nil
}
// InternalURI is the URI of this Alertmanager that will be used for all request made by the UI
func (am *Alertmanager) InternalURI() string {
	if !am.ProxyRequests {
		// strip all user/pass information, fetch() doesn't support it anyway
		return uri.WithoutUserinfo(am.PublicURI())
	}
	return "./proxy/alertmanager/" + url.PathEscape(am.Name)
}
// PublicURI is the URI of this Alertmanager that will be used for browser links
func (am *Alertmanager) PublicURI() string {
	// external_uri is always the first setting to check for browser links
	if am.ExternalURI == "" {
		return am.URI
	}
	return am.ExternalURI
}
// pullAlerts fetches all alert groups from the upstream Alertmanager API,
// deduplicates groups and alerts, resolves silences, computes label colors
// and autocomplete hints, then atomically replaces all cached data.
func (am *Alertmanager) pullAlerts(version string) error {
	// m: renamed from "mapper" to avoid shadowing the mapper package.
	m, err := mapper.GetAlertMapper(version)
	if err != nil {
		return err
	}

	// Start from a copy of configured healthchecks with wasFound reset;
	// entries flip to wasFound=true as matching alerts are seen below.
	healthchecks := map[string]HealthCheck{}
	am.lock.RLock()
	for name, hc := range am.healthchecks {
		healthchecks[name] = HealthCheck{
			filters:  hc.filters,
			wasFound: false,
		}
	}
	am.lock.RUnlock()

	start := time.Now()
	groups, err := m.Collect(am.URI, am.HTTPHeaders, am.RequestTimeout, am.HTTPTransport)
	if err != nil {
		return err
	}
	log.Info().
		Str("alertmanager", am.Name).
		Int("groups", len(groups)).
		Dur("duration", time.Since(start)).
		Msg("Collected alert groups")

	log.Info().
		Str("alertmanager", am.Name).
		Int("groups", len(groups)).
		Msg("Deduplicating alert groups")
	uniqueGroups := map[string]models.AlertGroup{}
	uniqueAlerts := map[string]map[string]models.Alert{}
	knownLabelsMap := map[string]struct{}{}
	for _, ag := range groups {
		// Groups are deduplicated by their labels fingerprint, alerts
		// within each group by their content fingerprint.
		agID := ag.LabelsFingerprint()
		if _, found := uniqueGroups[agID]; !found {
			uniqueGroups[agID] = models.AlertGroup{
				Receiver: ag.Receiver,
				Labels:   ag.Labels,
				ID:       agID,
			}
		}
		for _, alert := range ag.Alerts {
			if _, found := uniqueAlerts[agID]; !found {
				uniqueAlerts[agID] = map[string]models.Alert{}
			}
			alertCFP := alert.ContentFingerprint()
			if _, found := uniqueAlerts[agID][alertCFP]; !found {
				uniqueAlerts[agID][alertCFP] = alert
			}
			// Mark any healthcheck whose filters this alert satisfies.
			if name, hc := am.IsHealthCheckAlert(&alert); hc != nil {
				healthchecks[name] = HealthCheck{
					filters:  hc.filters,
					wasFound: true,
				}
			}
		}
	}

	dedupedGroups := make([]models.AlertGroup, 0, len(uniqueGroups))
	colors := models.LabelsColorMap{}
	// Reuse autocompleteMap buckets between pulls instead of reallocating.
	if am.autocompleteMap == nil {
		am.autocompleteMap = map[string]models.Autocomplete{}
	} else {
		clear(am.autocompleteMap)
	}

	expiredSilences := am.ExpiredSilences()

	log.Info().
		Str("alertmanager", am.Name).
		Int("groups", len(uniqueGroups)).
		Msg("Processing deduplicated alert groups")
	for _, ag := range uniqueGroups {
		alerts := make(models.AlertList, 0, len(uniqueAlerts[ag.ID]))
		labelPairs := make([][]labels.Label, 0, len(uniqueAlerts[ag.ID]))
		for _, alert := range uniqueAlerts[ag.ID] {
			// Resolve each silence ID into a full silence object; unknown
			// IDs are simply skipped.
			silences := map[string]*models.Silence{}
			for _, silenceID := range alert.SilencedBy {
				silence, err := am.SilenceByID(silenceID)
				if err == nil {
					silences[silenceID] = &silence
				}
			}
			// Optionally attach recently expired silences that still match
			// this active alert, so they can be surfaced in the UI.
			if config.Config.Silences.Expired > 0 &&
				alert.State == models.AlertStateActive {
				alertLabels := alert.Labels.Map()
				for _, silence := range expiredSilences {
					if silence.IsMatch(alertLabels) {
						silences[silence.ID] = silence
						alert.SilencedBy = append(alert.SilencedBy, silence.ID)
					}
				}
			}
			// Sort silence IDs by silence end time: ascending for
			// suppressed alerts, descending otherwise; fall back to ID
			// order when either silence is unknown.
			sort.Slice(alert.SilencedBy, func(i, j int) bool {
				si := silences[alert.SilencedBy[i]]
				sj := silences[alert.SilencedBy[j]]
				if si != nil && sj != nil {
					if alert.State == models.AlertStateSuppressed {
						return si.EndsAt.Unix() < sj.EndsAt.Unix()
					}
					return si.EndsAt.Unix() > sj.EndsAt.Unix()
				}
				return alert.SilencedBy[i] < alert.SilencedBy[j]
			})
			alert.Alertmanager = []models.AlertmanagerInstance{
				{
					Fingerprint: alert.Fingerprint,
					Name:        am.Name,
					Cluster:     am.Cluster,
					State:       alert.State,
					StartsAt:    alert.StartsAt,
					Source:      alert.GeneratorURL,
					Silences:    silences,
					SilencedBy:  alert.SilencedBy,
					InhibitedBy: alert.InhibitedBy,
				},
			}
			transform.ColorLabel(colors, "@receiver", alert.Receiver)
			// instance: renamed from "am" to avoid shadowing the receiver.
			for _, instance := range alert.Alertmanager {
				transform.ColorLabel(colors, "@alertmanager", instance.Name)
				transform.ColorLabel(colors, "@cluster", instance.Cluster)
			}
			var pairs []labels.Label
			alert.Labels.Range(func(l labels.Label) {
				transform.ColorLabel(colors, l.Name, l.Value)
				knownLabelsMap[l.Name] = struct{}{}
				pairs = append(pairs, l)
			})
			labelPairs = append(labelPairs, pairs)
			alert.UpdateFingerprints()
			alerts = append(alerts, alert)
		}

		filters.BuildAutocomplete(alerts, am.autocompleteMap)
		filters.LabelAutocomplete(labelPairs, am.autocompleteMap)

		slices.SortFunc(alerts, models.CompareAlerts)
		ag.Alerts = alerts
		// Hash is a checksum of all alerts, used to tell when any alert in the group changed
		ag.Hash = ag.ContentFingerprint()
		dedupedGroups = append(dedupedGroups, ag)
	}

	log.Info().
		Str("alertmanager", am.Name).
		Int("hints", len(am.autocompleteMap)).
		Msg("Merging autocomplete hints")
	autocomplete := make([]models.Autocomplete, 0, len(am.autocompleteMap))
	for _, hint := range am.autocompleteMap {
		autocomplete = append(autocomplete, hint)
	}
	knownLabels := make([]string, 0, len(knownLabelsMap))
	for key := range knownLabelsMap {
		knownLabels = append(knownLabels, key)
	}

	// Swap all cached data in a single critical section.
	am.lock.Lock()
	am.alertGroups = dedupedGroups
	am.colors = colors
	am.autocomplete = autocomplete
	am.knownLabels = knownLabels
	am.healthchecks = healthchecks
	am.lock.Unlock()

	return nil
}
// Pull data from upstream Alertmanager instance: probe the version, then
// collect silences and alerts. On any collection error the cached data is
// cleared, the error recorded and the matching error counter incremented.
func (am *Alertmanager) Pull() error {
	am.Metrics.Cycles++

	version := am.probeVersion()
	am.lock.Lock()
	am.lastVersionProbe = version
	am.lock.Unlock()
	log.Debug().Str("alertmanager", am.Name).Str("version", version).Msg("Probed alertmanager version")

	// verify that URI is correct
	if _, err := url.Parse(am.URI); err != nil {
		return err
	}

	if err := am.pullSilences(version); err != nil {
		am.clearData()
		am.setError(err.Error())
		am.Metrics.Errors[labelValueErrorsSilences]++
		return err
	}

	if err := am.pullAlerts(version); err != nil {
		am.clearData()
		am.setError(err.Error())
		am.Metrics.Errors[labelValueErrorsAlerts]++
		return err
	}

	am.lock.Lock()
	am.lastError = ""
	am.lock.Unlock()

	// Flag any healthcheck that saw no matching alert during this pull.
	for name, hc := range am.healthchecks {
		if !hc.wasFound {
			am.setError(fmt.Sprintf("Healthcheck filter %q didn't match any alerts", name))
		}
	}
	return nil
}
// Alerts returns a copy of all alert groups
func (am *Alertmanager) Alerts() []models.AlertGroup {
	am.lock.RLock()
	defer am.lock.RUnlock()
	// Clone via append; result is non-nil even when no groups are cached.
	groups := make([]models.AlertGroup, 0, len(am.alertGroups))
	return append(groups, am.alertGroups...)
}
// Silences returns a copy of all silences
func (am *Alertmanager) Silences() map[string]models.Silence {
	am.lock.RLock()
	defer am.lock.RUnlock()
	out := make(map[string]models.Silence, len(am.silences))
	for id, silence := range am.silences {
		out[id] = silence
	}
	return out
}
// SilenceByID allows to query for a silence by it's ID, returns error if not found
func (am *Alertmanager) SilenceByID(id string) (models.Silence, error) {
	am.lock.RLock()
	defer am.lock.RUnlock()
	if silence, ok := am.silences[id]; ok {
		return silence, nil
	}
	return models.Silence{}, fmt.Errorf("silence '%s' not found", id)
}
// ExpiredSilences returns pointers to all cached silences that ended in the
// past but no earlier than the configured Silences.Expired window.
func (am *Alertmanager) ExpiredSilences() (silences []*models.Silence) {
	am.lock.RLock()
	defer am.lock.RUnlock()
	now := time.Now()
	cutoff := now.Add(-config.Config.Silences.Expired)
	for _, s := range am.silences {
		// Skip silences that are still active or expired too long ago.
		if !s.EndsAt.Before(now) || s.EndsAt.Before(cutoff) {
			continue
		}
		s := s // take the address of a per-iteration copy
		silences = append(silences, &s)
	}
	return silences
}
// Colors returns a copy of all color maps
func (am *Alertmanager) Colors() models.LabelsColorMap {
	am.lock.RLock()
	defer am.lock.RUnlock()
	// Deep-copy both map levels so callers can't mutate cached state.
	out := make(models.LabelsColorMap, len(am.colors))
	for name, values := range am.colors {
		inner := make(map[string]models.LabelColors, len(values))
		for value, lc := range values {
			inner[value] = lc
		}
		out[name] = inner
	}
	return out
}
// ForEachAutocomplete calls fn for each autocomplete hint while holding a read lock.
func (am *Alertmanager) ForEachAutocomplete(fn func(models.Autocomplete)) {
	am.lock.RLock()
	defer am.lock.RUnlock()
	for i := range am.autocomplete {
		fn(am.autocomplete[i])
	}
}
// KnownLabels returns a copy of a map with known labels
func (am *Alertmanager) KnownLabels() []string {
	am.lock.RLock()
	defer am.lock.RUnlock()
	// Clone via append; result is non-nil even when empty.
	out := make([]string, 0, len(am.knownLabels))
	return append(out, am.knownLabels...)
}
// setError records the latest error message under the write lock.
func (am *Alertmanager) setError(err string) {
	am.lock.Lock()
	am.lastError = err
	am.lock.Unlock()
}
// getLastError returns the latest recorded error message under a read lock.
func (am *Alertmanager) getLastError() string {
	am.lock.RLock()
	msg := am.lastError
	am.lock.RUnlock()
	return msg
}
// Error returns the latest error message recorded for this instance,
// empty string when healthy.
func (am *Alertmanager) Error() string {
	return am.getLastError()
}
// SanitizedURI returns a copy of Alertmanager.URI safe for logging, with
// any password replaced by "xxx".
func (am *Alertmanager) SanitizedURI() string {
	return uri.SanitizeURI(am.URI)
}
// Version returns last known version of this Alertmanager instance
func (am *Alertmanager) Version() string {
	am.lock.RLock()
	version := am.lastVersionProbe
	am.lock.RUnlock()
	return version
}
// ClusterMemberNames returns a sorted list of names of all Alertmanager
// instances that are in the same cluster as this instance (including self).
// Names are the same as in karma configuration.
func (am *Alertmanager) ClusterMemberNames() []string {
	members := []string{am.Name}
	for _, upstream := range GetAlertmanagers() {
		// skip self, it's already part of members slice
		if upstream.Name == am.Name {
			continue
		}
		if upstream.Cluster != "" && upstream.Cluster == am.Cluster {
			members = append(members, upstream.Name)
		}
	}
	// slices.Sort avoids sort.Strings' interface indirection; the file
	// already uses the slices package elsewhere.
	slices.Sort(members)
	return members
}
// IsHealthy reports whether the last pull completed without recording
// an error.
func (am *Alertmanager) IsHealthy() bool {
	return am.getLastError() == ""
}
// IsHealthCheckAlert checks the given alert against every configured
// healthcheck and returns the name and definition of the first healthcheck
// whose filters ALL match the alert, or ("", nil) when none do.
func (am *Alertmanager) IsHealthCheckAlert(alert *models.Alert) (string, *HealthCheck) {
	for name, hc := range am.healthchecks {
		matched, missed := 0, 0
		for _, f := range hc.filters {
			if f.Match(alert, 0) {
				log.Debug().
					Str("alertmanager", am.Name).
					Str("healthcheck", name).
					Msg("Healthcheck alert matched")
				matched++
			} else {
				missed++
			}
		}
		// Every filter must match, and there must be at least one filter.
		if matched > 0 && missed == 0 {
			found := hc
			return name, &found
		}
	}
	return "", nil
}