mirror of
https://github.com/prymitive/karma
synced 2026-05-07 03:26:52 +00:00
Merge pull request #202 from cloudflare/proxy
Add support for proxying user connection to Alertmanager
This commit is contained in:
@@ -49,6 +49,7 @@ alertmanager:
|
||||
- name: string
|
||||
uri: string
|
||||
timeout: duration
|
||||
proxy: bool
|
||||
```
|
||||
|
||||
* `interval` - how often alerts should be refreshed, a string in
|
||||
@@ -70,8 +71,12 @@ alertmanager:
|
||||
of unsee with `make run`.
|
||||
* `timeout` - timeout for requests send to this Alertmanager server, a string in
|
||||
[time.Duration](https://golang.org/pkg/time/#ParseDuration) format.
|
||||
* `proxy` - if enabled requests from user browsers to this Alertmanager will be
|
||||
proxied via unsee. This applies to requests made when managing
|
||||
silences via unsee (creating or expiring silences).
|
||||
|
||||
Example:
|
||||
Example with two production Alertmanager instances running in HA mode and a
|
||||
staging instance that is also proxied:
|
||||
|
||||
```yaml
|
||||
alertmanager:
|
||||
@@ -80,12 +85,15 @@ alertmanager:
|
||||
- name: production1
|
||||
uri: https://alertmanager1.prod.example.com
|
||||
timeout: 20s
|
||||
proxy: false
|
||||
- name: production2
|
||||
uri: https://alertmanager2.prod.example.com
|
||||
timeout: 20s
|
||||
proxy: false
|
||||
- name: staging
|
||||
uri: https://alertmanager.staging.example.com
|
||||
timeout: 30s
|
||||
proxy: true
|
||||
```
|
||||
|
||||
Defaults:
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
alertmanager:
|
||||
interval: 60s
|
||||
servers:
|
||||
- name: mock
|
||||
uri: file://internal/mock/0.11.0
|
||||
- name: local
|
||||
uri: http://localhost:9093
|
||||
timeout: 10s
|
||||
proxy: true
|
||||
annotations:
|
||||
default:
|
||||
hidden: false
|
||||
@@ -29,6 +30,7 @@ listen:
|
||||
port: 8080
|
||||
prefix: /
|
||||
log:
|
||||
config: false
|
||||
level: info
|
||||
jira:
|
||||
- regex: DEVOPS-[0-9]+
|
||||
|
||||
@@ -16,7 +16,9 @@ import (
|
||||
func init() {
|
||||
log.SetLevel(log.ErrorLevel)
|
||||
for i, uri := range mock.ListAllMockURIs() {
|
||||
alertmanager.NewAlertmanager(fmt.Sprintf("dedup-mock-%d", i), uri, time.Second)
|
||||
name := fmt.Sprintf("dedup-mock-%d", i)
|
||||
am := alertmanager.NewAlertmanager(name, uri, alertmanager.WithRequestTimeout(time.Second))
|
||||
alertmanager.RegisterAlertmanager(am)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,10 +2,13 @@ package alertmanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cloudflare/unsee/internal/config"
|
||||
"github.com/cloudflare/unsee/internal/mapper"
|
||||
"github.com/cloudflare/unsee/internal/models"
|
||||
"github.com/cloudflare/unsee/internal/transform"
|
||||
@@ -26,9 +29,11 @@ type alertmanagerMetrics struct {
|
||||
|
||||
// Alertmanager represents Alertmanager upstream instance
|
||||
type Alertmanager struct {
|
||||
URI string `json:"uri"`
|
||||
Timeout time.Duration `json:"timeout"`
|
||||
Name string `json:"name"`
|
||||
URI string `json:"uri"`
|
||||
RequestTimeout time.Duration `json:"timeout"`
|
||||
Name string `json:"name"`
|
||||
// whenever this instance should be proxied
|
||||
ProxyRequests bool
|
||||
// lock protects data access while updating
|
||||
lock sync.RWMutex
|
||||
// fields for storing pulled data
|
||||
@@ -51,7 +56,7 @@ func (am *Alertmanager) detectVersion() string {
|
||||
return defaultVersion
|
||||
}
|
||||
ver := alertmanagerVersion{}
|
||||
err = transport.ReadJSON(url, am.Timeout, &ver)
|
||||
err = transport.ReadJSON(url, am.RequestTimeout, &ver)
|
||||
if err != nil {
|
||||
log.Errorf("[%s] %s request failed: %s", am.Name, url, err.Error())
|
||||
return defaultVersion
|
||||
@@ -87,7 +92,7 @@ func (am *Alertmanager) pullSilences(version string) error {
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
silences, err := mapper.GetSilences(am.URI, am.Timeout)
|
||||
silences, err := mapper.GetSilences(am.URI, am.RequestTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -107,6 +112,22 @@ func (am *Alertmanager) pullSilences(version string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// this is the URI of this Alertmanager we put in JSON reponse
|
||||
// it's either real full URI or a proxy relative URI
|
||||
func (am *Alertmanager) publicURI() string {
|
||||
if am.ProxyRequests {
|
||||
sub := fmt.Sprintf("/proxy/alertmanager/%s", am.Name)
|
||||
uri := path.Join(config.Config.Listen.Prefix, sub)
|
||||
if strings.HasSuffix(sub, "/") {
|
||||
// if sub path had trailing slash then add it here, since path.Join will
|
||||
// skip it
|
||||
return uri + "/"
|
||||
}
|
||||
return uri
|
||||
}
|
||||
return am.URI
|
||||
}
|
||||
|
||||
func (am *Alertmanager) pullAlerts(version string) error {
|
||||
mapper, err := mapper.GetAlertMapper(version)
|
||||
if err != nil {
|
||||
@@ -114,7 +135,7 @@ func (am *Alertmanager) pullAlerts(version string) error {
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
groups, err := mapper.GetAlerts(am.URI, am.Timeout)
|
||||
groups, err := mapper.GetAlerts(am.URI, am.RequestTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -163,7 +184,7 @@ func (am *Alertmanager) pullAlerts(version string) error {
|
||||
alert.Alertmanager = []models.AlertmanagerInstance{
|
||||
models.AlertmanagerInstance{
|
||||
Name: am.Name,
|
||||
URI: am.URI,
|
||||
URI: am.publicURI(),
|
||||
State: alert.State,
|
||||
StartsAt: alert.StartsAt,
|
||||
EndsAt: alert.EndsAt,
|
||||
|
||||
@@ -10,31 +10,24 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Option allows to pass functional options to NewAlertmanager()
|
||||
type Option func(am *Alertmanager)
|
||||
|
||||
var (
|
||||
upstreams = map[string]*Alertmanager{}
|
||||
)
|
||||
|
||||
// NewAlertmanager creates a new Alertmanager instance
|
||||
func NewAlertmanager(name, uri string, timeout time.Duration) error {
|
||||
if _, found := upstreams[name]; found {
|
||||
return fmt.Errorf("Alertmanager upstream '%s' already exist", name)
|
||||
}
|
||||
|
||||
for _, am := range upstreams {
|
||||
if am.URI == uri {
|
||||
return fmt.Errorf("Alertmanager upstream '%s' already collects from '%s'", am.Name, am.URI)
|
||||
}
|
||||
}
|
||||
|
||||
upstreams[name] = &Alertmanager{
|
||||
URI: uri,
|
||||
Timeout: timeout,
|
||||
Name: name,
|
||||
lock: sync.RWMutex{},
|
||||
alertGroups: []models.AlertGroup{},
|
||||
silences: map[string]models.Silence{},
|
||||
colors: models.LabelsColorMap{},
|
||||
autocomplete: []models.Autocomplete{},
|
||||
func NewAlertmanager(name, uri string, opts ...Option) *Alertmanager {
|
||||
am := &Alertmanager{
|
||||
URI: uri,
|
||||
RequestTimeout: time.Second * 10,
|
||||
Name: name,
|
||||
lock: sync.RWMutex{},
|
||||
alertGroups: []models.AlertGroup{},
|
||||
silences: map[string]models.Silence{},
|
||||
colors: models.LabelsColorMap{},
|
||||
autocomplete: []models.Autocomplete{},
|
||||
metrics: alertmanagerMetrics{
|
||||
errors: map[string]float64{
|
||||
labelValueErrorsAlerts: 0,
|
||||
@@ -43,8 +36,27 @@ func NewAlertmanager(name, uri string, timeout time.Duration) error {
|
||||
},
|
||||
}
|
||||
|
||||
log.Infof("[%s] Configured Alertmanager source at %s", name, uri)
|
||||
for _, opt := range opts {
|
||||
opt(am)
|
||||
}
|
||||
|
||||
return am
|
||||
}
|
||||
|
||||
// RegisterAlertmanager will add an Alertmanager instance to the list of
|
||||
// instances used when pulling alerts from upstreams
|
||||
func RegisterAlertmanager(am *Alertmanager) error {
|
||||
if _, found := upstreams[am.Name]; found {
|
||||
return fmt.Errorf("Alertmanager upstream '%s' already exist", am.Name)
|
||||
}
|
||||
|
||||
for _, existingAM := range upstreams {
|
||||
if existingAM.URI == am.URI {
|
||||
return fmt.Errorf("Alertmanager upstream '%s' already collects from '%s'", existingAM.Name, existingAM.URI)
|
||||
}
|
||||
}
|
||||
upstreams[am.Name] = am
|
||||
log.Infof("[%s] Configured Alertmanager source at %s (proxied: %v)", am.Name, am.URI, am.ProxyRequests)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -66,3 +78,19 @@ func GetAlertmanagerByName(name string) *Alertmanager {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithProxy option can be passed to NewAlertmanager in order to enable request
|
||||
// proxying for unsee clients
|
||||
func WithProxy(proxied bool) Option {
|
||||
return func(am *Alertmanager) {
|
||||
am.ProxyRequests = proxied
|
||||
}
|
||||
}
|
||||
|
||||
// WithRequestTimeout option can be passed to NewAlertmanager in order to set
|
||||
// a custom timeout for Alertmanager upstream requests
|
||||
func WithRequestTimeout(timeout time.Duration) Option {
|
||||
return func(am *Alertmanager) {
|
||||
am.RequestTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ func testReadConfig(t *testing.T) {
|
||||
- name: default
|
||||
uri: http://localhost
|
||||
timeout: 40s
|
||||
proxy: false
|
||||
annotations:
|
||||
default:
|
||||
hidden: true
|
||||
|
||||
@@ -6,6 +6,7 @@ type alertmanagerConfig struct {
|
||||
Name string
|
||||
URI string
|
||||
Timeout time.Duration
|
||||
Proxy bool
|
||||
}
|
||||
|
||||
type jiraRule struct {
|
||||
|
||||
@@ -485,11 +485,7 @@ var tests = []filterTest{
|
||||
func TestFilters(t *testing.T) {
|
||||
log.SetLevel(log.ErrorLevel)
|
||||
|
||||
err := alertmanager.NewAlertmanager("test", "http://localhost", time.Second)
|
||||
am := alertmanager.GetAlertmanagerByName("test")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
am := alertmanager.NewAlertmanager("test", "http://localhost", alertmanager.WithRequestTimeout(time.Second))
|
||||
for _, ft := range tests {
|
||||
alert := models.Alert(ft.Alert)
|
||||
if &ft.Silence != nil {
|
||||
|
||||
6
main.go
6
main.go
@@ -60,7 +60,8 @@ func setupRouter(router *gin.Engine) {
|
||||
|
||||
func setupUpstreams() {
|
||||
for _, s := range config.Config.Alertmanager.Servers {
|
||||
err := alertmanager.NewAlertmanager(s.Name, s.URI, s.Timeout)
|
||||
am := alertmanager.NewAlertmanager(s.Name, s.URI, alertmanager.WithRequestTimeout(s.Timeout), alertmanager.WithProxy(s.Proxy))
|
||||
err := alertmanager.RegisterAlertmanager(am)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to configure Alertmanager '%s' with URI '%s': %s", s.Name, s.URI, err)
|
||||
}
|
||||
@@ -151,6 +152,9 @@ func main() {
|
||||
}
|
||||
|
||||
setupRouter(router)
|
||||
for _, am := range alertmanager.GetAlertmanagers() {
|
||||
setupRouterProxyHandlers(router, am)
|
||||
}
|
||||
listen := fmt.Sprintf("%s:%d", config.Config.Listen.Address, config.Config.Listen.Port)
|
||||
log.Infof("Listening on %s", listen)
|
||||
err := router.Run(listen)
|
||||
|
||||
62
proxy.go
Normal file
62
proxy.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
|
||||
"github.com/cloudflare/unsee/internal/alertmanager"
|
||||
"github.com/cloudflare/unsee/internal/config"
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func proxyPathPrefix(name string) string {
|
||||
return fmt.Sprintf("%sproxy/alertmanager/%s", config.Config.Listen.Prefix, name)
|
||||
}
|
||||
|
||||
func proxyPath(name, path string) string {
|
||||
return fmt.Sprintf("%s%s", proxyPathPrefix(name), path)
|
||||
}
|
||||
|
||||
// NewAlertmanagerProxy creates a proxy instance for given alertmanager instance
|
||||
func NewAlertmanagerProxy(alertmanager *alertmanager.Alertmanager) (*httputil.ReverseProxy, error) {
|
||||
upstreamURL, err := url.Parse(alertmanager.URI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
proxy := httputil.ReverseProxy{
|
||||
Director: func(req *http.Request) {
|
||||
req.URL.Scheme = upstreamURL.Scheme
|
||||
req.URL.Host = upstreamURL.Host
|
||||
// drop Accept-Encoding header so we always get uncompressed reponses from
|
||||
// upstream, there's a gzip middleware that's global so we don't want it
|
||||
// to gzip twice
|
||||
req.Header.Del("Accept-Encoding")
|
||||
log.Debugf("[%s] Proxy request for %s", alertmanager.Name, req.URL.Path)
|
||||
},
|
||||
ModifyResponse: func(resp *http.Response) error {
|
||||
// drop Content-Length header from upstream responses, gzip middleware
|
||||
// will compress those and that could cause a mismatch
|
||||
resp.Header.Del("Content-Length")
|
||||
return nil
|
||||
},
|
||||
}
|
||||
return &proxy, nil
|
||||
}
|
||||
|
||||
func setupRouterProxyHandlers(router *gin.Engine, alertmanager *alertmanager.Alertmanager) error {
|
||||
proxy, err := NewAlertmanagerProxy(alertmanager)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
router.POST(
|
||||
proxyPath(alertmanager.Name, "/api/v1/silences"),
|
||||
gin.WrapH(http.StripPrefix(proxyPathPrefix(alertmanager.Name), proxy)))
|
||||
router.DELETE(
|
||||
proxyPath(alertmanager.Name, "/api/v1/silence/*id"),
|
||||
gin.WrapH(http.StripPrefix(proxyPathPrefix(alertmanager.Name), proxy)))
|
||||
return nil
|
||||
}
|
||||
122
proxy_test.go
Normal file
122
proxy_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cloudflare/unsee/internal/alertmanager"
|
||||
|
||||
httpmock "gopkg.in/jarcoal/httpmock.v1"
|
||||
)
|
||||
|
||||
// httptest.NewRecorder() doesn't implement http.CloseNotifier
|
||||
type closeNotifyingRecorder struct {
|
||||
*httptest.ResponseRecorder
|
||||
closed chan bool
|
||||
}
|
||||
|
||||
func newCloseNotifyingRecorder() *closeNotifyingRecorder {
|
||||
return &closeNotifyingRecorder{
|
||||
httptest.NewRecorder(),
|
||||
make(chan bool, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *closeNotifyingRecorder) close() {
|
||||
c.closed <- true
|
||||
}
|
||||
|
||||
func (c *closeNotifyingRecorder) CloseNotify() <-chan bool {
|
||||
return c.closed
|
||||
}
|
||||
|
||||
type proxyTest struct {
|
||||
method string
|
||||
localPath string
|
||||
upstreamURI string
|
||||
code int
|
||||
response string
|
||||
}
|
||||
|
||||
var proxyTests = []proxyTest{
|
||||
// valid alertmanager and methods
|
||||
{
|
||||
method: "POST",
|
||||
localPath: "/proxy/alertmanager/dummy/api/v1/silences",
|
||||
upstreamURI: "http://localhost:9093/api/v1/silences",
|
||||
code: 200,
|
||||
response: "{\"status\":\"success\",\"data\":{\"silenceId\":\"d8a61ca8-ee2e-4076-999f-276f1e986bf3\"}}",
|
||||
},
|
||||
{
|
||||
method: "DELETE",
|
||||
localPath: "/proxy/alertmanager/dummy/api/v1/silence/d8a61ca8-ee2e-4076-999f-276f1e986bf3",
|
||||
upstreamURI: "http://localhost:9093/api/v1/silence/d8a61ca8-ee2e-4076-999f-276f1e986bf3",
|
||||
code: 200,
|
||||
response: "{\"status\":\"success\"}",
|
||||
},
|
||||
// invalid alertmanager name
|
||||
{
|
||||
method: "POST",
|
||||
localPath: "/proxy/alertmanager/INVALID/api/v1/silences",
|
||||
upstreamURI: "",
|
||||
code: 404,
|
||||
response: "404 page not found",
|
||||
},
|
||||
{
|
||||
method: "DELETE",
|
||||
localPath: "/proxy/alertmanager/INVALID/api/v1/silence/d8a61ca8-ee2e-4076-999f-276f1e986bf3",
|
||||
upstreamURI: "http://localhost:9093/api/v1/silence/d8a61ca8-ee2e-4076-999f-276f1e986bf3",
|
||||
code: 404,
|
||||
response: "404 page not found",
|
||||
},
|
||||
// valid alertmanager name, but invalid method
|
||||
{
|
||||
method: "GET",
|
||||
localPath: "/proxy/alertmanager/dummy/api/v1/silences",
|
||||
upstreamURI: "",
|
||||
code: 404,
|
||||
response: "404 page not found",
|
||||
},
|
||||
{
|
||||
method: "GET",
|
||||
localPath: "/proxy/alertmanager/dummy/api/v1/silence/d8a61ca8-ee2e-4076-999f-276f1e986bf3",
|
||||
upstreamURI: "http://localhost:9093/api/v1/silence/d8a61ca8-ee2e-4076-999f-276f1e986bf3",
|
||||
code: 404,
|
||||
response: "404 page not found",
|
||||
},
|
||||
}
|
||||
|
||||
func TestProxy(t *testing.T) {
|
||||
r := ginTestEngine()
|
||||
am := alertmanager.NewAlertmanager(
|
||||
"dummy",
|
||||
"http://localhost:9093",
|
||||
alertmanager.WithRequestTimeout(time.Second*5),
|
||||
alertmanager.WithProxy(true),
|
||||
)
|
||||
setupRouterProxyHandlers(r, am)
|
||||
|
||||
httpmock.Activate()
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
for _, testCase := range proxyTests {
|
||||
httpmock.Reset()
|
||||
if testCase.upstreamURI != "" {
|
||||
httpmock.RegisterResponder(testCase.method, testCase.upstreamURI, httpmock.NewStringResponder(testCase.code, testCase.response))
|
||||
}
|
||||
req, _ := http.NewRequest(testCase.method, testCase.localPath, nil)
|
||||
resp := newCloseNotifyingRecorder()
|
||||
r.ServeHTTP(resp, req)
|
||||
if resp.Code != testCase.code {
|
||||
t.Errorf("%s %s proxied to %s returned status %d while %d was expected",
|
||||
testCase.method, testCase.localPath, testCase.upstreamURI, resp.Code, testCase.code)
|
||||
}
|
||||
body := resp.Body.String()
|
||||
if body != testCase.response {
|
||||
t.Errorf("%s %s proxied to %s returned content '%s' while '%s' was expected",
|
||||
testCase.method, testCase.localPath, testCase.upstreamURI, body, testCase.response)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user