Files
kubevela/pkg/utils/k8s.go
github-actions[bot] 0736e85e07 [Backport release-1.6] Feat: implement pipeline APIs (#4969)
* add context when run pipeline

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 72f3ad792e)

* Feat: implement pipeline API

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit f560c346cc)

* Extract get log logic and implement getPipelineRunLog API

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 060c6ab9e9)

* Init and delete pipeline contexts

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 5e96bd3106)

* fix panic

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 51072f7947)

* Allow not specifying context

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 810ddcf0bd)

* change pipeline to path parameter

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 3d51c0cb2d)

* Add permission check filter

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 6883767430)

* project -> projects in route

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 1f09f3996b)

* fix route conflict

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 7eea696830)

* Add project alias

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit b07dd72338)

* Feat: change the list pipeline API

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit bd804734b0)

* Feat: filter the project

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit 82eee2cc11)

* Fix: the error of the run API

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit ac87bd3f1a)

* fix log pipeline run API

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit acde8e981e)

* Fix lint, fix the error of log api

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit b8373e6cde)

* fix error returning

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 2e9b4792b0)

* Fix: change the label to annotation

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit bf08275fde)

* remove log config not found error

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit cdd77dfd8f)

* fix pipeline list api return no context info

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit cdcfa165d1)

* Fix: create the namespace

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit b6888dd87d)

* get pipeline lastrun info

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit a943423d22)

* allow query single step output

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit e2310bbf34)

* organize code in api layer

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 6fd53ed078)

* fix project filter, add context value when get pp run, extend lastRun

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 861f69d555)

* fix get output and implement get input api

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 61495ee70d)

* Fix: change the last run

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit aeb842a45e)

* if query sub-step output, return it directly

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 467ba25751)

* Fix: change the run stats

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit 7a90e7e310)

* Fix: change the output

Signed-off-by: barnettZQG <barnett.zqg@gmail.com>
(cherry picked from commit 595a871b0d)

* flatten the input/output api

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit efc9692354)

* more info for i/o vars

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 7fe0e1109c)

* fix nested i/o struct

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 201d1228bd)

* add fromStep in input api

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 2400018962)

* add e2e test skeleton

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit f20f9a1ac6)

* add more e2e test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 98b27f886b)

* use db to store pipeline

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 08962c4f2f)

* keep the last 5k lines of log

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 22b352da14)

* use stern param to keep last lines of logs

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 3eadbf91c8)

* filter, nil labels, spec check

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit bad90b3f7a)

* empty res, index, detail param

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit b12d889e97)

* Add e2e test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit ae617a928e)

* fix e2e test and unit test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 0f4e030b84)

* add context e2e test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 88879e6e43)

* goimports

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 58429978b3)

* add more test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 75760521a4)

* review

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit cc4c706466)

* remove optional tag in returned value, unify the imports name

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 658b184aef)

* fix e2e test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit a9e9c96856)

* add stop test

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 58aa2e5125)

* more coverage

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 01ecb51323)

* single case select

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit d9e8fd0342)

* optimize log color

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit be9840c3cb)

* add default permission and role

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit cf074444ac)

* fix permission ut

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 1bdcad63a2)

* change the log api implementation

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 587f745430)

* add color, add container order

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 6e7f187605)

* lint

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 64ba029031)

* fix filter nil will cut all log

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 799dfe377a)

* longer timeout and lint

Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
(cherry picked from commit 89873f1f66)

Co-authored-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
Co-authored-by: barnettZQG <barnett.zqg@gmail.com>
2022-11-01 00:13:37 +08:00

336 lines
10 KiB
Go

/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"regexp"
	"strings"
	"sync"
	"text/template"
	"time"

	"github.com/fatih/color"
	"github.com/pkg/errors"
	"github.com/wercker/stern/stern"
	authv1 "k8s.io/api/authentication/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/selection"
	k8stypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	"github.com/oam-dev/kubevela/pkg/oam/util"
	velaerr "github.com/oam-dev/kubevela/pkg/utils/errors"
	querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
)
// MutateOption defines the function pattern for mutate.
// An option receives the object through its metav1.Object accessor, may modify
// it in place, and returns a non-nil error when the mutation cannot be applied.
type MutateOption func(object metav1.Object) error
// MergeOverrideLabels builds a MutateOption that merges the given labels into
// the object's existing labels, overriding existing values, by delegating to
// util.AddLabels. The returned option never fails.
func MergeOverrideLabels(labels map[string]string) MutateOption {
	var mutate MutateOption = func(target metav1.Object) error {
		util.AddLabels(target, labels)
		return nil
	}
	return mutate
}
// MergeOverrideAnnotations builds a MutateOption that merges the given
// annotations into the object's existing annotations, overriding existing
// values, by delegating to util.AddAnnotations. The returned option never fails.
func MergeOverrideAnnotations(annotations map[string]string) MutateOption {
	var mutate MutateOption = func(target metav1.Object) error {
		util.AddAnnotations(target, annotations)
		return nil
	}
	return mutate
}
// MergeNoConflictLabels will merge the existing labels with the labels passed in.
//
// For every key in labels, the returned option fails with a velaerr.LabelConflict
// error if the object already carries that key with a different, non-empty
// value; otherwise the key is set to the passed-in value. An existing empty
// value is not treated as a conflict and is overwritten.
func MergeNoConflictLabels(labels map[string]string) MutateOption {
	return func(object metav1.Object) error {
		existingLabels := object.GetLabels()
		// An object without labels yields a nil map here; writing into it would
		// panic, so allocate one before filling it.
		if existingLabels == nil && len(labels) > 0 {
			existingLabels = make(map[string]string, len(labels))
		}
		// check and fill the labels
		for k, v := range labels {
			ev, ok := existingLabels[k]
			if ok && ev != "" && ev != v {
				return fmt.Errorf("%s for object %s, key: %s, conflicts value: %s <-> %s", velaerr.LabelConflict, object.GetName(), k, ev, v)
			}
			existingLabels[k] = v
		}
		object.SetLabels(existingLabels)
		return nil
	}
}
// CreateOrUpdateNamespace tries to create the namespace with the given mutate
// options applied. When the create call reports that the namespace already
// exists, it falls back to UpdateNamespace with the same options; any other
// outcome of the create call (including success) is returned unchanged.
func CreateOrUpdateNamespace(ctx context.Context, kubeClient client.Client, name string, options ...MutateOption) error {
	createErr := CreateNamespace(ctx, kubeClient, name, options...)
	if !apierrors.IsAlreadyExists(createErr) {
		return createErr
	}
	// The namespace is already there: apply the options to the live object.
	return UpdateNamespace(ctx, kubeClient, name, options...)
}
// CreateNamespace builds a Namespace object named name, runs every mutate
// option over it in order, and submits it with kubeClient.Create.
// The first option that fails aborts the call and its error is returned.
func CreateNamespace(ctx context.Context, kubeClient client.Client, name string, options ...MutateOption) error {
	ns := &corev1.Namespace{}
	ns.Name = name
	for i := range options {
		if mutateErr := options[i](ns); mutateErr != nil {
			return mutateErr
		}
	}
	return kubeClient.Create(ctx, ns)
}
// GetNamespace fetches the namespace called name through kubeClient.
// It returns the namespace on success, or nil together with the client error.
func GetNamespace(ctx context.Context, kubeClient client.Client, name string) (*corev1.Namespace, error) {
	var ns corev1.Namespace
	if getErr := kubeClient.Get(ctx, client.ObjectKey{Name: name}, &ns); getErr != nil {
		return nil, getErr
	}
	return &ns, nil
}
// UpdateNamespace loads the namespace called name, runs every mutate option
// over the loaded object in order, and writes it back with kubeClient.Update.
// A failing Get or a failing option aborts the call before anything is written.
func UpdateNamespace(ctx context.Context, kubeClient client.Client, name string, options ...MutateOption) error {
	ns := &corev1.Namespace{}
	key := k8stypes.NamespacedName{Name: name}
	if getErr := kubeClient.Get(ctx, key, ns); getErr != nil {
		return getErr
	}
	for i := range options {
		if mutateErr := options[i](ns); mutateErr != nil {
			return mutateErr
		}
	}
	return kubeClient.Update(ctx, ns)
}
// GetServiceAccountSubjectFromConfig returns the subject that GetTokenSubject
// extracts from the bearer token of cfg. The error from GetTokenSubject is
// discarded; whatever subject value accompanies it is returned as is.
func GetServiceAccountSubjectFromConfig(cfg *rest.Config) string {
	token := cfg.BearerToken
	subject, _ := GetTokenSubject(token)
	return subject
}
// GetCertificateCommonNameAndOrganizationsFromConfig returns the CommonName and
// Organization of the subject that GetCertificateSubject extracts from the
// client certificate in cfg.
//
// Inline CertData is preferred; CertFile is read only when CertData is empty
// and a file path is set. Read and parse errors are ignored, and an empty name
// with nil organizations is returned when no subject is available.
func GetCertificateCommonNameAndOrganizationsFromConfig(cfg *rest.Config) (string, []string) {
	certBytes := cfg.CertData
	if len(certBytes) == 0 && cfg.CertFile != "" {
		certBytes, _ = os.ReadFile(cfg.CertFile)
	}
	subject, _ := GetCertificateSubject(certBytes)
	if subject != nil {
		return subject.CommonName, subject.Organization
	}
	return "", nil
}
// GetUserInfoFromConfig derives a UserInfo from the credentials in cfg.
//
// A non-empty subject from the bearer token takes precedence and is used as the
// username on its own. Otherwise a non-empty certificate CommonName becomes the
// username with the certificate organizations as groups. It returns nil when
// neither source yields an identity.
func GetUserInfoFromConfig(cfg *rest.Config) *authv1.UserInfo {
	subject := GetServiceAccountSubjectFromConfig(cfg)
	if subject != "" {
		return &authv1.UserInfo{Username: subject}
	}
	commonName, organizations := GetCertificateCommonNameAndOrganizationsFromConfig(cfg)
	if commonName == "" {
		return nil
	}
	return &authv1.UserInfo{Username: commonName, Groups: organizations}
}
// AutoSetSelfImpersonationInConfig makes cfg impersonate the identity found in
// its own credentials: the impersonated username is replaced and the identity's
// groups are appended to any impersonation groups already present.
// cfg is left untouched when GetUserInfoFromConfig finds no identity.
func AutoSetSelfImpersonationInConfig(cfg *rest.Config) {
	identity := GetUserInfoFromConfig(cfg)
	if identity == nil {
		return
	}
	cfg.Impersonate.UserName = identity.Username
	cfg.Impersonate.Groups = append(cfg.Impersonate.Groups, identity.Groups...)
}
// CreateOrUpdate creates obj in the cluster, or updates the existing object so
// that it matches the state obj had when this function was called.
//
// The desired state is captured up front by marshalling obj to JSON. Inside the
// mutate callback handed to controllerutil.CreateOrUpdate the snapshot is
// unmarshalled back over obj, while the creation timestamp, resource version,
// deletion timestamp, generation and managed fields that obj holds at that
// moment are kept as they are.
//
// It returns the controllerutil.OperationResult reported by
// controllerutil.CreateOrUpdate. When obj cannot be marshalled it returns
// OperationResultNone together with the marshalling error.
func CreateOrUpdate(ctx context.Context, cli client.Client, obj client.Object) (controllerutil.OperationResult, error) {
	bs, err := json.Marshal(obj)
	if err != nil {
		return controllerutil.OperationResultNone, err
	}
	return controllerutil.CreateOrUpdate(ctx, cli, obj, func() error {
		// Remember the metadata currently on obj before the snapshot overwrites it.
		createTimestamp := obj.GetCreationTimestamp()
		resourceVersion := obj.GetResourceVersion()
		deletionTimestamp := obj.GetDeletionTimestamp()
		generation := obj.GetGeneration()
		managedFields := obj.GetManagedFields()
		// Check the unmarshal error itself; the outer err is always nil here.
		if err := json.Unmarshal(bs, obj); err != nil {
			return err
		}
		obj.SetCreationTimestamp(createTimestamp)
		obj.SetResourceVersion(resourceVersion)
		obj.SetDeletionTimestamp(deletionTimestamp)
		obj.SetGeneration(generation)
		obj.SetManagedFields(managedFields)
		return nil
	})
}
// EscapeResourceNameToLabelValue rewrites a resource name for use as a label
// value by replacing every ":" with "_". All other characters are left as is.
func EscapeResourceNameToLabelValue(resourceName string) string {
	const (
		disallowed  = ":"
		replacement = "_"
	)
	return strings.Replace(resourceName, disallowed, replacement, -1)
}
// IsClusterScope reports whether the first REST mapping the mapper returns for
// gvk has root (cluster) scope. The mapper's error is returned alongside the
// result; with no mappings the result is false.
func IsClusterScope(gvk schema.GroupVersionKind, mapper meta.RESTMapper) (bool, error) {
	mappings, err := mapper.RESTMappings(gvk.GroupKind(), gvk.Version)
	if len(mappings) == 0 {
		return false, err
	}
	return mappings[0].Scope.Name() == meta.RESTScopeNameRoot, err
}
// GetPodsLogs streams the logs of the selected pods into logC until ctx is done.
//
// Parameters:
//   - config: rest config used to build the Kubernetes clientset.
//   - containerName: when non-empty, used as a regular-expression prefix
//     (containerName + ".*") to choose containers; when empty all containers match.
//   - selectPods: pods to follow. They must all be in the same namespace; the
//     namespace of the first pod is used for the watch.
//   - tmpl: text/template source used to render each log entry. The helper
//     functions "json" and "color" are available inside the template.
//   - logC: channel that receives the rendered log lines; it is closed before
//     this function returns after ctx is done.
//   - tailLines: forwarded to the stern tail options as TailLines.
//
// The call blocks until ctx is done and then returns nil. It returns an error
// earlier when the pod selection is invalid, a regular expression or the
// template cannot be compiled, the clientset cannot be created, or the watch
// cannot be started.
func GetPodsLogs(ctx context.Context, config *rest.Config, containerName string, selectPods []*querytypes.PodBase, tmpl string, logC chan<- string, tailLines *int64) error {
	// Logs older than this are not requested.
	const logSince = 48 * time.Hour

	if err := verifyPods(selectPods); err != nil {
		return err
	}
	podRegex := getPodRegex(selectPods)
	pods, err := regexp.Compile(podRegex)
	if err != nil {
		return fmt.Errorf("fail to compile '%s' for logs query: %w", podRegex, err)
	}
	// containerName is supplied by the caller, so report a malformed pattern as
	// an error instead of panicking. An empty name yields ".*" (match all).
	container, err := regexp.Compile(containerName + ".*")
	if err != nil {
		return fmt.Errorf("fail to compile container name '%s' for logs query: %w", containerName, err)
	}

	// Parse the template before starting the watch so that a bad template does
	// not leave a watcher running.
	funs := map[string]interface{}{
		"json": func(in interface{}) (string, error) {
			b, err := json.Marshal(in)
			if err != nil {
				return "", err
			}
			return string(b), nil
		},
		"color": func(c color.Color, text string) string {
			return c.SprintFunc()(text)
		},
	}
	logTemplate, err := template.New("log").Funcs(funs).Parse(tmpl)
	if err != nil {
		return errors.Wrap(err, "unable to parse template")
	}

	// These pods are from the same namespace, so we can use the first one to get the namespace
	namespace := selectPods[0].Metadata.Namespace
	selector := labels.NewSelector()
	// Only use the labels to select pod if query one pod's log. It is only used when query vela-core log
	if len(selectPods) == 1 {
		for k, v := range selectPods[0].Metadata.Labels {
			// Best effort: a label that cannot form a valid requirement is skipped.
			req, _ := labels.NewRequirement(k, selection.Equals, []string{v})
			if req != nil {
				selector = selector.Add(*req)
			}
		}
	}

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}
	added, removed, err := stern.Watch(ctx,
		clientSet.CoreV1().Pods(namespace),
		pods,
		container,
		nil,
		[]stern.ContainerState{stern.RUNNING, stern.TERMINATED},
		selector,
	)
	if err != nil {
		return err
	}

	// tails is shared by the two goroutines below, so every access is guarded
	// by tailsMu.
	var (
		tailsMu sync.Mutex
		tails   = make(map[string]*stern.Tail)
	)

	// Start a tail for every newly reported container that is not followed yet.
	go func() {
		for p := range added {
			id := p.GetID()
			tailsMu.Lock()
			if tails[id] != nil {
				tailsMu.Unlock()
				continue
			}
			tail := stern.NewTail(p.Namespace, p.Pod, p.Container, logTemplate, &stern.TailOptions{
				Timestamps:   true,
				SinceSeconds: int64(logSince.Seconds()),
				Exclude:      nil,
				Include:      nil,
				Namespace:    false,
				TailLines:    tailLines, // default for all logs
			})
			tails[id] = tail
			tailsMu.Unlock()
			// Started outside the lock so the removal goroutine is never held up.
			tail.Start(ctx, clientSet.CoreV1().Pods(p.Namespace), logC)
		}
	}()

	// Close and forget the tail of every container reported as removed.
	go func() {
		for p := range removed {
			id := p.GetID()
			tailsMu.Lock()
			tail := tails[id]
			delete(tails, id)
			tailsMu.Unlock()
			if tail != nil {
				tail.Close()
			}
		}
	}()

	<-ctx.Done()
	// NOTE(review): tails started above were handed logC; confirm they stop
	// sending once ctx is done, otherwise closing here can race with a send.
	close(logC)
	return nil
}
// getPodRegex builds the regular-expression source used to match the given
// pods: one "(<name>.*)" group per pod, joined with "|". Pod names are inserted
// verbatim. An empty input yields an empty string.
func getPodRegex(pods []*querytypes.PodBase) string {
	groups := make([]string, len(pods))
	for i := range pods {
		groups[i] = fmt.Sprintf("(%s.*)", pods[i].Metadata.Name)
	}
	return strings.Join(groups, "|")
}
// verifyPods checks that a pod selection can be served by a single log query:
// it must be non-empty and every pod must share the namespace of the first pod.
func verifyPods(pods []*querytypes.PodBase) error {
	if len(pods) == 0 {
		return errors.New("no pods selected")
	}
	first := pods[0].Metadata.Namespace
	// Comparing the first pod with itself is pointless, so start at the second.
	for _, other := range pods[1:] {
		if other.Metadata.Namespace != first {
			return errors.New("cannot select pods from different namespaces")
		}
	}
	return nil
}