Mirror of https://github.com/prymitive/karma, synced 2026-05-07 03:26:52 +00:00
We need to know the upstream version to decide which API endpoints we can use; metrics don't change between versions, so they let us detect the version without any prior knowledge or assumptions.

package alertmanager

import (
	"encoding/json"
	"fmt"
	"net/http"
	"path"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/prymitive/karma/internal/config"
	"github.com/prymitive/karma/internal/filters"
	"github.com/prymitive/karma/internal/mapper"
	"github.com/prymitive/karma/internal/models"
	"github.com/prymitive/karma/internal/slices"
	"github.com/prymitive/karma/internal/transform"
	"github.com/prymitive/karma/internal/uri"
	"github.com/prymitive/karma/internal/verprobe"

	log "github.com/sirupsen/logrus"
)

const (
	labelValueErrorsAlerts   = "alerts"
	labelValueErrorsSilences = "silences"
)

// alertmanagerMetrics tracks per-instance stats: the number of completed
// collection cycles and error counts keyed by error type (alerts/silences)
type alertmanagerMetrics struct {
	cycles float64
	errors map[string]float64
}

type alertmanagerStatus struct {
	version string
	amID    string
	peerIDs []string
}

// Alertmanager represents an Alertmanager upstream instance
type Alertmanager struct {
	URI            string        `json:"uri"`
	RequestTimeout time.Duration `json:"timeout"`
	Name           string        `json:"name"`
	// whether requests to this instance should be proxied
	ProxyRequests bool `json:"proxyRequests"`
	// reader instances are specific to the URI scheme we collect from
	reader uri.Reader
	// implements how we fetch requests from the Alertmanager; we don't set it
	// by default, so it's nil and http.DefaultTransport is used
	HTTPTransport http.RoundTripper `json:"-"`
	// lock protects data access while updating
	lock sync.RWMutex
	// fields for storing pulled data
	alertGroups  []models.AlertGroup
	silences     map[string]models.Silence
	colors       models.LabelsColorMap
	autocomplete []models.Autocomplete
	knownLabels  []string
	lastError    string
	status       alertmanagerStatus
	// metrics tracked per alertmanager instance
	metrics alertmanagerMetrics
	// headers to send with each Alertmanager request
	HTTPHeaders map[string]string
}
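
// A minimal sketch of how an instance might be configured; the values below
// are hypothetical, and real instances are expected to be built elsewhere in
// this package so that private fields like reader are initialized too:
//
//	am := &Alertmanager{
//		URI:            "https://alertmanager.example.com",
//		Name:           "prod",
//		RequestTimeout: 40 * time.Second,
//		ProxyRequests:  true,
//	}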

// probeVersion tries to detect the remote Alertmanager version by scraping
// its /metrics endpoint; on any failure it returns a fake, very high version
// so that the latest known API mappers are used
func (am *Alertmanager) probeVersion() string {
	const fakeVersion = "999.0.0"

	url, err := uri.JoinURL(am.URI, "metrics")
	if err != nil {
		log.Errorf("Failed to join url '%s' and path 'metrics': %s", am.SanitizedURI(), err)
		return fakeVersion
	}

	source, err := am.reader.Read(url, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
		return fakeVersion
	}
	defer source.Close()

	version, err := verprobe.Detect(source)
	if err != nil {
		return fakeVersion
	}

	return version
}
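
// For context on what probing works with: Alertmanager exports build
// metadata on /metrics, so verprobe.Detect (see internal/verprobe) can
// recover the version from a metric along the lines of:
//
//	alertmanager_build_info{version="0.19.0"} 1
//
// The exact metric name and parsing are owned by verprobe; the line above
// follows the common Prometheus build_info convention and is illustrative only.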

// fetchStatus queries the api/v1/status endpoint for the version and cluster
// membership of this instance; on any failure it returns a status with a
// fake, very high version and no peers
func (am *Alertmanager) fetchStatus() alertmanagerStatus {
	status := alertmanagerStatus{
		// if everything fails assume Alertmanager is at the latest possible version
		version: "999.0.0",
		amID:    "",
		peerIDs: []string{},
	}

	url, err := uri.JoinURL(am.URI, "api/v1/status")
	if err != nil {
		log.Errorf("Failed to join url '%s' and path 'api/v1/status': %s", am.SanitizedURI(), err)
		return status
	}

	resp := alertmanagerStatusResponse{}

	// read raw body from the source
	source, err := am.reader.Read(url, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
		return status
	}
	defer source.Close()

	// decode body as JSON
	err = json.NewDecoder(source).Decode(&resp)
	if err != nil {
		log.Errorf("[%s] %s failed to decode as JSON: %s", am.Name, uri.SanitizeURI(url), err)
		return status
	}

	if resp.Status != "success" {
		log.Errorf("[%s] Request to %s returned status %s", am.Name, uri.SanitizeURI(url), resp.Status)
		return status
	}

	if resp.Data.VersionInfo.Version == "" {
		log.Errorf("[%s] No version information in Alertmanager API at %s", am.Name, uri.SanitizeURI(url))
		return status
	}

	status.version = resp.Data.VersionInfo.Version
	log.Infof("[%s] Remote Alertmanager version: %s", am.Name, status.version)

	// newer releases report membership via clusterStatus, older mesh-based
	// ones via meshStatus; support both
	if resp.Data.ClusterStatus.Name != "" {
		status.amID = resp.Data.ClusterStatus.Name
		for _, peer := range resp.Data.ClusterStatus.Peers {
			status.peerIDs = append(status.peerIDs, peer.Name)
		}
	} else if resp.Data.MeshStatus.Name != "" {
		status.amID = resp.Data.MeshStatus.Name
		for _, peer := range resp.Data.MeshStatus.Peers {
			status.peerIDs = append(status.peerIDs, peer.Name)
		}
	}

	return status
}
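
// For reference, a trimmed payload that would satisfy the decoding above
// could look like this (key names are assumed from the fields used here;
// alertmanagerStatusResponse itself is defined elsewhere in this package and
// all values are made up):
//
//	{
//	  "status": "success",
//	  "data": {
//	    "versionInfo": {"version": "0.19.0"},
//	    "clusterStatus": {
//	      "name": "01DX...",
//	      "peers": [{"name": "01DY..."}]
//	    }
//	  }
//	}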

// clearData resets all data pulled from this instance; it's used after a
// failed collection cycle so we don't keep serving stale results
func (am *Alertmanager) clearData() {
	am.lock.Lock()
	am.alertGroups = []models.AlertGroup{}
	am.silences = map[string]models.Silence{}
	am.colors = models.LabelsColorMap{}
	am.autocomplete = []models.Autocomplete{}
	am.knownLabels = []string{}
	am.status = alertmanagerStatus{
		version: "",
		amID:    "",
		peerIDs: []string{},
	}
	am.lock.Unlock()
}

func (am *Alertmanager) pullSilences(version string) error {
	mapper, err := mapper.GetSilenceMapper(version)
	if err != nil {
		return err
	}

	// generate full URL to collect silences from
	url, err := mapper.AbsoluteURL(am.URI)
	if err != nil {
		log.Errorf("[%s] Failed to generate silences endpoint URL: %s", am.Name, err)
		return err
	}
	// append query args if the mapper needs those
	queryArgs := mapper.QueryArgs()
	if queryArgs != "" {
		url = fmt.Sprintf("%s?%s", url, queryArgs)
	}

	start := time.Now()
	// read raw body from the source
	source, err := am.reader.Read(url, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
		return err
	}
	defer source.Close()

	// decode body text
	silences, err := mapper.Decode(source)
	if err != nil {
		return err
	}
	log.Infof("[%s] Got %d silence(s) in %s", am.Name, len(silences), time.Since(start))

	log.Infof("[%s] Detecting JIRA links in silences (%d)", am.Name, len(silences))
	silenceMap := map[string]models.Silence{}
	for _, silence := range silences {
		silence.JiraID, silence.JiraURL = transform.DetectJIRAs(&silence)
		silenceMap[silence.ID] = silence
	}

	am.lock.Lock()
	am.silences = silenceMap
	am.lock.Unlock()

	return nil
}
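
// To make the mapper indirection concrete: the mapper picked for the probed
// version owns both the endpoint path and the response format, so for a
// hypothetical URI of "https://am.example.com" a v2-era mapper would read
// from something like "https://am.example.com/api/v2/silences" (plus any
// query args it asks for). The exact paths live in the mapper packages, not
// in this file.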

// PublicURI is the URI of this Alertmanager we put in the JSON response;
// it's either the real full URI or a proxy relative URI
func (am *Alertmanager) PublicURI() string {
	if am.ProxyRequests {
		sub := fmt.Sprintf("/proxy/alertmanager/%s", am.Name)
		uri := path.Join(config.Config.Listen.Prefix, sub)
		if strings.HasSuffix(sub, "/") {
			// if the sub path had a trailing slash then add it back here,
			// since path.Join will strip it
			return uri + "/"
		}
		return uri
	}
	return am.URI
}
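
// Worked example: with Listen.Prefix set to "/" and Name set to "prod", a
// proxied instance reports "/proxy/alertmanager/prod", while a non-proxied
// one reports its real URI as configured.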

func (am *Alertmanager) pullAlerts(version string) error {
	mapper, err := mapper.GetAlertMapper(version)
	if err != nil {
		return err
	}

	// generate full URL to collect alerts from
	url, err := mapper.AbsoluteURL(am.URI)
	if err != nil {
		log.Errorf("[%s] Failed to generate alerts endpoint URL: %s", am.Name, err)
		return err
	}

	// append query args if the mapper needs those
	queryArgs := mapper.QueryArgs()
	if queryArgs != "" {
		url = fmt.Sprintf("%s?%s", url, queryArgs)
	}

	start := time.Now()
	// read raw body from the source
	source, err := am.reader.Read(url, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
		return err
	}
	defer source.Close()

	// decode body text
	groups, err := mapper.Decode(source)
	if err != nil {
		return err
	}
	log.Infof("[%s] Got %d alert group(s) in %s", am.Name, len(groups), time.Since(start))

	log.Infof("[%s] Deduplicating alert groups (%d)", am.Name, len(groups))
	uniqueGroups := map[string]models.AlertGroup{}
	uniqueAlerts := map[string]map[string]models.Alert{}
	knownLabelsMap := map[string]bool{}
	for _, ag := range groups {
		agID := ag.LabelsFingerprint()
		if _, found := uniqueGroups[agID]; !found {
			uniqueGroups[agID] = models.AlertGroup{
				Receiver: ag.Receiver,
				Labels:   ag.Labels,
				ID:       agID,
			}
		}
		for _, alert := range ag.Alerts {
			if _, found := uniqueAlerts[agID]; !found {
				uniqueAlerts[agID] = map[string]models.Alert{}
			}
			alertCFP := alert.ContentFingerprint()
			if _, found := uniqueAlerts[agID][alertCFP]; !found {
				uniqueAlerts[agID][alertCFP] = alert
			}
			for key := range alert.Labels {
				knownLabelsMap[key] = true
			}
		}
	}

	dedupedGroups := []models.AlertGroup{}
	colors := models.LabelsColorMap{}
	autocompleteMap := map[string]models.Autocomplete{}

	log.Infof("[%s] Processing unique alert groups (%d)", am.Name, len(uniqueGroups))
	for _, ag := range uniqueGroups {
		alerts := models.AlertList{}
		for _, alert := range uniqueAlerts[ag.ID] {
			// resolve silence IDs into full silence objects where possible
			silences := map[string]*models.Silence{}
			for _, silenceID := range alert.SilencedBy {
				silence, err := am.SilenceByID(silenceID)
				if err == nil {
					silences[silenceID] = &silence
				}
			}

			// tag each alert with the instance it was collected from
			alert.Alertmanager = []models.AlertmanagerInstance{
				{
					Name:       am.Name,
					Cluster:    am.ClusterID(),
					State:      alert.State,
					StartsAt:   alert.StartsAt,
					EndsAt:     alert.EndsAt,
					Source:     alert.GeneratorURL,
					Silences:   silences,
					SilencedBy: alert.SilencedBy,
				},
			}

			transform.ColorLabel(colors, "@receiver", alert.Receiver)
			for k, v := range alert.Labels {
				transform.ColorLabel(colors, k, v)
			}

			alert.UpdateFingerprints()
			alerts = append(alerts, alert)
		}

		for _, hint := range filters.BuildAutocomplete(alerts) {
			autocompleteMap[hint.Value] = hint
		}

		sort.Sort(&alerts)
		ag.Alerts = alerts

		// Hash is a checksum of all alerts, used to tell when any alert in the group changed
		ag.Hash = ag.ContentFingerprint()

		dedupedGroups = append(dedupedGroups, ag)
	}

	log.Infof("[%s] Merging autocomplete data (%d)", am.Name, len(autocompleteMap))
	autocomplete := []models.Autocomplete{}
	for _, hint := range autocompleteMap {
		autocomplete = append(autocomplete, hint)
	}

	knownLabels := []string{}
	for key := range knownLabelsMap {
		knownLabels = append(knownLabels, key)
	}

	am.lock.Lock()
	am.alertGroups = dedupedGroups
	am.colors = colors
	am.autocomplete = autocomplete
	am.knownLabels = knownLabels
	am.lock.Unlock()

	return nil
}
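
// Sketch of the dedup structure built above, with placeholder fingerprints:
//
//	uniqueGroups = {"<labels fingerprint>": AlertGroup{...}}
//	uniqueAlerts = {"<labels fingerprint>": {"<content fingerprint>": Alert{...}}}
//
// Because both maps are keyed on fingerprints, re-reading an already seen
// group or alert is a no-op and duplicates collapse naturally.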

// Pull data from the upstream Alertmanager instance: probe its version,
// fetch status, silences and alerts; on failure stored data is cleared so
// we don't keep serving stale results
func (am *Alertmanager) Pull() error {
	am.metrics.cycles++

	version := am.probeVersion()

	status := am.fetchStatus()

	err := am.pullSilences(version)
	if err != nil {
		am.clearData()
		am.setError(err.Error())
		am.metrics.errors[labelValueErrorsSilences]++
		return err
	}

	err = am.pullAlerts(version)
	if err != nil {
		am.clearData()
		am.setError(err.Error())
		am.metrics.errors[labelValueErrorsAlerts]++
		return err
	}

	am.lock.Lock()
	am.status = status
	am.lastError = ""
	am.lock.Unlock()

	return nil
}
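
// A minimal sketch of how Pull is meant to be driven; karma's actual
// scheduling lives outside this file, so the loop and interval below are
// illustrative only:
//
//	for {
//		if err := am.Pull(); err != nil {
//			log.Errorf("[%s] pull failed: %s", am.Name, err)
//		}
//		time.Sleep(time.Minute)
//	}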

// Alerts returns a copy of all alert groups
func (am *Alertmanager) Alerts() []models.AlertGroup {
	am.lock.RLock()
	defer am.lock.RUnlock()

	alerts := make([]models.AlertGroup, len(am.alertGroups))
	copy(alerts, am.alertGroups)
	return alerts
}

// Silences returns a copy of all silences
func (am *Alertmanager) Silences() map[string]models.Silence {
	am.lock.RLock()
	defer am.lock.RUnlock()

	silences := map[string]models.Silence{}
	for id, silence := range am.silences {
		silences[id] = silence
	}
	return silences
}

// SilenceByID returns the silence with the given ID, or an error if it's not found
func (am *Alertmanager) SilenceByID(id string) (models.Silence, error) {
	am.lock.RLock()
	defer am.lock.RUnlock()

	s, found := am.silences[id]
	if !found {
		return models.Silence{}, fmt.Errorf("silence '%s' not found", id)
	}
	return s, nil
}

// Colors returns a copy of all color maps
func (am *Alertmanager) Colors() models.LabelsColorMap {
	am.lock.RLock()
	defer am.lock.RUnlock()

	colors := models.LabelsColorMap{}
	for k, v := range am.colors {
		colors[k] = map[string]models.LabelColors{}
		for nk, nv := range v {
			colors[k][nk] = nv
		}
	}
	return colors
}

// Autocomplete returns a copy of all autocomplete data
func (am *Alertmanager) Autocomplete() []models.Autocomplete {
	am.lock.RLock()
	defer am.lock.RUnlock()

	autocomplete := make([]models.Autocomplete, len(am.autocomplete))
	copy(autocomplete, am.autocomplete)
	return autocomplete
}

// KnownLabels returns a copy of the list of all known label names
func (am *Alertmanager) KnownLabels() []string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	knownLabels := make([]string, len(am.knownLabels))
	copy(knownLabels, am.knownLabels)

	return knownLabels
}

func (am *Alertmanager) setError(err string) {
	am.lock.Lock()
	defer am.lock.Unlock()

	am.lastError = err
}

// Error returns the last error recorded for this instance, or an empty
// string after a successful pull
func (am *Alertmanager) Error() string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	return am.lastError
}

// SanitizedURI returns a copy of Alertmanager.URI with the password replaced
// by "xxx"
func (am *Alertmanager) SanitizedURI() string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	return uri.SanitizeURI(am.URI)
}

// Version returns the last known version of this Alertmanager instance
func (am *Alertmanager) Version() string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	return am.status.version
}

// ClusterPeers returns a list of IDs of all peers this instance
// is connected to.
// IDs are the same as in the Alertmanager API.
func (am *Alertmanager) ClusterPeers() []string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	return am.status.peerIDs
}

// ClusterMemberNames returns a list of names of all Alertmanager instances
// that are in the same cluster as this instance (including self).
// Names are the same as in the karma configuration.
func (am *Alertmanager) ClusterMemberNames() []string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	members := []string{am.Name}

	upstreams := GetAlertmanagers()
	for _, upstream := range upstreams {
		if upstream.Name == am.Name {
			continue
		}
		// any shared peer ID means both instances gossip in the same cluster
		for _, peerID := range upstream.ClusterPeers() {
			if slices.StringInSlice(am.status.peerIDs, peerID) {
				if !slices.StringInSlice(members, upstream.Name) {
					members = append(members, upstream.Name)
				}
			}
		}
	}

	sort.Strings(members)
	return members
}
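
// Worked example: if "am1" reports peer IDs {P1, P2} and "am2" reports
// {P2, P3}, the shared P2 places them in the same cluster, so both return
// ["am1", "am2"] (sorted, self always included).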

// ClusterID returns the ID (sha1) of the cluster this Alertmanager instance
// belongs to
func (am *Alertmanager) ClusterID() string {
	members := am.ClusterMemberNames()
	id, err := slices.StringSliceToSHA1(members)
	if err != nil {
		log.Errorf("slices.StringSliceToSHA1 error: %s", err)
		return am.Name
	}
	return id
}
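
// Since the ID is a SHA1 derived from the sorted member names, every member
// of the same cluster computes the same ID without coordination; the
// instance name is only used as a fallback if hashing fails.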