mirror of
https://github.com/prymitive/karma
synced 2026-05-09 03:36:44 +00:00
Store and expose via API the version of each Alertmanager instance and the list of instances in the same cluster
505 lines
13 KiB
Go
505 lines
13 KiB
Go
package alertmanager
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prymitive/karma/internal/config"
|
|
"github.com/prymitive/karma/internal/filters"
|
|
"github.com/prymitive/karma/internal/mapper"
|
|
"github.com/prymitive/karma/internal/models"
|
|
"github.com/prymitive/karma/internal/slices"
|
|
"github.com/prymitive/karma/internal/transform"
|
|
"github.com/prymitive/karma/internal/uri"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
|
|
labelValueErrorsAlerts = "alerts"
|
|
labelValueErrorsSilences = "silences"
|
|
)
|
|
|
|
type alertmanagerMetrics struct {
|
|
cycles float64
|
|
errors map[string]float64
|
|
}
|
|
|
|
type alertmanagerStatus struct {
|
|
version string
|
|
amID string
|
|
peerIDs []string
|
|
}
|
|
|
|
// Alertmanager represents Alertmanager upstream instance
|
|
type Alertmanager struct {
|
|
URI string `json:"uri"`
|
|
RequestTimeout time.Duration `json:"timeout"`
|
|
Name string `json:"name"`
|
|
// whenever this instance should be proxied
|
|
ProxyRequests bool `json:"proxyRequests"`
|
|
// reader instances are specific to URI scheme we collect from
|
|
reader uri.Reader
|
|
// implements how we fetch requests from the Alertmanager, we don't set it
|
|
// by default so it's nil and http.DefaultTransport is used
|
|
HTTPTransport http.RoundTripper `json:"-"`
|
|
// lock protects data access while updating
|
|
lock sync.RWMutex
|
|
// fields for storing pulled data
|
|
alertGroups []models.AlertGroup
|
|
silences map[string]models.Silence
|
|
colors models.LabelsColorMap
|
|
autocomplete []models.Autocomplete
|
|
knownLabels []string
|
|
lastError string
|
|
status alertmanagerStatus
|
|
// metrics tracked per alertmanager instance
|
|
metrics alertmanagerMetrics
|
|
}
|
|
|
|
func (am *Alertmanager) fetchStatus() alertmanagerStatus {
|
|
status := alertmanagerStatus{
|
|
// if everything fails assume Alertmanager is at latest possible version
|
|
version: "999.0.0",
|
|
amID: "",
|
|
peerIDs: []string{},
|
|
}
|
|
|
|
url, err := uri.JoinURL(am.URI, "api/v1/status")
|
|
if err != nil {
|
|
log.Errorf("Failed to join url '%s' and path 'api/v1/status': %s", am.SanitizedURI(), err)
|
|
return status
|
|
}
|
|
|
|
resp := alertmanagerStatusResponse{}
|
|
|
|
// read raw body from the source
|
|
source, err := am.reader.Read(url)
|
|
if err != nil {
|
|
log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
|
|
return status
|
|
}
|
|
defer source.Close()
|
|
|
|
// decode body as JSON
|
|
err = json.NewDecoder(source).Decode(&resp)
|
|
if err != nil {
|
|
log.Errorf("[%s] %s failed to decode as JSON: %s", am.Name, uri.SanitizeURI(url), err)
|
|
return status
|
|
}
|
|
|
|
if resp.Status != "success" {
|
|
log.Errorf("[%s] Request to %s returned status %s", am.Name, uri.SanitizeURI(url), resp.Status)
|
|
return status
|
|
}
|
|
|
|
if resp.Data.VersionInfo.Version == "" {
|
|
log.Errorf("[%s] No version information in Alertmanager API at %s", am.Name, uri.SanitizeURI(url))
|
|
return status
|
|
}
|
|
|
|
status.version = resp.Data.VersionInfo.Version
|
|
log.Infof("[%s] Remote Alertmanager version: %s", am.Name, status.version)
|
|
|
|
if resp.Data.ClusterStatus.Name != "" {
|
|
status.amID = resp.Data.ClusterStatus.Name
|
|
for _, peer := range resp.Data.ClusterStatus.Peers {
|
|
status.peerIDs = append(status.peerIDs, peer.Name)
|
|
}
|
|
} else if resp.Data.MeshStatus.Name != "" {
|
|
status.amID = resp.Data.MeshStatus.Name
|
|
for _, peer := range resp.Data.MeshStatus.Peers {
|
|
status.peerIDs = append(status.peerIDs, peer.Name)
|
|
}
|
|
}
|
|
|
|
return status
|
|
}
|
|
|
|
func (am *Alertmanager) clearData() {
|
|
am.lock.Lock()
|
|
am.alertGroups = []models.AlertGroup{}
|
|
am.silences = map[string]models.Silence{}
|
|
am.colors = models.LabelsColorMap{}
|
|
am.autocomplete = []models.Autocomplete{}
|
|
am.knownLabels = []string{}
|
|
am.status = alertmanagerStatus{
|
|
version: "",
|
|
amID: "",
|
|
peerIDs: []string{},
|
|
}
|
|
am.lock.Unlock()
|
|
}
|
|
|
|
func (am *Alertmanager) pullSilences(version string) error {
|
|
mapper, err := mapper.GetSilenceMapper(version)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// generate full URL to collect silences from
|
|
url, err := mapper.AbsoluteURL(am.URI)
|
|
if err != nil {
|
|
log.Errorf("[%s] Failed to generate silences endpoint URL: %s", am.Name, err)
|
|
return err
|
|
}
|
|
// append query args if mapper needs those
|
|
queryArgs := mapper.QueryArgs()
|
|
if queryArgs != "" {
|
|
url = fmt.Sprintf("%s?%s", url, queryArgs)
|
|
}
|
|
|
|
start := time.Now()
|
|
// read raw body from the source
|
|
source, err := am.reader.Read(url)
|
|
if err != nil {
|
|
log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
|
|
return err
|
|
}
|
|
defer source.Close()
|
|
|
|
// decode body text
|
|
silences, err := mapper.Decode(source)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Infof("[%s] Got %d silences(s) in %s", am.Name, len(silences), time.Since(start))
|
|
|
|
log.Infof("[%s] Detecting JIRA links in silences (%d)", am.Name, len(silences))
|
|
silenceMap := map[string]models.Silence{}
|
|
for _, silence := range silences {
|
|
silence.JiraID, silence.JiraURL = transform.DetectJIRAs(&silence)
|
|
silenceMap[silence.ID] = silence
|
|
}
|
|
|
|
am.lock.Lock()
|
|
am.silences = silenceMap
|
|
am.lock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// PublicURI is the URI of this Alertmanager we put in JSON reponse
|
|
// it's either real full URI or a proxy relative URI
|
|
func (am *Alertmanager) PublicURI() string {
|
|
if am.ProxyRequests {
|
|
sub := fmt.Sprintf("/proxy/alertmanager/%s", am.Name)
|
|
uri := path.Join(config.Config.Listen.Prefix, sub)
|
|
if strings.HasSuffix(sub, "/") {
|
|
// if sub path had trailing slash then add it here, since path.Join will
|
|
// skip it
|
|
return uri + "/"
|
|
}
|
|
return uri
|
|
}
|
|
return am.URI
|
|
}
|
|
|
|
func (am *Alertmanager) pullAlerts(version string) error {
|
|
mapper, err := mapper.GetAlertMapper(version)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// generate full URL to collect alerts from
|
|
url, err := mapper.AbsoluteURL(am.URI)
|
|
if err != nil {
|
|
log.Errorf("[%s] Failed to generate alerts endpoint URL: %s", am.Name, err)
|
|
return err
|
|
}
|
|
|
|
// append query args if mapper needs those
|
|
queryArgs := mapper.QueryArgs()
|
|
if queryArgs != "" {
|
|
url = fmt.Sprintf("%s?%s", url, queryArgs)
|
|
}
|
|
|
|
start := time.Now()
|
|
// read raw body from the source
|
|
source, err := am.reader.Read(url)
|
|
if err != nil {
|
|
log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
|
|
return err
|
|
}
|
|
defer source.Close()
|
|
|
|
// decode body text
|
|
groups, err := mapper.Decode(source)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Infof("[%s] Got %d alert group(s) in %s", am.Name, len(groups), time.Since(start))
|
|
|
|
log.Infof("[%s] Deduplicating alert groups (%d)", am.Name, len(groups))
|
|
uniqueGroups := map[string]models.AlertGroup{}
|
|
uniqueAlerts := map[string]map[string]models.Alert{}
|
|
knownLabelsMap := map[string]bool{}
|
|
for _, ag := range groups {
|
|
agID := ag.LabelsFingerprint()
|
|
if _, found := uniqueGroups[agID]; !found {
|
|
uniqueGroups[agID] = models.AlertGroup{
|
|
Receiver: ag.Receiver,
|
|
Labels: ag.Labels,
|
|
ID: agID,
|
|
}
|
|
}
|
|
for _, alert := range ag.Alerts {
|
|
if _, found := uniqueAlerts[agID]; !found {
|
|
uniqueAlerts[agID] = map[string]models.Alert{}
|
|
}
|
|
alertCFP := alert.ContentFingerprint()
|
|
if _, found := uniqueAlerts[agID][alertCFP]; !found {
|
|
uniqueAlerts[agID][alertCFP] = alert
|
|
}
|
|
for key := range alert.Labels {
|
|
knownLabelsMap[key] = true
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
dedupedGroups := []models.AlertGroup{}
|
|
colors := models.LabelsColorMap{}
|
|
autocompleteMap := map[string]models.Autocomplete{}
|
|
|
|
log.Infof("[%s] Processing unique alert groups (%d)", am.Name, len(uniqueGroups))
|
|
for _, ag := range uniqueGroups {
|
|
alerts := models.AlertList{}
|
|
for _, alert := range uniqueAlerts[ag.ID] {
|
|
|
|
silences := map[string]*models.Silence{}
|
|
for _, silenceID := range alert.SilencedBy {
|
|
silence, err := am.SilenceByID(silenceID)
|
|
if err == nil {
|
|
silences[silenceID] = &silence
|
|
}
|
|
}
|
|
|
|
alert.Alertmanager = []models.AlertmanagerInstance{
|
|
models.AlertmanagerInstance{
|
|
Name: am.Name,
|
|
State: alert.State,
|
|
StartsAt: alert.StartsAt,
|
|
EndsAt: alert.EndsAt,
|
|
Source: alert.GeneratorURL,
|
|
Silences: silences,
|
|
SilencedBy: alert.SilencedBy,
|
|
},
|
|
}
|
|
|
|
transform.ColorLabel(colors, "@receiver", alert.Receiver)
|
|
for k, v := range alert.Labels {
|
|
transform.ColorLabel(colors, k, v)
|
|
}
|
|
|
|
alert.UpdateFingerprints()
|
|
alerts = append(alerts, alert)
|
|
}
|
|
|
|
for _, hint := range filters.BuildAutocomplete(alerts) {
|
|
autocompleteMap[hint.Value] = hint
|
|
}
|
|
|
|
sort.Sort(&alerts)
|
|
ag.Alerts = alerts
|
|
|
|
// Hash is a checksum of all alerts, used to tell when any alert in the group changed
|
|
ag.Hash = ag.ContentFingerprint()
|
|
|
|
dedupedGroups = append(dedupedGroups, ag)
|
|
}
|
|
|
|
log.Infof("[%s] Merging autocomplete data (%d)", am.Name, len(autocompleteMap))
|
|
autocomplete := []models.Autocomplete{}
|
|
for _, hint := range autocompleteMap {
|
|
autocomplete = append(autocomplete, hint)
|
|
}
|
|
|
|
knownLabels := []string{}
|
|
for key := range knownLabelsMap {
|
|
knownLabels = append(knownLabels, key)
|
|
}
|
|
|
|
am.lock.Lock()
|
|
am.alertGroups = dedupedGroups
|
|
am.colors = colors
|
|
am.autocomplete = autocomplete
|
|
am.knownLabels = knownLabels
|
|
am.lock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Pull data from upstream Alertmanager instance
|
|
func (am *Alertmanager) Pull() error {
|
|
am.metrics.cycles++
|
|
|
|
status := am.fetchStatus()
|
|
|
|
err := am.pullSilences(status.version)
|
|
if err != nil {
|
|
am.clearData()
|
|
am.setError(err.Error())
|
|
am.metrics.errors[labelValueErrorsSilences]++
|
|
return err
|
|
}
|
|
|
|
err = am.pullAlerts(status.version)
|
|
if err != nil {
|
|
am.clearData()
|
|
am.setError(err.Error())
|
|
am.metrics.errors[labelValueErrorsAlerts]++
|
|
return err
|
|
}
|
|
|
|
am.lock.Lock()
|
|
am.status = status
|
|
am.lastError = ""
|
|
am.lock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Alerts returns a copy of all alert groups
|
|
func (am *Alertmanager) Alerts() []models.AlertGroup {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
alerts := make([]models.AlertGroup, len(am.alertGroups))
|
|
copy(alerts, am.alertGroups)
|
|
return alerts
|
|
}
|
|
|
|
// Silences returns a copy of all silences
|
|
func (am *Alertmanager) Silences() map[string]models.Silence {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
silences := map[string]models.Silence{}
|
|
for id, silence := range am.silences {
|
|
silences[id] = silence
|
|
}
|
|
return silences
|
|
}
|
|
|
|
// SilenceByID allows to query for a silence by it's ID, returns error if not found
|
|
func (am *Alertmanager) SilenceByID(id string) (models.Silence, error) {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
s, found := am.silences[id]
|
|
if !found {
|
|
return models.Silence{}, fmt.Errorf("silence '%s' not found", id)
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// Colors returns a copy of all color maps
|
|
func (am *Alertmanager) Colors() models.LabelsColorMap {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
colors := models.LabelsColorMap{}
|
|
for k, v := range am.colors {
|
|
colors[k] = map[string]models.LabelColors{}
|
|
for nk, nv := range v {
|
|
colors[k][nk] = nv
|
|
}
|
|
}
|
|
return colors
|
|
}
|
|
|
|
// Autocomplete returns a copy of all autocomplete data
|
|
func (am *Alertmanager) Autocomplete() []models.Autocomplete {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
autocomplete := make([]models.Autocomplete, len(am.autocomplete))
|
|
copy(autocomplete, am.autocomplete)
|
|
return autocomplete
|
|
}
|
|
|
|
// KnownLabels returns a copy of a map with known labels
|
|
func (am *Alertmanager) KnownLabels() []string {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
knownLabels := make([]string, len(am.knownLabels))
|
|
copy(knownLabels, am.knownLabels)
|
|
|
|
return knownLabels
|
|
}
|
|
|
|
func (am *Alertmanager) setError(err string) {
|
|
am.lock.Lock()
|
|
defer am.lock.Unlock()
|
|
|
|
am.lastError = err
|
|
}
|
|
|
|
func (am *Alertmanager) Error() string {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
return am.lastError
|
|
}
|
|
|
|
// SanitizedURI returns a copy of Alertmanager.URI with password replaced by
|
|
// "xxx"
|
|
func (am *Alertmanager) SanitizedURI() string {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
return uri.SanitizeURI(am.URI)
|
|
}
|
|
|
|
// Version returns last known version of this Alertmanager instance
|
|
func (am *Alertmanager) Version() string {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
return am.status.version
|
|
}
|
|
|
|
// ClusterPeers returns a list of IDs of all peers this instance
|
|
// is connected to.
|
|
// IDs are the same as in Alertmanager API.
|
|
func (am *Alertmanager) ClusterPeers() []string {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
return am.status.peerIDs
|
|
}
|
|
|
|
// ClusterMemberNames returns a list of names of all Alertmanager instances
|
|
// that are in the same cluster as this instance (including self).
|
|
// Names are the same as in karma configuration.
|
|
func (am *Alertmanager) ClusterMemberNames() []string {
|
|
am.lock.RLock()
|
|
defer am.lock.RUnlock()
|
|
|
|
members := []string{am.Name}
|
|
|
|
upstreams := GetAlertmanagers()
|
|
for _, upstream := range upstreams {
|
|
if upstream.Name == am.Name {
|
|
continue
|
|
}
|
|
for _, peerID := range upstream.ClusterPeers() {
|
|
if slices.StringInSlice(am.status.peerIDs, peerID) {
|
|
if !slices.StringInSlice(members, upstream.Name) {
|
|
members = append(members, upstream.Name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return members
|
|
}
|