Files
karma/internal/alertmanager/models.go
Richard Maynard ec14be0288 feat(backend): add support for custom headers (#368)
This will allow the AlertManager upstreams to be sent user defined HTTP headers.
2019-01-17 08:53:33 +00:00

521 lines
13 KiB
Go

package alertmanager
import (
"encoding/json"
"fmt"
"net/http"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/prymitive/karma/internal/config"
"github.com/prymitive/karma/internal/filters"
"github.com/prymitive/karma/internal/mapper"
"github.com/prymitive/karma/internal/models"
"github.com/prymitive/karma/internal/slices"
"github.com/prymitive/karma/internal/transform"
"github.com/prymitive/karma/internal/uri"
log "github.com/sirupsen/logrus"
)
// Keys used in alertmanagerMetrics.errors. Pull() increments the counter
// stored under the matching key whenever collecting that kind of data fails.
const (
// key for failures while pulling alerts
labelValueErrorsAlerts = "alerts"
// key for failures while pulling silences
labelValueErrorsSilences = "silences"
)
// alertmanagerMetrics holds counters tracked for a single Alertmanager
// upstream.
type alertmanagerMetrics struct {
// number of Pull() calls made so far; incremented at the start of each Pull()
cycles float64
// number of failed pulls, keyed by labelValueErrorsAlerts or
// labelValueErrorsSilences
errors map[string]float64
}
// alertmanagerStatus is the subset of the Alertmanager status API response
// that fetchStatus() extracts and Pull() stores on the instance.
type alertmanagerStatus struct {
// version string reported by the upstream; fetchStatus() falls back to
// "999.0.0" when it cannot be determined
version string
// name of this instance as reported in the cluster (or mesh) status section,
// empty if neither section provided a name
amID string
// names of all peers listed in the cluster (or mesh) status section
peerIDs []string
}
// Alertmanager represents a single Alertmanager upstream instance together
// with the data most recently pulled from it.
type Alertmanager struct {
// base URI of the upstream, all API endpoint URLs are derived from it
URI string `json:"uri"`
RequestTimeout time.Duration `json:"timeout"`
// name of this instance, used in log messages and in the proxy path
Name string `json:"name"`
// whether requests to this instance should be proxied; when true PublicURI()
// returns a proxy path instead of the real URI
ProxyRequests bool `json:"proxyRequests"`
// reader instances are specific to URI scheme we collect from
reader uri.Reader
// implements how we fetch requests from the Alertmanager, we don't set it
// by default so it's nil and http.DefaultTransport is used
HTTPTransport http.RoundTripper `json:"-"`
// lock protects the pulled data fields below (alertGroups, silences, colors,
// autocomplete, knownLabels, lastError, status) while they are read or updated
lock sync.RWMutex
// fields for storing pulled data
alertGroups []models.AlertGroup
silences map[string]models.Silence
colors models.LabelsColorMap
autocomplete []models.Autocomplete
knownLabels []string
// message of the error that failed the last Pull(), empty after a success
lastError string
status alertmanagerStatus
// metrics tracked per alertmanager instance; Pull() updates these without
// taking the lock
metrics alertmanagerMetrics
// headers to send with each AlertManager request
// NOTE(review): this exported field has no json tag, so encoding/json would
// emit it as "HTTPHeaders" if this struct is ever marshaled - confirm that
// headers carrying credentials cannot leak that way.
HTTPHeaders map[string]string
}
// fetchStatus reads the api/v1/status endpoint of this Alertmanager and
// returns its version, its cluster (or mesh) name and the names of its peers.
//
// It never fails: every problem is logged and a fallback status is returned
// with version "999.0.0", an empty ID and no peers, so callers treat an
// unreachable upstream as if it ran the latest possible version.
func (am *Alertmanager) fetchStatus() alertmanagerStatus {
	result := alertmanagerStatus{
		// if everything fails assume Alertmanager is at latest possible version
		version: "999.0.0",
		amID:    "",
		peerIDs: []string{},
	}

	endpoint, err := uri.JoinURL(am.URI, "api/v1/status")
	if err != nil {
		log.Errorf("Failed to join url '%s' and path 'api/v1/status': %s", am.SanitizedURI(), err)
		return result
	}

	// fetch the raw response body
	body, err := am.reader.Read(endpoint, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(endpoint), err)
		return result
	}
	defer body.Close()

	// parse the body as JSON
	var payload alertmanagerStatusResponse
	if err = json.NewDecoder(body).Decode(&payload); err != nil {
		log.Errorf("[%s] %s failed to decode as JSON: %s", am.Name, uri.SanitizeURI(endpoint), err)
		return result
	}

	if payload.Status != "success" {
		log.Errorf("[%s] Request to %s returned status %s", am.Name, uri.SanitizeURI(endpoint), payload.Status)
		return result
	}

	if payload.Data.VersionInfo.Version == "" {
		log.Errorf("[%s] No version information in Alertmanager API at %s", am.Name, uri.SanitizeURI(endpoint))
		return result
	}

	result.version = payload.Data.VersionInfo.Version
	log.Infof("[%s] Remote Alertmanager version: %s", am.Name, result.version)

	// the cluster section takes priority, the mesh section is only consulted
	// when the cluster section carries no name
	switch {
	case payload.Data.ClusterStatus.Name != "":
		result.amID = payload.Data.ClusterStatus.Name
		for _, member := range payload.Data.ClusterStatus.Peers {
			result.peerIDs = append(result.peerIDs, member.Name)
		}
	case payload.Data.MeshStatus.Name != "":
		result.amID = payload.Data.MeshStatus.Name
		for _, member := range payload.Data.MeshStatus.Peers {
			result.peerIDs = append(result.peerIDs, member.Name)
		}
	}

	return result
}
// clearData resets every piece of pulled data (alert groups, silences,
// colors, autocomplete hints, known labels and status) to an empty value.
// lastError is left untouched. The write lock is held for the whole reset.
func (am *Alertmanager) clearData() {
	am.lock.Lock()
	defer am.lock.Unlock()

	am.alertGroups = make([]models.AlertGroup, 0)
	am.silences = make(map[string]models.Silence)
	am.colors = make(models.LabelsColorMap)
	am.autocomplete = make([]models.Autocomplete, 0)
	am.knownLabels = make([]string, 0)
	// version and amID are reset to their zero value (empty string)
	am.status = alertmanagerStatus{peerIDs: make([]string, 0)}
}
// pullSilences fetches all silences from the upstream using the mapper
// matching the given Alertmanager version, annotates each silence with any
// detected JIRA ID and URL, and stores the result keyed by silence ID.
//
// It returns an error when no mapper supports the version, the endpoint URL
// cannot be built, the request fails or the response cannot be decoded.
// Stored silences are only replaced on success.
func (am *Alertmanager) pullSilences(version string) error {
	// named silenceMapper so the imported mapper package is not shadowed
	silenceMapper, err := mapper.GetSilenceMapper(version)
	if err != nil {
		return err
	}

	// generate full URL to collect silences from
	url, err := silenceMapper.AbsoluteURL(am.URI)
	if err != nil {
		log.Errorf("[%s] Failed to generate silences endpoint URL: %s", am.Name, err)
		return err
	}

	// append query args if mapper needs those
	if queryArgs := silenceMapper.QueryArgs(); queryArgs != "" {
		url = fmt.Sprintf("%s?%s", url, queryArgs)
	}

	start := time.Now()
	// read raw body from the source
	source, err := am.reader.Read(url, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
		return err
	}
	defer source.Close()

	// decode body text
	silences, err := silenceMapper.Decode(source)
	if err != nil {
		return err
	}
	log.Infof("[%s] Got %d silence(s) in %s", am.Name, len(silences), time.Since(start))

	log.Infof("[%s] Detecting JIRA links in silences (%d)", am.Name, len(silences))
	silenceMap := make(map[string]models.Silence, len(silences))
	for _, silence := range silences {
		silence.JiraID, silence.JiraURL = transform.DetectJIRAs(&silence)
		silenceMap[silence.ID] = silence
	}

	// swap in the new data in one step so readers never see a partial map
	am.lock.Lock()
	am.silences = silenceMap
	am.lock.Unlock()

	return nil
}
// PublicURI is the URI of this Alertmanager that gets put in the JSON
// response. When requests are proxied it is a path under the configured
// listen prefix ("<prefix>/proxy/alertmanager/<name>"), otherwise it is the
// real upstream URI.
func (am *Alertmanager) PublicURI() string {
	if !am.ProxyRequests {
		return am.URI
	}

	proxyPath := fmt.Sprintf("/proxy/alertmanager/%s", am.Name)
	joined := path.Join(config.Config.Listen.Prefix, proxyPath)
	if !strings.HasSuffix(proxyPath, "/") {
		return joined
	}
	// path.Join strips a trailing slash, so put it back when the proxy path
	// had one
	return joined + "/"
}
// pullAlerts fetches all alert groups from the upstream using the mapper
// matching the given Alertmanager version and rebuilds the alert data stored
// on this instance.
//
// Processing happens in two passes:
//  1. groups are deduplicated by their labels fingerprint and alerts within
//     a group by their content fingerprint; every label name seen is recorded
//  2. each unique alert is annotated with this instance (name, cluster ID,
//     state, timestamps, source and resolved silences), label colors and
//     autocomplete hints are collected, alerts are sorted and the group hash
//     is computed
//
// Silences are resolved via SilenceByID, so pullSilences should have run
// before this call. Silence IDs that cannot be resolved are skipped.
//
// It returns an error when no mapper supports the version, the endpoint URL
// cannot be built, the request fails or the response cannot be decoded.
// Stored data is only replaced on success.
func (am *Alertmanager) pullAlerts(version string) error {
	// named alertMapper so the imported mapper package is not shadowed
	alertMapper, err := mapper.GetAlertMapper(version)
	if err != nil {
		return err
	}

	// generate full URL to collect alerts from
	url, err := alertMapper.AbsoluteURL(am.URI)
	if err != nil {
		log.Errorf("[%s] Failed to generate alerts endpoint URL: %s", am.Name, err)
		return err
	}

	// append query args if mapper needs those
	if queryArgs := alertMapper.QueryArgs(); queryArgs != "" {
		url = fmt.Sprintf("%s?%s", url, queryArgs)
	}

	start := time.Now()
	// read raw body from the source
	source, err := am.reader.Read(url, am.HTTPHeaders)
	if err != nil {
		log.Errorf("[%s] %s request failed: %s", am.Name, uri.SanitizeURI(url), err)
		return err
	}
	defer source.Close()

	// decode body text
	groups, err := alertMapper.Decode(source)
	if err != nil {
		return err
	}
	log.Infof("[%s] Got %d alert group(s) in %s", am.Name, len(groups), time.Since(start))

	log.Infof("[%s] Deduplicating alert groups (%d)", am.Name, len(groups))
	uniqueGroups := map[string]models.AlertGroup{}
	uniqueAlerts := map[string]map[string]models.Alert{}
	knownLabelsMap := map[string]bool{}
	for _, ag := range groups {
		agID := ag.LabelsFingerprint()
		if _, found := uniqueGroups[agID]; !found {
			uniqueGroups[agID] = models.AlertGroup{
				Receiver: ag.Receiver,
				Labels:   ag.Labels,
				ID:       agID,
			}
		}
		for _, alert := range ag.Alerts {
			if _, found := uniqueAlerts[agID]; !found {
				uniqueAlerts[agID] = map[string]models.Alert{}
			}
			// first alert with a given content fingerprint wins
			alertCFP := alert.ContentFingerprint()
			if _, found := uniqueAlerts[agID][alertCFP]; !found {
				uniqueAlerts[agID][alertCFP] = alert
			}
			for key := range alert.Labels {
				knownLabelsMap[key] = true
			}
		}
	}

	// The cluster ID is the same for every alert from this instance, and
	// computing it walks all upstreams, takes their locks and hashes the member
	// names, so do it once instead of once per alert.
	clusterID := am.ClusterID()

	dedupedGroups := make([]models.AlertGroup, 0, len(uniqueGroups))
	colors := models.LabelsColorMap{}
	autocompleteMap := map[string]models.Autocomplete{}

	log.Infof("[%s] Processing unique alert groups (%d)", am.Name, len(uniqueGroups))
	for _, ag := range uniqueGroups {
		alerts := models.AlertList{}
		for _, alert := range uniqueAlerts[ag.ID] {
			// resolve silence IDs to full silences, skipping unknown ones
			silences := map[string]*models.Silence{}
			for _, silenceID := range alert.SilencedBy {
				silence, err := am.SilenceByID(silenceID)
				if err == nil {
					silences[silenceID] = &silence
				}
			}

			alert.Alertmanager = []models.AlertmanagerInstance{
				{
					Name:       am.Name,
					Cluster:    clusterID,
					State:      alert.State,
					StartsAt:   alert.StartsAt,
					EndsAt:     alert.EndsAt,
					Source:     alert.GeneratorURL,
					Silences:   silences,
					SilencedBy: alert.SilencedBy,
				},
			}

			transform.ColorLabel(colors, "@receiver", alert.Receiver)
			for k, v := range alert.Labels {
				transform.ColorLabel(colors, k, v)
			}

			alert.UpdateFingerprints()
			alerts = append(alerts, alert)
		}

		// keyed by hint value so identical hints from different groups merge
		for _, hint := range filters.BuildAutocomplete(alerts) {
			autocompleteMap[hint.Value] = hint
		}

		sort.Sort(&alerts)
		ag.Alerts = alerts
		// Hash is a checksum of all alerts, used to tell when any alert in the group changed
		ag.Hash = ag.ContentFingerprint()
		dedupedGroups = append(dedupedGroups, ag)
	}

	log.Infof("[%s] Merging autocomplete data (%d)", am.Name, len(autocompleteMap))
	autocomplete := make([]models.Autocomplete, 0, len(autocompleteMap))
	for _, hint := range autocompleteMap {
		autocomplete = append(autocomplete, hint)
	}

	knownLabels := make([]string, 0, len(knownLabelsMap))
	for key := range knownLabelsMap {
		knownLabels = append(knownLabels, key)
	}

	// swap in all new data under a single write lock so readers see a
	// consistent snapshot
	am.lock.Lock()
	am.alertGroups = dedupedGroups
	am.colors = colors
	am.autocomplete = autocomplete
	am.knownLabels = knownLabels
	am.lock.Unlock()

	return nil
}
// Pull refreshes all data from the upstream Alertmanager instance: status
// first, then silences, then alerts.
//
// When pulling silences or alerts fails, all previously stored data is
// cleared, the error message is recorded (see Error), the matching error
// counter is incremented and the error is returned. On success the new
// status is stored and the recorded error is reset.
func (am *Alertmanager) Pull() error {
	am.metrics.cycles++

	// abort wipes stored data and records the failure under the given
	// error counter key
	abort := func(kind string, cause error) error {
		am.clearData()
		am.setError(cause.Error())
		am.metrics.errors[kind]++
		return cause
	}

	status := am.fetchStatus()

	if err := am.pullSilences(status.version); err != nil {
		return abort(labelValueErrorsSilences, err)
	}

	if err := am.pullAlerts(status.version); err != nil {
		return abort(labelValueErrorsAlerts, err)
	}

	am.lock.Lock()
	defer am.lock.Unlock()
	am.status = status
	am.lastError = ""

	return nil
}
// Alerts returns a new slice holding a copy of every stored alert group.
// The result is never nil.
func (am *Alertmanager) Alerts() []models.AlertGroup {
	am.lock.RLock()
	defer am.lock.RUnlock()

	groups := make([]models.AlertGroup, 0, len(am.alertGroups))
	return append(groups, am.alertGroups...)
}
// Silences returns a new map, keyed by silence ID, holding a copy of every
// stored silence. The result is never nil.
func (am *Alertmanager) Silences() map[string]models.Silence {
	am.lock.RLock()
	defer am.lock.RUnlock()

	out := make(map[string]models.Silence, len(am.silences))
	for silenceID, s := range am.silences {
		out[silenceID] = s
	}
	return out
}
// SilenceByID looks up a stored silence by its ID. It returns an empty
// silence and a non-nil error when no silence with that ID is known.
func (am *Alertmanager) SilenceByID(id string) (models.Silence, error) {
	am.lock.RLock()
	defer am.lock.RUnlock()

	if silence, ok := am.silences[id]; ok {
		return silence, nil
	}
	return models.Silence{}, fmt.Errorf("silence '%s' not found", id)
}
// Colors returns a copy of the stored label color map. Both the outer map and
// every nested map are freshly allocated, so the caller can modify the result
// without affecting stored data.
func (am *Alertmanager) Colors() models.LabelsColorMap {
	am.lock.RLock()
	defer am.lock.RUnlock()

	out := models.LabelsColorMap{}
	for outerKey, nested := range am.colors {
		inner := map[string]models.LabelColors{}
		for innerKey, labelColors := range nested {
			inner[innerKey] = labelColors
		}
		out[outerKey] = inner
	}
	return out
}
// Autocomplete returns a new slice holding a copy of every stored
// autocomplete hint. The result is never nil.
func (am *Alertmanager) Autocomplete() []models.Autocomplete {
	am.lock.RLock()
	defer am.lock.RUnlock()

	hints := make([]models.Autocomplete, 0, len(am.autocomplete))
	return append(hints, am.autocomplete...)
}
// KnownLabels returns a new slice holding a copy of all label names seen
// during the last successful alert pull. The result is never nil.
func (am *Alertmanager) KnownLabels() []string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	labels := make([]string, 0, len(am.knownLabels))
	return append(labels, am.knownLabels...)
}
// setError records err as the last error of this instance, under the write
// lock.
func (am *Alertmanager) setError(err string) {
	am.lock.Lock()
	am.lastError = err
	am.lock.Unlock()
}
// Error returns the message of the error recorded by the last failed Pull,
// or an empty string if the last Pull succeeded.
func (am *Alertmanager) Error() string {
	am.lock.RLock()
	msg := am.lastError
	am.lock.RUnlock()
	return msg
}
// SanitizedURI returns Alertmanager.URI passed through uri.SanitizeURI, which
// replaces the password with "xxx".
func (am *Alertmanager) SanitizedURI() string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	sanitized := uri.SanitizeURI(am.URI)
	return sanitized
}
// Version returns the version stored by the last successful Pull of this
// Alertmanager instance. It is empty before the first successful Pull and
// after a failed one.
func (am *Alertmanager) Version() string {
	am.lock.RLock()
	version := am.status.version
	am.lock.RUnlock()
	return version
}
// ClusterPeers returns a copy of the list of IDs of all peers this instance
// is connected to.
// IDs are the same as in Alertmanager API.
//
// A copy is returned, like in the other accessors, so callers never share a
// backing array with the lock-protected status. The result is never nil.
func (am *Alertmanager) ClusterPeers() []string {
	am.lock.RLock()
	defer am.lock.RUnlock()

	peers := make([]string, len(am.status.peerIDs))
	copy(peers, am.status.peerIDs)
	return peers
}
// ClusterMemberNames returns a sorted list of names of all Alertmanager
// instances that are in the same cluster as this instance (including self).
// Names are the same as in karma configuration.
//
// Another upstream counts as a member of the same cluster when at least one
// of its peer IDs is also a peer ID of this instance.
func (am *Alertmanager) ClusterMemberNames() []string {
	// Snapshot our own peers and release the lock before querying any other
	// upstream. Holding our read lock while taking another instance's read
	// lock can deadlock: if two instances do this to each other while a writer
	// is waiting on each lock, sync.RWMutex blocks both new readers behind the
	// pending writers and nobody makes progress.
	am.lock.RLock()
	ownPeers := make([]string, len(am.status.peerIDs))
	copy(ownPeers, am.status.peerIDs)
	am.lock.RUnlock()

	members := []string{am.Name}

	for _, upstream := range GetAlertmanagers() {
		if upstream.Name == am.Name {
			continue
		}
		for _, peerID := range upstream.ClusterPeers() {
			if !slices.StringInSlice(ownPeers, peerID) {
				continue
			}
			if !slices.StringInSlice(members, upstream.Name) {
				members = append(members, upstream.Name)
			}
			// one shared peer is enough, further matches would add nothing
			break
		}
	}

	sort.Strings(members)
	return members
}
// ClusterID returns the ID (sha1) of the cluster this Alertmanager instance
// belongs to, computed from the names of all cluster members. If hashing
// fails the error is logged and the name of this instance is returned
// instead.
func (am *Alertmanager) ClusterID() string {
	id, err := slices.StringSliceToSHA1(am.ClusterMemberNames())
	if err == nil {
		return id
	}
	log.Errorf("slices.StringSliceToSHA1 error: %s", err)
	return am.Name
}