Files
flagger/pkg/loadtester/concord.go
2021-07-31 08:25:46 +09:00

283 lines
8.2 KiB
Go

/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package loadtester
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"go.uber.org/zap"
)
// Constants used by the Concord load-tester task.
const (
	// TaskTypeConcord represents the concord type as string.
	TaskTypeConcord = "concord"

	// concordBasePath is the path of the Concord process API, resolved
	// against the task's base URL.
	concordBasePath = "/api/v1/process"

	// defaultPollInterval is the poll interval, in seconds, used when the
	// `pollInterval` metadata key is absent.
	defaultPollInterval = 5

	// defaultPollTimeout is the poll timeout, in seconds, used when the
	// `pollTimeout` metadata key is absent.
	defaultPollTimeout = 30

	// concordStatusSuccess is the process status that ends polling with
	// success.
	concordStatusSuccess = "FINISHED"

	// concordStatusFailed is the process status that ends polling with an
	// error.
	concordStatusFailed = "FAILED"

	// ARGUMENTS_INPUT_PREFIX is the metadata key prefix that marks an entry
	// as a process argument.
	ARGUMENTS_INPUT_PREFIX = "arguments."
)
// ConcordTask represents a concord task: it starts a Concord process through
// the HTTP process API and polls that process until it reaches a final status.
type ConcordTask struct {
// TaskBase is embedded; the methods in this file read its canary and logger
// fields.
TaskBase
// Command is not populated by NewConcordTask.
Command string
// Org is sent as the `org` form field when the process is started.
Org string
// Project is sent as the `project` form field when the process is started.
Project string
// Repo is sent as the `repo` form field when the process is started.
Repo string
// Entrypoint is sent as the `entryPoint` form field when the process is
// started.
Entrypoint string
// Arguments holds extra process arguments; each entry is sent as an
// `arguments.<key>` form field.
Arguments map[string]string
// APIKeyPath is the path of the file whose content is sent in the
// Authorization header of every request.
APIKeyPath string
// Endpoint is sent as the `arguments.endpoint` form field.
Endpoint string
// PollInterval is the delay between two status checks.
PollInterval time.Duration
// PollTimeout is the timeout applied while polling for the process status.
PollTimeout time.Duration
// BaseURL is the server URL that request paths are resolved against.
BaseURL *url.URL
// httpClient is the client used for all calls to the Concord server.
httpClient *http.Client
}
// NewConcordTask instantiates a new Concord Task from the given metadata.
//
// Required metadata keys are `server` (base URL of the Concord server), `org`,
// `project`, `repo`, `entrypoint`, `apiKeyPath` (path of an existing file) and
// `endpoint`. Optional keys are `pollInterval` and `pollTimeout`, both positive
// integers expressed in seconds (defaults: 5 and 30). Every `arguments.<name>`
// key is collected into Arguments under `<name>`; `arguments.endpoint` is
// rejected because that form field is always filled from `endpoint`.
//
// canary and logger are stored on the embedded TaskBase. An error is returned
// when a required key is missing or a value fails validation.
func NewConcordTask(metadata map[string]string, canary string, logger *zap.SugaredLogger) (*ConcordTask, error) {
	// requireKey fails when key is absent from metadata (an empty value is
	// still accepted, as before).
	requireKey := func(key string) error {
		if _, found := metadata[key]; !found {
			return fmt.Errorf("`%s` is required with type concord", key)
		}
		return nil
	}

	// seconds reads an optional, strictly positive number of seconds.
	seconds := func(key string, fallback int) (time.Duration, error) {
		raw, found := metadata[key]
		if !found {
			return time.Duration(fallback) * time.Second, nil
		}
		n, err := strconv.Atoi(raw)
		if err != nil {
			return 0, fmt.Errorf("unable to convert `%s` to int", key)
		}
		// A non-positive interval makes time.NewTicker panic and a
		// non-positive timeout expires before the first poll.
		if n <= 0 {
			return 0, fmt.Errorf("`%s` must be a positive number of seconds, got %d", key, n)
		}
		return time.Duration(n) * time.Second, nil
	}

	if err := requireKey("server"); err != nil {
		return nil, err
	}
	pURL, err := url.Parse(metadata["server"])
	if err != nil {
		return nil, fmt.Errorf("failed to create base URL from metadata `server`: %w", err)
	}

	for _, key := range []string{"org", "project", "repo", "entrypoint", "apiKeyPath"} {
		if err := requireKey(key); err != nil {
			return nil, err
		}
	}

	apiKeyPath := metadata["apiKeyPath"]
	if _, err := os.Stat(apiKeyPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("`apiKeyPath` file doesn't exist %s", apiKeyPath)
		}
		// Permission problems and the like must not be deferred to the
		// first request.
		return nil, fmt.Errorf("unable to access `apiKeyPath` file %s: %w", apiKeyPath, err)
	}

	if err := requireKey("endpoint"); err != nil {
		return nil, err
	}

	pollInterval, err := seconds("pollInterval", defaultPollInterval)
	if err != nil {
		return nil, err
	}
	pollTimeout, err := seconds("pollTimeout", defaultPollTimeout)
	if err != nil {
		return nil, err
	}

	arguments := make(map[string]string)
	for key, value := range metadata {
		if !strings.HasPrefix(key, ARGUMENTS_INPUT_PREFIX) {
			continue
		}
		if key == "arguments.endpoint" {
			return nil, errors.New("You cannot override Endpoint through arguments. You must override Endpoint directly")
		}
		arguments[strings.TrimPrefix(key, ARGUMENTS_INPUT_PREFIX)] = value
	}

	return &ConcordTask{
		TaskBase: TaskBase{
			// The canary name feeds Hash() and the log fields; leaving it
			// empty makes tasks of different canaries indistinguishable.
			canary: canary,
			logger: logger,
		},
		BaseURL:      pURL,
		Org:          metadata["org"],
		Project:      metadata["project"],
		Repo:         metadata["repo"],
		Entrypoint:   metadata["entrypoint"],
		APIKeyPath:   apiKeyPath,
		Endpoint:     metadata["endpoint"],
		Arguments:    arguments,
		PollInterval: pollInterval,
		PollTimeout:  pollTimeout,
		httpClient:   &http.Client{Timeout: 60 * time.Second},
	}, nil
}
// Hash returns the hash of the canary name, org, project, repo and entrypoint
// concatenated in that order without separators.
func (task *ConcordTask) Hash() string {
	identity := []string{
		task.canary,
		task.Org,
		task.Project,
		task.Repo,
		task.Entrypoint,
	}
	return hash(strings.Join(identity, ""))
}
// String returns the org, project, repo and entrypoint separated by single
// spaces.
func (task *ConcordTask) String() string {
	parts := []string{task.Org, task.Project, task.Repo, task.Entrypoint}
	return strings.Join(parts, " ")
}
// Run starts the Concord process and then polls its status every PollInterval.
// The returned result carries true only when polling reported success; the
// error is the start or polling error, if any.
func (task *ConcordTask) Run(ctx context.Context) (*TaskRunResult, error) {
	instanceID, startErr := task.startProcess()
	if startErr != nil {
		task.logger.Errorf("failed to start process: %s", startErr.Error())
		failed := &TaskRunResult{false, nil}
		return failed, startErr
	}

	succeeded, pollErr := task.checkStatus(ctx, instanceID, task.PollInterval)
	outcome := &TaskRunResult{succeeded, nil}
	return outcome, pollErr
}
// concordProcess is the JSON payload decoded from the Concord process API
// responses. Only InstanceID, Status and OK are read in this file.
type concordProcess struct {
// InstanceID identifies the process; it is used to build the status URL.
InstanceID string `json:"instanceId,omitempty"`
ParentInstanceID string `json:"parentInstanceID,omitempty"`
ProjectName string `json:"projectName,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
Initiator string `json:"initiator,omitempty"`
LastUpdatedAt string `json:"lastUpdatedAt,omitempty"`
// Status is compared against concordStatusSuccess and concordStatusFailed
// while polling.
Status string `json:"status,omitempty"`
ChildrenIds []string `json:"childrenIds,omitempty"`
// OK is logged after the process has been created.
OK bool `json:"ok,omitempty"`
}
// newRequest builds an HTTP request for path resolved against the task's
// BaseURL.
//
// contentType, when non-empty, is set as the Content-Type header. The content
// of the APIKeyPath file, with surrounding whitespace removed, is sent as the
// Authorization header when it is not empty. A nil request is returned together
// with any error from building the request or reading the key file.
func (task *ConcordTask) newRequest(method, path string, contentType string, body io.Reader) (*http.Request, error) {
	target := task.BaseURL.ResolveReference(&url.URL{Path: path})
	req, err := http.NewRequest(method, target.String(), body)
	if err != nil {
		return nil, err
	}
	if contentType != "" {
		req.Header.Set("Content-Type", contentType)
	}

	rawKey, err := os.ReadFile(task.APIKeyPath)
	if err != nil {
		return nil, fmt.Errorf("reading API key file %s: %w", task.APIKeyPath, err)
	}
	// Key files commonly end with a newline, which is not a valid header
	// value character and would make the request fail when it is sent.
	if apiKey := strings.TrimSpace(string(rawKey)); apiKey != "" {
		req.Header.Set("Authorization", apiKey)
	}
	return req, nil
}
// do sends req and decodes the JSON response body into v.
//
// A transport failure returns a nil response. A response whose status is not
// 2xx, or whose body cannot be decoded, returns the response together with an
// error. The response body is always closed before do returns, so callers can
// inspect the returned response's status and headers but not its body.
func (task *ConcordTask) do(req *http.Request, v interface{}) (*http.Response, error) {
	task.logger.With("canary", task.canary).Infof("calling endpoint %s", req.URL.String())
	resp, err := task.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// The client reports no error for 4xx/5xx answers; without this check an
	// error payload would be decoded into an empty value and look like success.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return resp, fmt.Errorf("unexpected response status %q from %s", resp.Status, req.URL.String())
	}

	if err := json.NewDecoder(resp.Body).Decode(v); err != nil {
		return resp, fmt.Errorf("decoding response from %s: %w", req.URL.String(), err)
	}
	return resp, nil
}
// buildFields writes the form fields of a process start request to w: the
// fixed org, project, repo, entryPoint and arguments.endpoint fields first,
// then one arguments.<key> field per entry of Arguments. Write errors are
// ignored.
func (task *ConcordTask) buildFields(w *multipart.Writer) {
	fixed := [][2]string{
		{"org", task.Org},
		{"project", task.Project},
		{"repo", task.Repo},
		{"entryPoint", task.Entrypoint},
		{"arguments.endpoint", task.Endpoint},
	}
	for _, field := range fixed {
		_ = w.WriteField(field[0], field[1])
	}

	for name, value := range task.Arguments {
		_ = w.WriteField("arguments."+name, value)
	}
}
// startProcess posts a multipart form describing the process to the Concord
// process API and returns the instance ID found in the response.
//
// It returns an error when the form cannot be finalized, the request cannot be
// built or sent, the response cannot be decoded, or the response carries no
// instance ID.
func (task *ConcordTask) startProcess() (string, error) {
	requestBody := new(bytes.Buffer)
	writer := multipart.NewWriter(requestBody)
	task.buildFields(writer)
	// Close writes the trailing boundary; a failure must be reported instead
	// of being returned as an empty instance ID with a nil error.
	if err := writer.Close(); err != nil {
		return "", fmt.Errorf("finalizing multipart request body: %w", err)
	}

	req, err := task.newRequest(http.MethodPost, concordBasePath, writer.FormDataContentType(), requestBody)
	if err != nil {
		return "", err
	}

	process := concordProcess{}
	if _, err = task.do(req, &process); err != nil {
		return "", err
	}
	// Without an instance ID there is nothing to poll.
	if process.InstanceID == "" {
		return "", errors.New("concord response did not contain a process instanceId")
	}

	task.logger.With("canary", task.canary).Infof("created process id [%s] OK status is %t", process.InstanceID, process.OK)
	return process.InstanceID, nil
}
// checkStatus polls the status of process instanceID every interval until it
// reaches a final status, PollTimeout elapses, or ctx is done.
//
// It returns true when the status is FINISHED. It returns false and an error
// when the status is FAILED, a request fails, the overall PollTimeout expires,
// ctx is done, or interval is not positive.
func (task *ConcordTask) checkStatus(ctx context.Context, instanceID string, interval time.Duration) (bool, error) {
	// time.NewTicker panics on a non-positive duration.
	if interval <= 0 {
		return false, fmt.Errorf("invalid poll interval %s: must be positive", interval)
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// The timer is created once, outside the loop: re-arming it on every
	// iteration would restart the timeout after each tick, so it could never
	// fire when the interval is shorter than the timeout.
	timeout := time.NewTimer(task.PollTimeout)
	defer timeout.Stop()

	statusPath := fmt.Sprintf("%s/%s", concordBasePath, instanceID)
	for {
		select {
		case <-ticker.C:
			req, err := task.newRequest(http.MethodGet, statusPath, "", nil)
			if err != nil {
				return false, fmt.Errorf("failed to generate request: %w", err)
			}
			process := concordProcess{}
			if _, err = task.do(req, &process); err != nil {
				return false, fmt.Errorf("failed checking status: %w", err)
			}
			task.logger.With("canary", task.canary).Infof("process id [%s] current status is %s", process.InstanceID, process.Status)
			switch process.Status {
			case concordStatusSuccess:
				return true, nil
			case concordStatusFailed:
				return false, fmt.Errorf("concord instanceID: %s failed", instanceID)
			}
		case <-timeout.C:
			return false, fmt.Errorf("concord process timed out, after %d seconds", int64(task.PollTimeout/time.Second))
		case <-ctx.Done():
			return false, errors.New("context timedout")
		}
	}
}