mirror of
https://github.com/kubevela/kubevela.git
synced 2026-05-06 17:37:09 +00:00
* add context when run pipeline Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit72f3ad792e) * Feat: implement pipeline API Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitf560c346cc) * Extract get log logic and implement getPipelineRunLog API Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit060c6ab9e9) * Init and delete pipeline contexts Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit5e96bd3106) * fix panic Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit51072f7947) * Allow not specifying context Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit810ddcf0bd) * change pipeline to path parameter Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit3d51c0cb2d) * Add permission check filter Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit6883767430) * project -> projects in route Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit1f09f3996b) * fix route conflict Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit7eea696830) * Add project alias Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitb07dd72338) * Feat: change the list pipeline API Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commitbd804734b0) * Feat: filter the project Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commit82eee2cc11) * Fix: the error of the run APi Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commitac87bd3f1a) * fix log pipeline run API Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitacde8e981e) * Fix lint, fix the error of log api Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitb8373e6cde) * fix error returning Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit2e9b4792b0) * Fix: change the lable to annotation Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commitbf08275fde) * remove log config not found error Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitcdd77dfd8f) * fix pipeline list api return no context info Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitcdcfa165d1) * Fix: create the namespace Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commitb6888dd87d) * get pipeline lastrun info Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commita943423d22) * allow query single step output Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commite2310bbf34) * organize code in api layer Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit6fd53ed078) * fix project filter, add context value when get pp run, extend lastRun Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit861f69d555) * fix get output and implement get input api Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit61495ee70d) * Fix: change the last run Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commitaeb842a45e) * if query sub-step outout, return it directly Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit467ba25751) * Fix: change the run stats Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commit7a90e7e310) * Fix: change the output Signed-off-by: barnettZQG <barnett.zqg@gmail.com> (cherry picked from commit595a871b0d) * flatten the input/output api Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitefc9692354) * more info for i/o vars Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit7fe0e1109c) * fix nested i/o struct Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit201d1228bd) * add fromStep in input api Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit2400018962) * add e2e test skeleton Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitf20f9a1ac6) * add more e2e test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit98b27f886b) * use db to store pipeline Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit08962c4f2f) * keep the last 5k lines of log Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit22b352da14) * use stern param to keep last lines of logs Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit3eadbf91c8) * filter, nil labels, spec check Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitbad90b3f7a) * empty res, index, detail param Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitb12d889e97) * Add e2e test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitae617a928e) * fix e2e test and unit test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit0f4e030b84) * add context e2e test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit88879e6e43) * goimports Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit58429978b3) * add more test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit75760521a4) * review Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitcc4c706466) * remove optional tag in returned value, unify the imports name Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit658b184aef) * fix e2e test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commita9e9c96856) * add stop test Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit58aa2e5125) * more coverage Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit01ecb51323) * single case selct Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitd9e8fd0342) * optimize log color Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitbe9840c3cb) * add default permission and role Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commitcf074444ac) * fix permission ut Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit1bdcad63a2) * change the log api implementation Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit587f745430) * add color, add container order Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit6e7f187605) * lint Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit64ba029031) * fix filter nil will cut all log Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit799dfe377a) * longer timeout and lint Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> (cherry picked from commit89873f1f66) Co-authored-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com> Co-authored-by: barnettZQG <barnett.zqg@gmail.com>
336 lines
10 KiB
Go
336 lines
10 KiB
Go
/*
|
|
Copyright 2021 The KubeVela Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package utils
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"regexp"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/fatih/color"
|
|
"github.com/pkg/errors"
|
|
"github.com/wercker/stern/stern"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/selection"
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
|
|
|
|
authv1 "k8s.io/api/authentication/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
k8stypes "k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/rest"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
|
|
"github.com/oam-dev/kubevela/pkg/oam/util"
|
|
velaerr "github.com/oam-dev/kubevela/pkg/utils/errors"
|
|
)
|
|
|
|
// MutateOption defines the function pattern for mutate
|
|
type MutateOption func(object metav1.Object) error
|
|
|
|
// MergeOverrideLabels will merge the existing labels and override by the labels passed in
|
|
func MergeOverrideLabels(labels map[string]string) MutateOption {
|
|
return func(object metav1.Object) error {
|
|
util.AddLabels(object, labels)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// MergeOverrideAnnotations will merge the existing annotations and override by the annotations passed in
|
|
func MergeOverrideAnnotations(annotations map[string]string) MutateOption {
|
|
return func(object metav1.Object) error {
|
|
util.AddAnnotations(object, annotations)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// MergeNoConflictLabels will merge the existing labels with the labels passed in, it will report conflicts if exists
|
|
func MergeNoConflictLabels(labels map[string]string) MutateOption {
|
|
return func(object metav1.Object) error {
|
|
existingLabels := object.GetLabels()
|
|
// check and fill the labels
|
|
for k, v := range labels {
|
|
ev, ok := existingLabels[k]
|
|
if ok && ev != "" && ev != v {
|
|
return fmt.Errorf("%s for object %s, key: %s, conflicts value: %s <-> %s", velaerr.LabelConflict, object.GetName(), k, ev, v)
|
|
}
|
|
existingLabels[k] = v
|
|
}
|
|
object.SetLabels(existingLabels)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// CreateOrUpdateNamespace will create a namespace if not exist, it will also update a namespace if exists
|
|
// It will report an error if the labels conflict while it will override the annotations
|
|
func CreateOrUpdateNamespace(ctx context.Context, kubeClient client.Client, name string, options ...MutateOption) error {
|
|
err := CreateNamespace(ctx, kubeClient, name, options...)
|
|
// only if namespace don't have the env label that we need to update it
|
|
if apierrors.IsAlreadyExists(err) {
|
|
return UpdateNamespace(ctx, kubeClient, name, options...)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// CreateNamespace will create a namespace with mutate option
|
|
func CreateNamespace(ctx context.Context, kubeClient client.Client, name string, options ...MutateOption) error {
|
|
obj := &corev1.Namespace{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
},
|
|
Spec: corev1.NamespaceSpec{},
|
|
}
|
|
for _, op := range options {
|
|
if err := op(obj); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return kubeClient.Create(ctx, obj)
|
|
}
|
|
|
|
// GetNamespace will return a namespace with mutate option
|
|
func GetNamespace(ctx context.Context, kubeClient client.Client, name string) (*corev1.Namespace, error) {
|
|
obj := &corev1.Namespace{}
|
|
err := kubeClient.Get(ctx, client.ObjectKey{Name: name}, obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return obj, nil
|
|
}
|
|
|
|
// UpdateNamespace will update a namespace with mutate option
|
|
func UpdateNamespace(ctx context.Context, kubeClient client.Client, name string, options ...MutateOption) error {
|
|
var namespace corev1.Namespace
|
|
err := kubeClient.Get(ctx, k8stypes.NamespacedName{Name: name}, &namespace)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, op := range options {
|
|
if err = op(&namespace); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return kubeClient.Update(ctx, &namespace)
|
|
}
|
|
|
|
// GetServiceAccountSubjectFromConfig extract ServiceAccountName subject from token
|
|
func GetServiceAccountSubjectFromConfig(cfg *rest.Config) string {
|
|
sub, _ := GetTokenSubject(cfg.BearerToken)
|
|
return sub
|
|
}
|
|
|
|
// GetCertificateCommonNameAndOrganizationsFromConfig extract CommonName and Organizations from Certificate
|
|
func GetCertificateCommonNameAndOrganizationsFromConfig(cfg *rest.Config) (string, []string) {
|
|
cert := cfg.CertData
|
|
if len(cert) == 0 && cfg.CertFile != "" {
|
|
cert, _ = os.ReadFile(cfg.CertFile)
|
|
}
|
|
name, _ := GetCertificateSubject(cert)
|
|
if name == nil {
|
|
return "", nil
|
|
}
|
|
return name.CommonName, name.Organization
|
|
}
|
|
|
|
// GetUserInfoFromConfig extract UserInfo from KubeConfig
|
|
func GetUserInfoFromConfig(cfg *rest.Config) *authv1.UserInfo {
|
|
if sub := GetServiceAccountSubjectFromConfig(cfg); sub != "" {
|
|
return &authv1.UserInfo{Username: sub}
|
|
}
|
|
if cn, orgs := GetCertificateCommonNameAndOrganizationsFromConfig(cfg); cn != "" {
|
|
return &authv1.UserInfo{Username: cn, Groups: orgs}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AutoSetSelfImpersonationInConfig set impersonate username and group to the identity in the original rest config
|
|
func AutoSetSelfImpersonationInConfig(cfg *rest.Config) {
|
|
if userInfo := GetUserInfoFromConfig(cfg); userInfo != nil {
|
|
cfg.Impersonate.UserName = userInfo.Username
|
|
cfg.Impersonate.Groups = append(cfg.Impersonate.Groups, userInfo.Groups...)
|
|
}
|
|
}
|
|
|
|
// CreateOrUpdate create or update a kubernetes object
|
|
func CreateOrUpdate(ctx context.Context, cli client.Client, obj client.Object) (controllerutil.OperationResult, error) {
|
|
bs, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return controllerutil.OperationResultNone, err
|
|
}
|
|
return controllerutil.CreateOrUpdate(ctx, cli, obj, func() error {
|
|
createTimestamp := obj.GetCreationTimestamp()
|
|
resourceVersion := obj.GetResourceVersion()
|
|
deletionTimestamp := obj.GetDeletionTimestamp()
|
|
generation := obj.GetGeneration()
|
|
managedFields := obj.GetManagedFields()
|
|
if e := json.Unmarshal(bs, obj); err != nil {
|
|
return e
|
|
}
|
|
obj.SetCreationTimestamp(createTimestamp)
|
|
obj.SetResourceVersion(resourceVersion)
|
|
obj.SetDeletionTimestamp(deletionTimestamp)
|
|
obj.SetGeneration(generation)
|
|
obj.SetManagedFields(managedFields)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// EscapeResourceNameToLabelValue parse characters in resource name to label valid name
|
|
func EscapeResourceNameToLabelValue(resourceName string) string {
|
|
return strings.ReplaceAll(resourceName, ":", "_")
|
|
}
|
|
|
|
// IsClusterScope check if the gvk is cluster scoped
|
|
func IsClusterScope(gvk schema.GroupVersionKind, mapper meta.RESTMapper) (bool, error) {
|
|
mappings, err := mapper.RESTMappings(gvk.GroupKind(), gvk.Version)
|
|
isClusterScope := len(mappings) > 0 && mappings[0].Scope.Name() == meta.RESTScopeNameRoot
|
|
return isClusterScope, err
|
|
}
|
|
|
|
// GetPodsLogs get logs from pods
|
|
func GetPodsLogs(ctx context.Context, config *rest.Config, containerName string, selectPods []*querytypes.PodBase, tmpl string, logC chan<- string, tailLines *int64) error {
|
|
if err := verifyPods(selectPods); err != nil {
|
|
return err
|
|
}
|
|
podRegex := getPodRegex(selectPods)
|
|
pods, err := regexp.Compile(podRegex)
|
|
if err != nil {
|
|
return fmt.Errorf("fail to compile '%s' for logs query", podRegex)
|
|
}
|
|
container := regexp.MustCompile(".*")
|
|
if containerName != "" {
|
|
container = regexp.MustCompile(containerName + ".*")
|
|
}
|
|
// These pods are from the same namespace, so we can use the first one to get the namespace
|
|
namespace := selectPods[0].Metadata.Namespace
|
|
selector := labels.NewSelector()
|
|
// Only use the labels to select pod if query one pod's log. It is only used when query vela-core log
|
|
if len(selectPods) == 1 {
|
|
for k, v := range selectPods[0].Metadata.Labels {
|
|
req, _ := labels.NewRequirement(k, selection.Equals, []string{v})
|
|
if req != nil {
|
|
selector = selector.Add(*req)
|
|
}
|
|
}
|
|
}
|
|
clientSet, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
added, removed, err := stern.Watch(ctx,
|
|
clientSet.CoreV1().Pods(namespace),
|
|
pods,
|
|
container,
|
|
nil,
|
|
[]stern.ContainerState{stern.RUNNING, stern.TERMINATED},
|
|
selector,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tails := make(map[string]*stern.Tail)
|
|
|
|
funs := map[string]interface{}{
|
|
"json": func(in interface{}) (string, error) {
|
|
b, err := json.Marshal(in)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(b), nil
|
|
},
|
|
"color": func(color color.Color, text string) string {
|
|
return color.SprintFunc()(text)
|
|
},
|
|
}
|
|
template, err := template.New("log").Funcs(funs).Parse(tmpl)
|
|
if err != nil {
|
|
return errors.Wrap(err, "unable to parse template")
|
|
}
|
|
|
|
go func() {
|
|
for p := range added {
|
|
id := p.GetID()
|
|
if tails[id] != nil {
|
|
continue
|
|
}
|
|
// 48h
|
|
dur, _ := time.ParseDuration("48h")
|
|
tail := stern.NewTail(p.Namespace, p.Pod, p.Container, template, &stern.TailOptions{
|
|
Timestamps: true,
|
|
SinceSeconds: int64(dur.Seconds()),
|
|
Exclude: nil,
|
|
Include: nil,
|
|
Namespace: false,
|
|
TailLines: tailLines, // default for all logs
|
|
})
|
|
tails[id] = tail
|
|
|
|
tail.Start(ctx, clientSet.CoreV1().Pods(p.Namespace), logC)
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
for p := range removed {
|
|
id := p.GetID()
|
|
if tails[id] == nil {
|
|
continue
|
|
}
|
|
tails[id].Close()
|
|
delete(tails, id)
|
|
}
|
|
}()
|
|
|
|
<-ctx.Done()
|
|
close(logC)
|
|
return nil
|
|
}
|
|
|
|
func getPodRegex(pods []*querytypes.PodBase) string {
|
|
var podNames []string
|
|
for _, pod := range pods {
|
|
podNames = append(podNames, fmt.Sprintf("(%s.*)", pod.Metadata.Name))
|
|
}
|
|
return strings.Join(podNames, "|")
|
|
}
|
|
|
|
func verifyPods(pods []*querytypes.PodBase) error {
|
|
if len(pods) == 0 {
|
|
return errors.New("no pods selected")
|
|
}
|
|
if len(pods) == 1 {
|
|
return nil
|
|
}
|
|
namespace := pods[0].Metadata.Namespace
|
|
for _, pod := range pods {
|
|
if pod.Metadata.Namespace != namespace {
|
|
return errors.New("cannot select pods from different namespaces")
|
|
}
|
|
}
|
|
return nil
|
|
}
|