package alertmanager import ( "encoding/json" "fmt" "net/http" "path" "sort" "strings" "sync" "time" "github.com/prymitive/karma/internal/config" "github.com/prymitive/karma/internal/filters" "github.com/prymitive/karma/internal/mapper" "github.com/prymitive/karma/internal/models" "github.com/prymitive/karma/internal/slices" "github.com/prymitive/karma/internal/transform" "github.com/prymitive/karma/internal/uri" log "github.com/sirupsen/logrus" ) const ( labelValueErrorsAlerts = "alerts" labelValueErrorsSilences = "silences" ) type alertmanagerMetrics struct { cycles float64 errors map[string]float64 } type alertmanagerStatus struct { version string amID string peerIDs []string } // Alertmanager represents Alertmanager upstream instance type Alertmanager struct { URI string `json:"uri"` RequestTimeout time.Duration `json:"timeout"` Name string `json:"name"` // whenever this instance should be proxied ProxyRequests bool `json:"proxyRequests"` // reader instances are specific to URI scheme we collect from reader uri.Reader // implements how we fetch requests from the Alertmanager, we don't set it // by default so it's nil and http.DefaultTransport is used HTTPTransport http.RoundTripper `json:"-"` // lock protects data access while updating lock sync.RWMutex // fields for storing pulled data alertGroups []models.AlertGroup silences map[string]models.Silence colors models.LabelsColorMap autocomplete []models.Autocomplete knownLabels []string lastError string status alertmanagerStatus // metrics tracked per alertmanager instance metrics alertmanagerMetrics // headers to send with each AlertManager request HTTPHeaders map[string]string } func (am *Alertmanager) fetchStatus() alertmanagerStatus { status := alertmanagerStatus{ // if everything fails assume Alertmanager is at latest possible version version: "999.0.0", amID: "", peerIDs: []string{}, } url, err := uri.JoinURL(am.URI, "api/v1/status") if err != nil { log.Errorf("Failed to join url '%s' and path 'api/v1/status': %s", am.SanitizedURI(), err) return status } resp := alertmanagerStatusResponse{} // read raw body from the source source, err := am.reader.Read(url, am.HTTPHeaders) if err != nil { log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err) return status } defer source.Close() // decode body as JSON err = json.NewDecoder(source).Decode(&resp) if err != nil { log.Errorf("[%s] %s failed to decode as JSON: %s", am.Name, uri.SanitizeURI(url), err) return status } if resp.Status != "success" { log.Errorf("[%s] Request to %s returned status %s", am.Name, uri.SanitizeURI(url), resp.Status) return status } if resp.Data.VersionInfo.Version == "" { log.Errorf("[%s] No version information in Alertmanager API at %s", am.Name, uri.SanitizeURI(url)) return status } status.version = resp.Data.VersionInfo.Version log.Infof("[%s] Remote Alertmanager version: %s", am.Name, status.version) if resp.Data.ClusterStatus.Name != "" { status.amID = resp.Data.ClusterStatus.Name for _, peer := range resp.Data.ClusterStatus.Peers { status.peerIDs = append(status.peerIDs, peer.Name) } } else if resp.Data.MeshStatus.Name != "" { status.amID = resp.Data.MeshStatus.Name for _, peer := range resp.Data.MeshStatus.Peers { status.peerIDs = append(status.peerIDs, peer.Name) } } return status } func (am *Alertmanager) clearData() { am.lock.Lock() am.alertGroups = []models.AlertGroup{} am.silences = map[string]models.Silence{} am.colors = models.LabelsColorMap{} am.autocomplete = []models.Autocomplete{} am.knownLabels = []string{} am.status = alertmanagerStatus{ version: "", amID: "", peerIDs: []string{}, } am.lock.Unlock() } func (am *Alertmanager) pullSilences(version string) error { mapper, err := mapper.GetSilenceMapper(version) if err != nil { return err } // generate full URL to collect silences from url, err := mapper.AbsoluteURL(am.URI) if err != nil { log.Errorf("[%s] Failed to generate silences endpoint URL: %s", am.Name, err) return err } // append query args if mapper needs those queryArgs := mapper.QueryArgs() if queryArgs != "" { url = fmt.Sprintf("%s?%s", url, queryArgs) } start := time.Now() // read raw body from the source source, err := am.reader.Read(url, am.HTTPHeaders) if err != nil { log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err) return err } defer source.Close() // decode body text silences, err := mapper.Decode(source) if err != nil { return err } log.Infof("[%s] Got %d silences(s) in %s", am.Name, len(silences), time.Since(start)) log.Infof("[%s] Detecting JIRA links in silences (%d)", am.Name, len(silences)) silenceMap := map[string]models.Silence{} for _, silence := range silences { silence.JiraID, silence.JiraURL = transform.DetectJIRAs(&silence) silenceMap[silence.ID] = silence } am.lock.Lock() am.silences = silenceMap am.lock.Unlock() return nil } // PublicURI is the URI of this Alertmanager we put in JSON reponse // it's either real full URI or a proxy relative URI func (am *Alertmanager) PublicURI() string { if am.ProxyRequests { sub := fmt.Sprintf("/proxy/alertmanager/%s", am.Name) uri := path.Join(config.Config.Listen.Prefix, sub) if strings.HasSuffix(sub, "/") { // if sub path had trailing slash then add it here, since path.Join will // skip it return uri + "/" } return uri } return am.URI } func (am *Alertmanager) pullAlerts(version string) error { mapper, err := mapper.GetAlertMapper(version) if err != nil { return err } // generate full URL to collect alerts from url, err := mapper.AbsoluteURL(am.URI) if err != nil { log.Errorf("[%s] Failed to generate alerts endpoint URL: %s", am.Name, err) return err } // append query args if mapper needs those queryArgs := mapper.QueryArgs() if queryArgs != "" { url = fmt.Sprintf("%s?%s", url, queryArgs) } start := time.Now() // read raw body from the source source, err := am.reader.Read(url, am.HTTPHeaders) if err != nil { log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err) return err } defer source.Close() // decode body text groups, err := mapper.Decode(source) if err != nil { return err } log.Infof("[%s] Got %d alert group(s) in %s", am.Name, len(groups), time.Since(start)) log.Infof("[%s] Deduplicating alert groups (%d)", am.Name, len(groups)) uniqueGroups := map[string]models.AlertGroup{} uniqueAlerts := map[string]map[string]models.Alert{} knownLabelsMap := map[string]bool{} for _, ag := range groups { agID := ag.LabelsFingerprint() if _, found := uniqueGroups[agID]; !found { uniqueGroups[agID] = models.AlertGroup{ Receiver: ag.Receiver, Labels: ag.Labels, ID: agID, } } for _, alert := range ag.Alerts { if _, found := uniqueAlerts[agID]; !found { uniqueAlerts[agID] = map[string]models.Alert{} } alertCFP := alert.ContentFingerprint() if _, found := uniqueAlerts[agID][alertCFP]; !found { uniqueAlerts[agID][alertCFP] = alert } for key := range alert.Labels { knownLabelsMap[key] = true } } } dedupedGroups := []models.AlertGroup{} colors := models.LabelsColorMap{} autocompleteMap := map[string]models.Autocomplete{} log.Infof("[%s] Processing unique alert groups (%d)", am.Name, len(uniqueGroups)) for _, ag := range uniqueGroups { alerts := models.AlertList{} for _, alert := range uniqueAlerts[ag.ID] { silences := map[string]*models.Silence{} for _, silenceID := range alert.SilencedBy { silence, err := am.SilenceByID(silenceID) if err == nil { silences[silenceID] = &silence } } alert.Alertmanager = []models.AlertmanagerInstance{ models.AlertmanagerInstance{ Name: am.Name, Cluster: am.ClusterID(), State: alert.State, StartsAt: alert.StartsAt, EndsAt: alert.EndsAt, Source: alert.GeneratorURL, Silences: silences, SilencedBy: alert.SilencedBy, }, } transform.ColorLabel(colors, "@receiver", alert.Receiver) for k, v := range alert.Labels { transform.ColorLabel(colors, k, v) } alert.UpdateFingerprints() alerts = append(alerts, alert) } for _, hint := range filters.BuildAutocomplete(alerts) { autocompleteMap[hint.Value] = hint } sort.Sort(&alerts) ag.Alerts = alerts // Hash is a checksum of all alerts, used to tell when any alert in the group changed ag.Hash = ag.ContentFingerprint() dedupedGroups = append(dedupedGroups, ag) } log.Infof("[%s] Merging autocomplete data (%d)", am.Name, len(autocompleteMap)) autocomplete := []models.Autocomplete{} for _, hint := range autocompleteMap { autocomplete = append(autocomplete, hint) } knownLabels := []string{} for key := range knownLabelsMap { knownLabels = append(knownLabels, key) } am.lock.Lock() am.alertGroups = dedupedGroups am.colors = colors am.autocomplete = autocomplete am.knownLabels = knownLabels am.lock.Unlock() return nil } // Pull data from upstream Alertmanager instance func (am *Alertmanager) Pull() error { am.metrics.cycles++ status := am.fetchStatus() err := am.pullSilences(status.version) if err != nil { am.clearData() am.setError(err.Error()) am.metrics.errors[labelValueErrorsSilences]++ return err } err = am.pullAlerts(status.version) if err != nil { am.clearData() am.setError(err.Error()) am.metrics.errors[labelValueErrorsAlerts]++ return err } am.lock.Lock() am.status = status am.lastError = "" am.lock.Unlock() return nil } // Alerts returns a copy of all alert groups func (am *Alertmanager) Alerts() []models.AlertGroup { am.lock.RLock() defer am.lock.RUnlock() alerts := make([]models.AlertGroup, len(am.alertGroups)) copy(alerts, am.alertGroups) return alerts } // Silences returns a copy of all silences func (am *Alertmanager) Silences() map[string]models.Silence { am.lock.RLock() defer am.lock.RUnlock() silences := map[string]models.Silence{} for id, silence := range am.silences { silences[id] = silence } return silences } // SilenceByID allows to query for a silence by it's ID, returns error if not found func (am *Alertmanager) SilenceByID(id string) (models.Silence, error) { am.lock.RLock() defer am.lock.RUnlock() s, found := am.silences[id] if !found { return models.Silence{}, fmt.Errorf("silence '%s' not found", id) } return s, nil } // Colors returns a copy of all color maps func (am *Alertmanager) Colors() models.LabelsColorMap { am.lock.RLock() defer am.lock.RUnlock() colors := models.LabelsColorMap{} for k, v := range am.colors { colors[k] = map[string]models.LabelColors{} for nk, nv := range v { colors[k][nk] = nv } } return colors } // Autocomplete returns a copy of all autocomplete data func (am *Alertmanager) Autocomplete() []models.Autocomplete { am.lock.RLock() defer am.lock.RUnlock() autocomplete := make([]models.Autocomplete, len(am.autocomplete)) copy(autocomplete, am.autocomplete) return autocomplete } // KnownLabels returns a copy of a map with known labels func (am *Alertmanager) KnownLabels() []string { am.lock.RLock() defer am.lock.RUnlock() knownLabels := make([]string, len(am.knownLabels)) copy(knownLabels, am.knownLabels) return knownLabels } func (am *Alertmanager) setError(err string) { am.lock.Lock() defer am.lock.Unlock() am.lastError = err } func (am *Alertmanager) Error() string { am.lock.RLock() defer am.lock.RUnlock() return am.lastError } // SanitizedURI returns a copy of Alertmanager.URI with password replaced by // "xxx" func (am *Alertmanager) SanitizedURI() string { am.lock.RLock() defer am.lock.RUnlock() return uri.SanitizeURI(am.URI) } // Version returns last known version of this Alertmanager instance func (am *Alertmanager) Version() string { am.lock.RLock() defer am.lock.RUnlock() return am.status.version } // ClusterPeers returns a list of IDs of all peers this instance // is connected to. // IDs are the same as in Alertmanager API. func (am *Alertmanager) ClusterPeers() []string { am.lock.RLock() defer am.lock.RUnlock() return am.status.peerIDs } // ClusterMemberNames returns a list of names of all Alertmanager instances // that are in the same cluster as this instance (including self). // Names are the same as in karma configuration. func (am *Alertmanager) ClusterMemberNames() []string { am.lock.RLock() defer am.lock.RUnlock() members := []string{am.Name} upstreams := GetAlertmanagers() for _, upstream := range upstreams { if upstream.Name == am.Name { continue } for _, peerID := range upstream.ClusterPeers() { if slices.StringInSlice(am.status.peerIDs, peerID) { if !slices.StringInSlice(members, upstream.Name) { members = append(members, upstream.Name) } } } } sort.Strings(members) return members } // ClusterID returns the ID (sha1) of the cluster this Alertmanager instance // belongs to func (am *Alertmanager) ClusterID() string { members := am.ClusterMemberNames() id, err := slices.StringSliceToSHA1(members) if err != nil { log.Errorf("slices.StringSliceToSHA1 error: %s", err) return am.Name } return id }