Files
flagger/pkg/loadtester/server.go
stefanprodan 3c11749f80 loadtester: release v0.16.0
- update helm to 2.16.5
- update helmv3 to 3.1.2
- remove http server timeouts
2020-03-31 12:48:16 +03:00

390 lines
10 KiB
Go

package loadtester
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
"go.uber.org/zap"
)
// ListenAndServe registers the load tester routes, serves them on the given
// port and blocks until stopCh delivers a value or is closed. It then shuts
// the server down gracefully, waiting at most timeout for in-flight requests.
//
// Routes:
//   - /metrics: Prometheus metrics handler
//   - /healthz: HandleHealthz
//   - /gate/approve and /gate/halt: fixed 200 "OK" / 403 "Forbidden" answers
//   - /gate/check, /gate/open, /gate/close: query or toggle the gate stored
//     under the key "<name>.<namespace>" taken from the posted payload
//   - /rollback/check, /rollback/open, /rollback/close: same operations on
//     the key "rollback.<name>.<namespace>"
//   - /: HandleNewTask
//
// If the listener stops with anything other than http.ErrServerClosed the
// error is reported through logger.Fatalf.
//
// NOTE(review): the routes are registered on http.DefaultServeMux, so calling
// this function twice in one process panics on the duplicate patterns, and
// handlers other packages add to the default mux are served as well. Consider
// http.NewServeMux() if nothing relies on that.
func ListenAndServe(port string, timeout time.Duration, logger *zap.SugaredLogger, taskRunner *TaskRunner, gate *GateStorage, stopCh <-chan struct{}) {
	// canaryKey decodes the webhook payload from the request and returns
	// prefix + "<name>.<namespace>". When the body cannot be read or decoded
	// it logs the error, replies 400 and returns false.
	canaryKey := func(w http.ResponseWriter, r *http.Request, prefix string) (string, bool) {
		defer r.Body.Close()

		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			// Errorw is used because SugaredLogger.Error would format the
			// zap field with fmt.Sprint instead of attaching it.
			logger.Errorw("reading the request body failed", zap.Error(err))
			w.WriteHeader(http.StatusBadRequest)
			return "", false
		}

		canary := &flaggerv1.CanaryWebhookPayload{}
		if err := json.Unmarshal(body, canary); err != nil {
			logger.Errorw("decoding the request body failed", zap.Error(err))
			w.WriteHeader(http.StatusBadRequest)
			return "", false
		}

		return prefix + fmt.Sprintf("%s.%s", canary.Name, canary.Namespace), true
	}

	// checkHandler replies 200 "Approved" when the keyed gate is open and
	// 403 "Forbidden" otherwise. kind ("gate" or "rollback") only affects
	// the log message.
	checkHandler := func(prefix, kind string) func(http.ResponseWriter, *http.Request) {
		return func(w http.ResponseWriter, r *http.Request) {
			key, ok := canaryKey(w, r, prefix)
			if !ok {
				return
			}
			approved := gate.isOpen(key)
			if approved {
				w.WriteHeader(http.StatusOK)
				w.Write([]byte("Approved"))
			} else {
				w.WriteHeader(http.StatusForbidden)
				w.Write([]byte("Forbidden"))
			}
			logger.Infof("%s %s check: approved %v", key, kind, approved)
		}
	}

	// toggleHandler applies toggle (gate.open or gate.close) to the keyed
	// gate and replies 202. state is the past-tense word used in the log.
	toggleHandler := func(prefix, kind, state string, toggle func(string)) func(http.ResponseWriter, *http.Request) {
		return func(w http.ResponseWriter, r *http.Request) {
			key, ok := canaryKey(w, r, prefix)
			if !ok {
				return
			}
			toggle(key)
			w.WriteHeader(http.StatusAccepted)
			logger.Infof("%s %s %s", key, kind, state)
		}
	}

	openGate := func(key string) { gate.open(key) }
	closeGate := func(key string) { gate.close(key) }

	mux := http.DefaultServeMux
	mux.Handle("/metrics", promhttp.Handler())
	mux.HandleFunc("/healthz", HandleHealthz)
	mux.HandleFunc("/gate/approve", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})
	mux.HandleFunc("/gate/halt", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusForbidden)
		w.Write([]byte("Forbidden"))
	})
	mux.HandleFunc("/gate/check", checkHandler("", "gate"))
	mux.HandleFunc("/gate/open", toggleHandler("", "gate", "opened", openGate))
	mux.HandleFunc("/gate/close", toggleHandler("", "gate", "closed", closeGate))
	mux.HandleFunc("/rollback/check", checkHandler("rollback.", "rollback"))
	mux.HandleFunc("/rollback/open", toggleHandler("rollback.", "rollback", "opened", openGate))
	mux.HandleFunc("/rollback/close", toggleHandler("rollback.", "rollback", "closed", closeGate))
	mux.HandleFunc("/", HandleNewTask(logger, taskRunner))

	srv := &http.Server{
		Addr:    ":" + port,
		Handler: mux,
	}

	// Serve in the background so this goroutine can wait on stopCh.
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			logger.Fatalf("HTTP server crashed %v", err)
		}
	}()

	// Block until the caller signals shutdown.
	<-stopCh

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	if err := srv.Shutdown(ctx); err != nil {
		logger.Errorf("HTTP server graceful shutdown failed %v", err)
	} else {
		logger.Info("HTTP server stopped")
	}
}
// HandleHealthz answers health check requests with status 200 and the
// plain body "OK". The request itself is not inspected.
func HandleHealthz(w http.ResponseWriter, r *http.Request) {
	const healthyBody = "OK"

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(healthyBody))
}
// HandleNewTask returns an HTTP handler that decodes a CanaryWebhookPayload
// from the request body and runs or schedules the task described by the
// payload metadata.
//
// The task kind is read from metadata["type"] and defaults to TaskTypeShell.
// Bash, Helm, Helm v3 and Concord tasks run synchronously inside the request,
// bounded by taskRunner.Timeout(). The handler replies 200 on success, with
// the command output as body when metadata["returnCmdOutput"] parses as
// true, or 500 with the error text when the task reports failure. Any other
// kind is built through GetTaskFactory and handed to taskRunner.Add, and the
// handler replies 202.
//
// A body that cannot be read or decoded, empty metadata, an unknown task
// type or a factory error all yield 400.
func HandleNewTask(logger *zap.SugaredLogger, taskRunner TaskRunnerInterface) func(w http.ResponseWriter, r *http.Request) {
	// writeFailure replies 500 for a blocking task that did not succeed.
	// The error text is only written when there is an error: a task can
	// report failure without one, and calling Error() on nil would panic.
	writeFailure := func(w http.ResponseWriter, err error) {
		w.WriteHeader(http.StatusInternalServerError)
		if err != nil {
			w.Write([]byte(err.Error()))
		}
	}

	// writeSuccess replies 200 and, when requested, the task output.
	writeSuccess := func(w http.ResponseWriter, out []byte, includeOutput bool) {
		w.WriteHeader(http.StatusOK)
		if includeOutput {
			w.Write(out)
		}
	}

	return func(w http.ResponseWriter, r *http.Request) {
		defer r.Body.Close()

		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			// Errorw is used because SugaredLogger.Error would format the
			// zap field with fmt.Sprint instead of attaching it.
			logger.Errorw("reading the request body failed", zap.Error(err))
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		payload := &flaggerv1.CanaryWebhookPayload{}
		if err := json.Unmarshal(body, payload); err != nil {
			logger.Errorw("decoding the request body failed", zap.Error(err))
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		metadata := payload.Metadata
		if len(metadata) == 0 {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte("metadata not found in payload"))
			return
		}

		canary := fmt.Sprintf("%s.%s", payload.Name, payload.Namespace)

		typ, ok := metadata["type"]
		if !ok {
			typ = TaskTypeShell
		}

		// An unparsable returnCmdOutput keeps the default (false), but the
		// problem is logged instead of being dropped silently.
		rtnCmdOutput := false
		if rtn, found := metadata["returnCmdOutput"]; found {
			parsed, parseErr := strconv.ParseBool(rtn)
			if parseErr != nil {
				logger.With("canary", payload.Name).Warnf("ignoring invalid returnCmdOutput value %q: %v", rtn, parseErr)
			}
			rtnCmdOutput = parsed
		}

		// run bash command (blocking task)
		if typ == TaskTypeBash {
			logger.With("canary", payload.Name).Infof("bash command %s", metadata["cmd"])

			bashTask := BashTask{
				command:      metadata["cmd"],
				logCmdOutput: true,
				TaskBase: TaskBase{
					canary: canary,
					logger: logger,
				},
			}

			ctx, cancel := context.WithTimeout(context.Background(), taskRunner.Timeout())
			defer cancel()

			result, err := bashTask.Run(ctx)
			if !result.ok {
				writeFailure(w, err)
				return
			}
			writeSuccess(w, result.out, rtnCmdOutput)
			return
		}

		// run helm command (blocking task)
		if typ == TaskTypeHelm {
			helm := HelmTask{
				command:      metadata["cmd"],
				logCmdOutput: true,
				TaskBase: TaskBase{
					canary: canary,
					logger: logger,
				},
			}

			ctx, cancel := context.WithTimeout(context.Background(), taskRunner.Timeout())
			defer cancel()

			result, err := helm.Run(ctx)
			if !result.ok {
				writeFailure(w, err)
				return
			}
			writeSuccess(w, result.out, rtnCmdOutput)
			return
		}

		// run helmv3 command (blocking task)
		if typ == TaskTypeHelmv3 {
			helm := HelmTaskv3{
				command:      metadata["cmd"],
				logCmdOutput: true,
				TaskBase: TaskBase{
					canary: canary,
					logger: logger,
				},
			}

			ctx, cancel := context.WithTimeout(context.Background(), taskRunner.Timeout())
			defer cancel()

			result, err := helm.Run(ctx)
			if !result.ok {
				writeFailure(w, err)
				return
			}
			writeSuccess(w, result.out, rtnCmdOutput)
			return
		}

		// run concord job (blocking task)
		if typ == TaskTypeConcord {
			concord, err := NewConcordTask(payload.Metadata, canary, logger)
			if err != nil {
				logger.With("canary", payload.Name).Errorf("concord task init error: %s", err)
				w.WriteHeader(http.StatusInternalServerError)
				w.Write([]byte(err.Error()))
				return
			}

			ctx, cancel := context.WithTimeout(context.Background(), taskRunner.Timeout())
			defer cancel()

			result, err := concord.Run(ctx)
			if !result.ok {
				if err != nil {
					logger.With("canary", payload.Name).Errorf("concord task error: %s", err)
				}
				writeFailure(w, err)
				return
			}
			writeSuccess(w, result.out, rtnCmdOutput)
			return
		}

		// Every other type is resolved through the factory registry and
		// handed to the task runner instead of being run in the request.
		taskFactory, ok := GetTaskFactory(typ)
		if !ok {
			w.WriteHeader(http.StatusBadRequest)
			fmt.Fprintf(w, "unknown task type %s", typ)
			return
		}

		task, taskErr := taskFactory(metadata, canary, logger)
		if taskErr != nil {
			w.WriteHeader(http.StatusBadRequest)
			w.Write([]byte(taskErr.Error()))
			return
		}

		taskRunner.Add(task)
		w.WriteHeader(http.StatusAccepted)
	}
}