mirror of
https://github.com/fluxcd/flagger.git
synced 2026-02-14 18:10:00 +00:00
283 lines
8.2 KiB
Go
283 lines
8.2 KiB
Go
/*
|
|
Copyright 2020 The Flux authors
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package loadtester
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Identifiers, defaults and status strings used by the Concord task.
const (
	// TaskTypeConcord represents the concord type as string.
	TaskTypeConcord = "concord"

	// concordBasePath is the URL path used both to start a process and,
	// with an instance ID appended, to query its status.
	concordBasePath = "/api/v1/process"

	// defaultPollInterval and defaultPollTimeout are expressed in whole
	// seconds; NewConcordTask multiplies them by time.Second.
	defaultPollInterval = 5
	defaultPollTimeout  = 30

	// concordStatusSuccess and concordStatusFailed are the two process
	// statuses that end polling.
	concordStatusSuccess = "FINISHED"
	concordStatusFailed  = "FAILED"

	// ARGUMENTS_INPUT_PREFIX marks metadata keys that are forwarded to the
	// process as arguments (the prefix is stripped when they are collected).
	ARGUMENTS_INPUT_PREFIX = "arguments."
)
|
|
|
|
// ConcordTask represents a concord task
|
|
type ConcordTask struct {
|
|
TaskBase
|
|
Command string
|
|
Org string
|
|
Project string
|
|
Repo string
|
|
Entrypoint string
|
|
Arguments map[string]string
|
|
APIKeyPath string
|
|
Endpoint string
|
|
PollInterval time.Duration
|
|
PollTimeout time.Duration
|
|
BaseURL *url.URL
|
|
httpClient *http.Client
|
|
}
|
|
|
|
// NewConcordTask instantiates a new Concord Task
|
|
func NewConcordTask(metadata map[string]string, canary string, logger *zap.SugaredLogger) (*ConcordTask, error) {
|
|
var pollIntervalInt, pollTimeoutInt int
|
|
var arguments = make(map[string]string)
|
|
|
|
if _, found := metadata["server"]; !found {
|
|
return nil, errors.New("`server` is required with type concord")
|
|
}
|
|
pURL, err := url.Parse(metadata["server"])
|
|
if err != nil {
|
|
return nil, errors.New("failed to create base URL from metadata `concord_url`")
|
|
}
|
|
if _, found := metadata["org"]; !found {
|
|
return nil, errors.New("`org` is required with type concord")
|
|
}
|
|
if _, found := metadata["project"]; !found {
|
|
return nil, errors.New("`project` is required with type concord")
|
|
}
|
|
if _, found := metadata["repo"]; found == false {
|
|
return nil, errors.New("`repo` is required with type concord")
|
|
}
|
|
if _, found := metadata["entrypoint"]; found == false {
|
|
return nil, errors.New("`entrypoint` is required with type concord")
|
|
}
|
|
if _, found := metadata["apiKeyPath"]; found == false {
|
|
return nil, errors.New("`apiKeyPath` is required with type concord")
|
|
}
|
|
_, err = os.Stat(metadata["apiKeyPath"])
|
|
if os.IsNotExist(err) {
|
|
return nil, fmt.Errorf("`apiKeyPath` file doesn't exist %s", metadata["apiKeyPath"])
|
|
}
|
|
if _, found := metadata["endpoint"]; found == false {
|
|
return nil, errors.New("`endpoint` is required with type concord")
|
|
}
|
|
if _, found := metadata["pollInterval"]; found == false {
|
|
pollIntervalInt = defaultPollInterval
|
|
} else {
|
|
pollIntervalInt, err = strconv.Atoi(metadata["pollInterval"])
|
|
if err != nil {
|
|
return nil, errors.New("unable to convert `pollInterval` to int")
|
|
}
|
|
}
|
|
if _, found := metadata["pollTimeout"]; found == false {
|
|
pollTimeoutInt = defaultPollTimeout
|
|
} else {
|
|
pollTimeoutInt, err = strconv.Atoi(metadata["pollTimeout"])
|
|
if err != nil {
|
|
return nil, errors.New("unable to convert `pollTimeout` to int")
|
|
}
|
|
}
|
|
|
|
for key, value := range metadata {
|
|
if key == "arguments.endpoint" {
|
|
return nil, errors.New("You cannot override Endpoint through arguments. You must override Endpoint directly")
|
|
}
|
|
|
|
if strings.HasPrefix(key, ARGUMENTS_INPUT_PREFIX) {
|
|
arguments[key[len(ARGUMENTS_INPUT_PREFIX):]] = value
|
|
}
|
|
}
|
|
|
|
return &ConcordTask{
|
|
TaskBase: TaskBase{
|
|
logger: logger,
|
|
},
|
|
BaseURL: pURL,
|
|
Org: metadata["org"],
|
|
Project: metadata["project"],
|
|
Repo: metadata["repo"],
|
|
Entrypoint: metadata["entrypoint"],
|
|
APIKeyPath: metadata["apiKeyPath"],
|
|
Endpoint: metadata["endpoint"],
|
|
Arguments: arguments,
|
|
PollInterval: time.Duration(pollIntervalInt) * time.Second,
|
|
PollTimeout: time.Duration(pollTimeoutInt) * time.Second,
|
|
httpClient: &http.Client{Timeout: 60 * time.Second},
|
|
}, nil
|
|
}
|
|
|
|
func (task *ConcordTask) Hash() string {
|
|
return hash(task.canary + task.Org + task.Project + task.Repo + task.Entrypoint)
|
|
}
|
|
|
|
func (task *ConcordTask) String() string {
|
|
return fmt.Sprintf("%s %s %s %s", task.Org, task.Project, task.Repo, task.Entrypoint)
|
|
}
|
|
|
|
func (task *ConcordTask) Run(ctx context.Context) (*TaskRunResult, error) {
|
|
instance, err := task.startProcess()
|
|
if err != nil {
|
|
task.logger.Errorf("failed to start process: %s", err.Error())
|
|
return &TaskRunResult{false, nil}, err
|
|
}
|
|
|
|
ok, err := task.checkStatus(ctx, instance, task.PollInterval)
|
|
return &TaskRunResult{ok, nil}, err
|
|
}
|
|
|
|
// concordProcess is the JSON document decoded from the responses to the
// process start and status requests. Only InstanceID, Status and OK are read
// by the code in this file.
type concordProcess struct {
	// Identity of the process and of its parent, if any.
	InstanceID       string `json:"instanceId,omitempty"`
	ParentInstanceID string `json:"parentInstanceID,omitempty"`

	// Descriptive fields carried by the response.
	ProjectName   string `json:"projectName,omitempty"`
	CreatedAt     string `json:"createdAt,omitempty"`
	Initiator     string `json:"initiator,omitempty"`
	LastUpdatedAt string `json:"lastUpdatedAt,omitempty"`

	// Status is compared against the concordStatus* constants while polling.
	Status      string   `json:"status,omitempty"`
	ChildrenIds []string `json:"childrenIds,omitempty"`

	// OK is logged after the start request.
	OK bool `json:"ok,omitempty"`
}
|
|
|
|
func (task *ConcordTask) newRequest(method, path string, contentType string, body io.Reader) (*http.Request, error) {
|
|
rel := &url.URL{Path: path}
|
|
u := task.BaseURL.ResolveReference(rel)
|
|
req, err := http.NewRequest(method, u.String(), body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if contentType != "" {
|
|
req.Header.Set("Content-Type", contentType)
|
|
}
|
|
|
|
apiKey := ""
|
|
dat, err := os.ReadFile(task.APIKeyPath)
|
|
if err != nil {
|
|
return req, err
|
|
}
|
|
apiKey = string(dat)
|
|
|
|
if apiKey != "" {
|
|
req.Header.Set("Authorization", apiKey)
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func (task *ConcordTask) do(req *http.Request, v interface{}) (*http.Response, error) {
|
|
task.logger.With("canary", task.canary).Infof("calling endpoint %s", req.URL.String())
|
|
resp, err := task.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
err = json.NewDecoder(resp.Body).Decode(v)
|
|
return resp, err
|
|
}
|
|
|
|
func (task *ConcordTask) buildFields(w *multipart.Writer) {
|
|
_ = w.WriteField("org", task.Org)
|
|
_ = w.WriteField("project", task.Project)
|
|
_ = w.WriteField("repo", task.Repo)
|
|
_ = w.WriteField("entryPoint", task.Entrypoint)
|
|
_ = w.WriteField("arguments.endpoint", task.Endpoint)
|
|
|
|
for key, value := range task.Arguments {
|
|
_ = w.WriteField(fmt.Sprintf("arguments.%s", key), value)
|
|
}
|
|
}
|
|
|
|
func (task *ConcordTask) startProcess() (string, error) {
|
|
requestBody := new(bytes.Buffer)
|
|
writer := multipart.NewWriter(requestBody)
|
|
task.buildFields(writer)
|
|
|
|
err := writer.Close()
|
|
if err != nil {
|
|
return "", nil
|
|
}
|
|
|
|
req, err := task.newRequest(http.MethodPost, concordBasePath, writer.FormDataContentType(), requestBody)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
process := concordProcess{}
|
|
_, err = task.do(req, &process)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
task.logger.With("canary", task.canary).Infof("created process id [%s] OK status is %t", process.InstanceID, process.OK)
|
|
return process.InstanceID, nil
|
|
}
|
|
|
|
func (task *ConcordTask) checkStatus(ctx context.Context, instanceID string, interval time.Duration) (bool, error) {
|
|
tickChan := time.NewTicker(interval).C
|
|
for {
|
|
select {
|
|
case <-tickChan:
|
|
req, err := task.newRequest(http.MethodGet, fmt.Sprintf("%s/%s", concordBasePath, instanceID), "", nil)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to generate request: %s", err)
|
|
}
|
|
|
|
process := concordProcess{}
|
|
_, err = task.do(req, &process)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed checking status: %s", err)
|
|
}
|
|
task.logger.With("canary", task.canary).Infof("process id [%s] current status is %s", process.InstanceID, process.Status)
|
|
|
|
if process.Status == concordStatusSuccess {
|
|
return true, nil
|
|
}
|
|
if process.Status == concordStatusFailed {
|
|
return false, fmt.Errorf("concord instanceID: %s failed", instanceID)
|
|
}
|
|
case <-time.After(task.PollTimeout):
|
|
return false, fmt.Errorf("concord process timed out, after %d seconds", int64(task.PollTimeout/time.Second))
|
|
case <-ctx.Done():
|
|
return false, errors.New("context timedout")
|
|
}
|
|
|
|
}
|
|
}
|