Refactor: use cuex engine (#6575)

* refactor: use cuex engine

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix lint

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix unit test

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix static check and sdk tests

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix testdata

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix velaql unit test

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix docgen parser

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix cuegen

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix velaql

Signed-off-by: FogDong <fog@bentoml.com>

* fix: delete useless print

Signed-off-by: FogDong <fog@bentoml.com>

* fix: set client for ql

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix mt tests

Signed-off-by: FogDong <fog@bentoml.com>

* fix: set kubeclient in generator

Signed-off-by: FogDong <fog@bentoml.com>

* fix: use pass kube client

Signed-off-by: FogDong <fog@bentoml.com>

* fix: simplify ql

Signed-off-by: FogDong <fog@bentoml.com>

* fix: fix lint

Signed-off-by: FogDong <fog@bentoml.com>

* fix: add wf debug back

Signed-off-by: FogDong <fog@bentoml.com>

* fix: add loader

Signed-off-by: FogDong <fog@bentoml.com>

---------

Signed-off-by: FogDong <fog@bentoml.com>
This commit is contained in:
Tianxin Dong
2024-07-27 17:44:20 +08:00
committed by GitHub
parent a565b48ae6
commit 4f8bf44684
202 changed files with 3150 additions and 3845 deletions

View File

@@ -1,110 +0,0 @@
/*
Copyright 2021. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package velaql
import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
)
// NewViewContext new view context
func NewViewContext() (wfContext.Context, error) {
viewContext := &ViewContext{}
var err error
viewContext.vars, err = value.NewValue("", nil, "")
return viewContext, err
}
// ViewContext is view context
type ViewContext struct {
vars *value.Value
}
// GetVar get variable from workflow context.
func (c ViewContext) GetVar(paths ...string) (*value.Value, error) {
return c.vars.LookupValue(paths...)
}
// SetVar set variable to workflow context.
func (c ViewContext) SetVar(v *value.Value, paths ...string) error {
str, err := v.String()
if err != nil {
return errors.WithMessage(err, "compile var")
}
if err := c.vars.FillRaw(str, paths...); err != nil {
return err
}
return c.vars.Error()
}
// GetStore get configmap of workflow context.
func (c ViewContext) GetStore() *corev1.ConfigMap {
return nil
}
// GetMutableValue get mutable data from workflow context.
func (c ViewContext) GetMutableValue(_ ...string) string {
return ""
}
// SetMutableValue set mutable data in workflow context config map.
func (c ViewContext) SetMutableValue(_ string, _ ...string) {
}
// IncreaseCountValueInMemory increase count in workflow context memory store.
func (c ViewContext) IncreaseCountValueInMemory(_ ...string) int {
return 0
}
// SetValueInMemory set data in workflow context memory store.
func (c ViewContext) SetValueInMemory(_ interface{}, _ ...string) {
}
// GetValueInMemory get data in workflow context memory store.
func (c ViewContext) GetValueInMemory(_ ...string) (interface{}, bool) {
return "", true
}
// DeleteValueInMemory delete data in workflow context memory store.
func (c ViewContext) DeleteValueInMemory(_ ...string) {
}
// DeleteMutableValue delete mutable data in workflow context.
func (c ViewContext) DeleteMutableValue(_ ...string) {
}
// Commit the workflow context and persist it's content.
func (c ViewContext) Commit() error {
return errors.New("not support func Commit")
}
// MakeParameter make 'value' with string
func (c ViewContext) MakeParameter(parameter string) (*value.Value, error) {
if parameter == "" {
parameter = "{}"
}
return c.vars.MakeValue(parameter)
}
// StoreRef return the store reference of workflow context.
func (c ViewContext) StoreRef() *corev1.ObjectReference {
return nil
}

View File

@@ -17,15 +17,16 @@
package velaql
import (
"context"
"regexp"
"strconv"
"strings"
"cuelang.org/go/cue"
"github.com/pkg/errors"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/utils"
"github.com/oam-dev/kubevela/pkg/workflow/providers"
)
// QueryView contains query data
@@ -98,20 +99,19 @@ func ParseVelaQL(ql string) (QueryView, error) {
}
// ParseVelaQLFromPath will parse a velaQL file path to QueryView
func ParseVelaQLFromPath(velaQLViewPath string) (*QueryView, error) {
func ParseVelaQLFromPath(ctx context.Context, velaQLViewPath string) (*QueryView, error) {
body, err := utils.ReadRemoteOrLocalPath(velaQLViewPath, false)
if err != nil {
return nil, errors.Errorf("read view file from %s: %v", velaQLViewPath, err)
}
val, err := value.NewValue(string(body), nil, "")
val, err := providers.Compiler.Get().CompileString(ctx, string(body))
if err != nil {
return nil, errors.Errorf("new value for view: %v", err)
return nil, errors.Errorf("error when parsing view: %v", err)
}
var expStr string
exp, err := val.LookupValue(KeyWordExport)
if err == nil {
exp := val.LookupPath(cue.ParsePath(KeyWordExport))
if exp.Err() == nil {
expStr, err = exp.String()
if err != nil {
expStr = DefaultExportValue

View File

@@ -1,339 +0,0 @@
/*
Copyright 2021. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"context"
"github.com/hashicorp/go-version"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/multicluster"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/resourcetracker"
"github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
)
// AppCollector collect resource created by application
type AppCollector struct {
k8sClient client.Client
opt Option
}
// NewAppCollector create a app collector
func NewAppCollector(cli client.Client, opt Option) *AppCollector {
return &AppCollector{
k8sClient: cli,
opt: opt,
}
}
const velaVersionNumberToUpgradeVelaQL = "v1.2.0-rc.1"
// CollectResourceFromApp collect resources created by application
func (c *AppCollector) CollectResourceFromApp(ctx context.Context) ([]Resource, error) {
app := new(v1beta1.Application)
appKey := client.ObjectKey{Name: c.opt.Name, Namespace: c.opt.Namespace}
if err := c.k8sClient.Get(ctx, appKey, app); err != nil {
return nil, err
}
var currentVersionNumber string
if annotations := app.GetAnnotations(); annotations != nil && annotations[oam.AnnotationKubeVelaVersion] != "" {
currentVersionNumber = annotations[oam.AnnotationKubeVelaVersion]
}
velaVersionToUpgradeVelaQL, _ := version.NewVersion(velaVersionNumberToUpgradeVelaQL)
currentVersion, err := version.NewVersion(currentVersionNumber)
if err != nil {
resources, err := c.FindResourceFromResourceTrackerSpec(ctx, app)
if err != nil {
return c.FindResourceFromAppliedResourcesField(ctx, app)
}
return resources, nil
}
if velaVersionToUpgradeVelaQL.GreaterThan(currentVersion) {
return c.FindResourceFromAppliedResourcesField(ctx, app)
}
return c.FindResourceFromResourceTrackerSpec(ctx, app)
}
// ListApplicationResources list application applied resources from tracker
func (c *AppCollector) ListApplicationResources(ctx context.Context, app *v1beta1.Application) ([]*types.AppliedResource, error) {
rootRT, currentRT, historyRTs, _, err := resourcetracker.ListApplicationResourceTrackers(ctx, c.k8sClient, app)
if err != nil {
return nil, err
}
var managedResources []*types.AppliedResource
existResources := make(map[common.ClusterObjectReference]bool, len(app.Spec.Components))
if c.opt.Filter.QueryNewest {
historyRTs = nil
}
for _, rt := range append(historyRTs, rootRT, currentRT) {
if rt != nil {
for _, managedResource := range rt.Spec.ManagedResources {
if isResourceInTargetCluster(c.opt.Filter, managedResource.ClusterObjectReference) &&
isResourceInTargetComponent(c.opt.Filter, managedResource.Component) &&
(c.opt.WithTree || isResourceMatchKindAndVersion(c.opt.Filter, managedResource.Kind, managedResource.APIVersion)) {
if c.opt.WithTree {
// If we want to query the tree, we only need to query once for the same resource.
if _, exist := existResources[managedResource.ClusterObjectReference]; exist {
continue
}
existResources[managedResource.ClusterObjectReference] = true
}
managedResources = append(managedResources, &types.AppliedResource{
Cluster: func() string {
if managedResource.Cluster != "" {
return managedResource.Cluster
}
return "local"
}(),
Kind: managedResource.Kind,
Component: managedResource.Component,
Trait: managedResource.Trait,
Name: managedResource.Name,
Namespace: managedResource.Namespace,
APIVersion: managedResource.APIVersion,
ResourceVersion: managedResource.ResourceVersion,
UID: managedResource.UID,
PublishVersion: oam.GetPublishVersion(rt),
DeployVersion: func() string {
obj, _ := managedResource.ToUnstructuredWithData()
if obj != nil {
return oam.GetDeployVersion(obj)
}
return ""
}(),
Revision: rt.GetLabels()[oam.LabelAppRevision],
Latest: currentRT != nil && rt.Name == currentRT.Name,
})
}
}
}
}
if !c.opt.WithTree {
return managedResources, nil
}
// merge user defined customize rule before every request.
err = mergeCustomRules(ctx, c.k8sClient)
if err != nil {
return managedResources, err
}
filter := func(node types.ResourceTreeNode) bool {
return isResourceMatchKindAndVersion(c.opt.Filter, node.Kind, node.APIVersion)
}
var matchedResources []*types.AppliedResource
// error from leaf nodes won't block the results
for i := range managedResources {
resource := managedResources[i]
root := types.ResourceTreeNode{
Cluster: resource.Cluster,
APIVersion: resource.APIVersion,
Kind: resource.Kind,
Namespace: resource.Namespace,
Name: resource.Name,
UID: resource.UID,
}
root.LeafNodes, err = iterateListSubResources(ctx, resource.Cluster, c.k8sClient, root, 1, filter)
if err != nil {
// if the resource has been deleted, continue access next appliedResource don't break the whole request
if kerrors.IsNotFound(err) {
continue
}
klog.Errorf("query leaf node resource apiVersion=%s kind=%s namespace=%s name=%s failure %s, skip this resource", root.APIVersion, root.Kind, root.Namespace, root.Name, err.Error())
continue
}
if !filter(root) && len(root.LeafNodes) == 0 {
continue
}
rootObject, err := fetchObjectWithResourceTreeNode(ctx, resource.Cluster, c.k8sClient, root)
if err != nil {
// if the resource has been deleted, continue access next appliedResource don't break the whole request
if kerrors.IsNotFound(err) {
continue
}
klog.Errorf("fetch object for resource apiVersion=%s kind=%s namespace=%s name=%s failure %s, skip this resource", root.APIVersion, root.Kind, root.Namespace, root.Name, err.Error())
continue
}
rootStatus, err := CheckResourceStatus(*rootObject)
if err != nil {
klog.Errorf("check status for resource apiVersion=%s kind=%s namespace=%s name=%s failure %s, skip this resource", root.APIVersion, root.Kind, root.Namespace, root.Name, err.Error())
continue
}
root.HealthStatus = *rootStatus
addInfo, err := additionalInfo(*rootObject)
if err != nil {
klog.Errorf("check additionalInfo for resource apiVersion=%s kind=%s namespace=%s name=%s failure %s, skip this resource", root.APIVersion, root.Kind, root.Namespace, root.Name, err.Error())
continue
}
root.AdditionalInfo = addInfo
root.CreationTimestamp = rootObject.GetCreationTimestamp().Time
if !rootObject.GetDeletionTimestamp().IsZero() {
root.DeletionTimestamp = rootObject.GetDeletionTimestamp().Time
}
root.Object = *rootObject
resource.ResourceTree = &root
matchedResources = append(matchedResources, resource)
}
return matchedResources, nil
}
// FindResourceFromResourceTrackerSpec find resources from ResourceTracker spec
func (c *AppCollector) FindResourceFromResourceTrackerSpec(ctx context.Context, app *v1beta1.Application) ([]Resource, error) {
rootRT, currentRT, historyRTs, _, err := resourcetracker.ListApplicationResourceTrackers(ctx, c.k8sClient, app)
if err != nil {
klog.Errorf("query the resourcetrackers failure %s", err.Error())
return nil, err
}
var resources = []Resource{}
existResources := make(map[common.ClusterObjectReference]bool, len(app.Spec.Components))
for _, rt := range append([]*v1beta1.ResourceTracker{rootRT, currentRT}, historyRTs...) {
if rt != nil {
for _, managedResource := range rt.Spec.ManagedResources {
if isResourceInTargetCluster(c.opt.Filter, managedResource.ClusterObjectReference) &&
isResourceInTargetComponent(c.opt.Filter, managedResource.Component) &&
isResourceMatchKindAndVersion(c.opt.Filter, managedResource.Kind, managedResource.APIVersion) {
if _, exist := existResources[managedResource.ClusterObjectReference]; exist {
continue
}
existResources[managedResource.ClusterObjectReference] = true
obj, err := managedResource.ToUnstructuredWithData()
if err != nil || c.opt.WithStatus {
// For the application with apply once policy, there is no data in RT.
// IF the WithStatus is true, get the object from cluster
_, obj, err = getObjectCreatedByComponent(ctx, c.k8sClient, managedResource.ObjectReference, managedResource.Cluster)
if err != nil {
klog.Errorf("get obj from the cluster failure %s", err.Error())
continue
}
}
clusterName := managedResource.Cluster
if clusterName == "" {
clusterName = multicluster.ClusterLocalName
}
resources = append(resources, Resource{
Cluster: clusterName,
Revision: oam.GetPublishVersion(rt),
Component: managedResource.Component,
Object: obj,
})
}
}
}
}
return resources, nil
}
// FindResourceFromAppliedResourcesField find resources from AppliedResources field
func (c *AppCollector) FindResourceFromAppliedResourcesField(ctx context.Context, app *v1beta1.Application) ([]Resource, error) {
resources := make([]Resource, 0, len(app.Spec.Components))
for _, res := range app.Status.AppliedResources {
if !isResourceInTargetCluster(c.opt.Filter, res) {
continue
}
if !isResourceMatchKindAndVersion(c.opt.Filter, res.APIVersion, res.Kind) {
continue
}
compName, obj, err := getObjectCreatedByComponent(ctx, c.k8sClient, res.ObjectReference, res.Cluster)
if err != nil {
return nil, err
}
if len(compName) != 0 && isResourceInTargetComponent(c.opt.Filter, compName) {
resources = append(resources, Resource{
Component: compName,
Revision: obj.GetLabels()[oam.LabelAppRevision],
Cluster: res.Cluster,
Object: obj,
})
}
}
if len(resources) == 0 {
return nil, errors.Errorf("fail to find resources created by application: %v", c.opt.Name)
}
return resources, nil
}
// getObjectCreatedByComponent get k8s obj created by components
func getObjectCreatedByComponent(ctx context.Context, cli client.Client, objRef corev1.ObjectReference, cluster string) (string, *unstructured.Unstructured, error) {
ctx = multicluster.ContextWithClusterName(ctx, cluster)
obj := new(unstructured.Unstructured)
obj.SetGroupVersionKind(objRef.GroupVersionKind())
obj.SetNamespace(objRef.Namespace)
obj.SetName(objRef.Name)
if err := cli.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
if kerrors.IsNotFound(err) {
return "", nil, nil
}
return "", nil, err
}
componentName := obj.GetLabels()[oam.LabelAppComponent]
return componentName, obj, nil
}
func getEventFieldSelector(obj *unstructured.Unstructured) fields.Selector {
field := fields.Set{}
field["involvedObject.name"] = obj.GetName()
field["involvedObject.namespace"] = obj.GetNamespace()
field["involvedObject.kind"] = obj.GetObjectKind().GroupVersionKind().Kind
field["involvedObject.uid"] = string(obj.GetUID())
return field.AsSelector()
}
func isResourceInTargetCluster(opt FilterOption, resource common.ClusterObjectReference) bool {
if opt.Cluster == "" && opt.ClusterNamespace == "" {
return true
}
if (opt.Cluster == resource.Cluster || (opt.Cluster == "local" && resource.Cluster == "")) &&
(opt.ClusterNamespace == resource.ObjectReference.Namespace || opt.ClusterNamespace == "") {
return true
}
return false
}
func isResourceInTargetComponent(opt FilterOption, componentName string) bool {
if len(opt.Components) == 0 {
return true
}
for _, component := range opt.Components {
if component == componentName {
return true
}
}
return false
}
func isResourceMatchKindAndVersion(opt FilterOption, kind, version string) bool {
if opt.APIVersion != "" && opt.APIVersion != version {
return false
}
if opt.Kind != "" && opt.Kind != kind {
return false
}
return true
}

View File

@@ -1,454 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"github.com/kubevela/pkg/util/slices"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
networkv1beta1 "k8s.io/api/networking/v1beta1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
monitorContext "github.com/kubevela/pkg/monitor/context"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/kubevela/workflow/pkg/types"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
apis "github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/multicluster"
querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
)
// CollectServiceEndpoints generator service endpoints is available for common component type,
// such as webservice or helm
// it can not support the cloud service component currently
func (h *provider) CollectServiceEndpoints(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, _ types.Action) error {
val, err := v.LookupValue("app")
if err != nil {
return err
}
opt := Option{}
if err = val.UnmarshalTo(&opt); err != nil {
return err
}
app := new(v1beta1.Application)
err = findResource(ctx, h.cli, app, opt.Name, opt.Namespace, "")
if err != nil {
return fmt.Errorf("query app failure %w", err)
}
serviceEndpoints := make([]querytypes.ServiceEndpoint, 0)
var clusterGatewayNodeIP = make(map[string]string)
collector := NewAppCollector(h.cli, opt)
resources, err := collector.ListApplicationResources(ctx, app)
if err != nil {
return err
}
for i, resource := range resources {
cluster := resources[i].Cluster
cachedSelectorNodeIP := func() string {
if ip, exist := clusterGatewayNodeIP[cluster]; exist {
return ip
}
ip := selectorNodeIP(ctx, cluster, h.cli)
if ip != "" {
clusterGatewayNodeIP[cluster] = ip
}
return ip
}
if resource.ResourceTree != nil {
serviceEndpoints = append(serviceEndpoints, getEndpointFromNode(ctx, h.cli, resource.ResourceTree, resource.Component, cachedSelectorNodeIP)...)
} else {
serviceEndpoints = append(serviceEndpoints, getServiceEndpoints(ctx, h.cli, resource.GroupVersionKind(), resource.Name, resource.Namespace, resource.Cluster, resource.Component, cachedSelectorNodeIP)...)
}
}
return fillQueryResult(v, serviceEndpoints, "list")
}
func getEndpointFromNode(ctx context.Context, cli client.Client, node *querytypes.ResourceTreeNode, component string, cachedSelectorNodeIP func() string) []querytypes.ServiceEndpoint {
if node == nil {
return nil
}
var serviceEndpoints []querytypes.ServiceEndpoint
serviceEndpoints = append(serviceEndpoints, getServiceEndpoints(ctx, cli, node.GroupVersionKind(), node.Name, node.Namespace, node.Cluster, component, cachedSelectorNodeIP)...)
for _, child := range node.LeafNodes {
serviceEndpoints = append(serviceEndpoints, getEndpointFromNode(ctx, cli, child, component, cachedSelectorNodeIP)...)
}
return serviceEndpoints
}
func getServiceEndpoints(ctx context.Context, cli client.Client, gvk schema.GroupVersionKind, name, namespace, cluster, component string, cachedSelectorNodeIP func() string) []querytypes.ServiceEndpoint {
var serviceEndpoints []querytypes.ServiceEndpoint
switch gvk.Kind {
case "Ingress":
if gvk.Group == networkv1beta1.GroupName && (gvk.Version == "v1beta1" || gvk.Version == "v1") {
var ingress v1.Ingress
ingress.SetGroupVersionKind(gvk)
if err := findResource(ctx, cli, &ingress, name, namespace, cluster); err != nil {
klog.Error(err, fmt.Sprintf("find v1 Ingress %s/%s from cluster %s failure", name, namespace, cluster))
return nil
}
serviceEndpoints = append(serviceEndpoints, generatorFromIngress(ingress, cluster, component)...)
} else {
klog.Warning("not support ingress version", "version", gvk)
}
case "Service":
var service corev1.Service
service.SetGroupVersionKind(gvk)
if err := findResource(ctx, cli, &service, name, namespace, cluster); err != nil {
klog.Error(err, fmt.Sprintf("find v1 Service %s/%s from cluster %s failure", name, namespace, cluster))
return nil
}
serviceEndpoints = append(serviceEndpoints, generatorFromService(service, cachedSelectorNodeIP, cluster, component, "")...)
case "SeldonDeployment":
obj := new(unstructured.Unstructured)
obj.SetGroupVersionKind(gvk)
if err := findResource(ctx, cli, obj, name, namespace, cluster); err != nil {
klog.Error(err, fmt.Sprintf("find v1 Seldon Deployment %s/%s from cluster %s failure", name, namespace, cluster))
return nil
}
anno := obj.GetAnnotations()
serviceName := "ambassador"
serviceNS := apis.DefaultKubeVelaNS
if anno != nil {
if anno[annoAmbassadorServiceName] != "" {
serviceName = anno[annoAmbassadorServiceName]
}
if anno[annoAmbassadorServiceNamespace] != "" {
serviceNS = anno[annoAmbassadorServiceNamespace]
}
}
var service corev1.Service
if err := findResource(ctx, cli, &service, serviceName, serviceNS, cluster); err != nil {
klog.Error(err, fmt.Sprintf("find v1 Service %s/%s from cluster %s failure", serviceName, serviceNS, cluster))
return nil
}
serviceEndpoints = append(serviceEndpoints, generatorFromService(service, cachedSelectorNodeIP, cluster, component, fmt.Sprintf("/seldon/%s/%s", namespace, name))...)
case "HTTPRoute":
var route gatewayv1beta1.HTTPRoute
route.SetGroupVersionKind(gvk)
if err := findResource(ctx, cli, &route, name, namespace, cluster); err != nil {
klog.Error(err, fmt.Sprintf("find HTTPRoute %s/%s from cluster %s failure", name, namespace, cluster))
return nil
}
serviceEndpoints = append(serviceEndpoints, generatorFromHTTPRoute(ctx, cli, route, cluster, component)...)
}
return serviceEndpoints
}
func findResource(ctx context.Context, cli client.Client, obj client.Object, name, namespace, cluster string) error {
obj.SetNamespace(namespace)
obj.SetName(name)
gctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
if err := cli.Get(multicluster.ContextWithClusterName(gctx, cluster),
client.ObjectKeyFromObject(obj), obj); err != nil {
if kerrors.IsNotFound(err) {
return nil
}
return err
}
return nil
}
func generatorFromService(service corev1.Service, selectorNodeIP func() string, cluster, component, path string) []querytypes.ServiceEndpoint {
var serviceEndpoints []querytypes.ServiceEndpoint
var objRef = corev1.ObjectReference{
Kind: "Service",
Namespace: service.ObjectMeta.Namespace,
Name: service.ObjectMeta.Name,
UID: service.UID,
APIVersion: service.APIVersion,
ResourceVersion: service.ResourceVersion,
}
formatEndpoint := func(host, appProtocol string, portName string, portProtocol corev1.Protocol, portNum int32, inner bool) querytypes.ServiceEndpoint {
return querytypes.ServiceEndpoint{
Endpoint: querytypes.Endpoint{
Protocol: portProtocol,
AppProtocol: &appProtocol,
Host: host,
Port: int(portNum),
PortName: portName,
Path: path,
Inner: inner,
},
Ref: objRef,
Cluster: cluster,
Component: component,
}
}
switch service.Spec.Type {
case corev1.ServiceTypeLoadBalancer:
for _, port := range service.Spec.Ports {
appp := judgeAppProtocol(port.Port)
for _, ingress := range service.Status.LoadBalancer.Ingress {
if ingress.Hostname != "" {
serviceEndpoints = append(serviceEndpoints, formatEndpoint(ingress.Hostname, appp, port.Name, port.Protocol, port.Port, false))
}
if ingress.IP != "" {
serviceEndpoints = append(serviceEndpoints, formatEndpoint(ingress.IP, appp, port.Name, port.Protocol, port.Port, false))
}
}
}
case corev1.ServiceTypeNodePort:
for _, port := range service.Spec.Ports {
appp := judgeAppProtocol(port.Port)
serviceEndpoints = append(serviceEndpoints, formatEndpoint(selectorNodeIP(), appp, port.Name, port.Protocol, port.NodePort, false))
}
case corev1.ServiceTypeClusterIP, corev1.ServiceTypeExternalName:
for _, port := range service.Spec.Ports {
appp := judgeAppProtocol(port.Port)
serviceEndpoints = append(serviceEndpoints, formatEndpoint(fmt.Sprintf("%s.%s", service.Name, service.Namespace), appp, port.Name, port.Protocol, port.Port, true))
}
}
return serviceEndpoints
}
func generatorFromIngress(ingress v1.Ingress, cluster, component string) (serviceEndpoints []querytypes.ServiceEndpoint) {
getAppProtocol := func(host string) string {
if len(ingress.Spec.TLS) > 0 {
for _, tls := range ingress.Spec.TLS {
if len(tls.Hosts) > 0 && slices.Contains(tls.Hosts, host) {
return querytypes.HTTPS
}
if len(tls.Hosts) == 0 {
return querytypes.HTTPS
}
}
}
return querytypes.HTTP
}
// It depends on the Ingress Controller
getEndpointPort := func(appProtocol string) int {
if appProtocol == querytypes.HTTPS {
if port, err := strconv.Atoi(ingress.Annotations[apis.AnnoIngressControllerHTTPSPort]); port > 0 && err == nil {
return port
}
return 443
}
if port, err := strconv.Atoi(ingress.Annotations[apis.AnnoIngressControllerHTTPPort]); port > 0 && err == nil {
return port
}
return 80
}
// The host in rule maybe empty, means access the application by the Gateway Host(IP)
getHost := func(host string) string {
if host != "" {
return host
}
return ingress.Annotations[apis.AnnoIngressControllerHost]
}
for _, rule := range ingress.Spec.Rules {
var appProtocol = getAppProtocol(rule.Host)
var appPort = getEndpointPort(appProtocol)
if rule.HTTP != nil {
for _, path := range rule.HTTP.Paths {
serviceEndpoints = append(serviceEndpoints, querytypes.ServiceEndpoint{
Endpoint: querytypes.Endpoint{
Protocol: corev1.ProtocolTCP,
AppProtocol: &appProtocol,
Host: getHost(rule.Host),
Path: path.Path,
Port: appPort,
},
Ref: corev1.ObjectReference{
Kind: "Ingress",
Namespace: ingress.ObjectMeta.Namespace,
Name: ingress.ObjectMeta.Name,
UID: ingress.UID,
APIVersion: ingress.APIVersion,
ResourceVersion: ingress.ResourceVersion,
},
Cluster: cluster,
Component: component,
})
}
}
}
return serviceEndpoints
}
func getGatewayPortAndProtocol(ctx context.Context, cli client.Client, defaultNamespace, cluster string, parents []gatewayv1beta1.ParentReference) (string, int) {
for _, parent := range parents {
if parent.Kind != nil && *parent.Kind == "Gateway" {
var gateway gatewayv1beta1.Gateway
namespace := defaultNamespace
if parent.Namespace != nil {
namespace = string(*parent.Namespace)
}
if err := findResource(ctx, cli, &gateway, string(parent.Name), namespace, cluster); err != nil {
klog.Errorf("query the Gateway %s/%s/%s failure %s", cluster, namespace, string(parent.Name), err.Error())
}
var listener *gatewayv1beta1.Listener
if parent.SectionName != nil {
for i, lis := range gateway.Spec.Listeners {
if lis.Name == *parent.SectionName {
listener = &gateway.Spec.Listeners[i]
break
}
}
} else if len(gateway.Spec.Listeners) > 0 {
listener = &gateway.Spec.Listeners[0]
}
if listener != nil {
var protocol = querytypes.HTTP
if listener.Protocol == gatewayv1beta1.HTTPSProtocolType {
protocol = querytypes.HTTPS
}
var port = int(listener.Port)
// The gateway listener port may not be the externally exposed port.
// For example, the traefik addon has a default port mapping configuration of 8443->443 8000->80
// So users could set the `ports-mapping` annotation.
if mapping := gateway.Annotations["ports-mapping"]; mapping != "" {
for _, portItem := range strings.Split(mapping, ",") {
if portMap := strings.Split(portItem, ":"); len(portMap) == 2 {
if portMap[0] == fmt.Sprintf("%d", listener.Port) {
newPort, err := strconv.Atoi(portMap[1])
if err == nil {
port = newPort
}
}
}
}
}
return protocol, port
}
}
}
return querytypes.HTTP, 80
}
func generatorFromHTTPRoute(ctx context.Context, cli client.Client, route gatewayv1beta1.HTTPRoute, cluster, component string) []querytypes.ServiceEndpoint {
existPath := make(map[string]bool)
var serviceEndpoints []querytypes.ServiceEndpoint
for _, rule := range route.Spec.Rules {
for _, host := range route.Spec.Hostnames {
appProtocol, appPort := getGatewayPortAndProtocol(ctx, cli, route.Namespace, cluster, route.Spec.ParentRefs)
for _, match := range rule.Matches {
path := ""
if match.Path != nil && (match.Path.Type == nil || string(*match.Path.Type) == string(gatewayv1beta1.PathMatchPathPrefix)) {
path = *match.Path.Value
}
if !existPath[path] {
existPath[path] = true
serviceEndpoints = append(serviceEndpoints, querytypes.ServiceEndpoint{
Endpoint: querytypes.Endpoint{
Protocol: corev1.ProtocolTCP,
AppProtocol: &appProtocol,
Host: string(host),
Path: path,
Port: appPort,
},
Ref: corev1.ObjectReference{
Kind: route.Kind,
Namespace: route.ObjectMeta.Namespace,
Name: route.ObjectMeta.Name,
UID: route.UID,
APIVersion: route.APIVersion,
ResourceVersion: route.ResourceVersion,
},
Cluster: cluster,
Component: component,
})
}
}
}
}
return serviceEndpoints
}
func selectorNodeIP(ctx context.Context, clusterName string, client client.Client) string {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
var nodes corev1.NodeList
if err := client.List(multicluster.ContextWithClusterName(ctx, clusterName), &nodes); err != nil {
return ""
}
if len(nodes.Items) == 0 {
return ""
}
return selectGatewayIP(nodes.Items)
}
// judgeAppProtocol RFC-6335 and http://www.iana.org/assignments/service-names).
func judgeAppProtocol(port int32) string {
switch port {
case 80, 8080:
return querytypes.HTTP
case 443:
return querytypes.HTTPS
case 3306:
return querytypes.Mysql
case 6379:
return querytypes.Redis
default:
return ""
}
}
// selectGatewayIP will choose one gateway IP from all nodes, it will pick up external IP first. If there isn't any, it will pick the first node's internal IP.
func selectGatewayIP(nodes []corev1.Node) string {
var gatewayNode *corev1.Node
var workerNodes []corev1.Node
for i, node := range nodes {
if _, exist := node.Labels[apis.LabelNodeRoleGateway]; exist {
gatewayNode = &nodes[i]
break
} else if _, exist := node.Labels[apis.LabelNodeRoleWorker]; exist {
workerNodes = append(workerNodes, nodes[i])
}
}
var candidates = nodes
if gatewayNode != nil {
candidates = []corev1.Node{*gatewayNode}
} else if len(workerNodes) > 0 {
candidates = workerNodes
}
if len(candidates) == 0 {
return ""
}
var addressMaps = make([]map[corev1.NodeAddressType]string, 0)
for _, node := range candidates {
var addressMap = make(map[corev1.NodeAddressType]string)
for _, address := range node.Status.Addresses {
addressMap[address.Type] = address.Address
}
// first get external ip
if ip, exist := addressMap[corev1.NodeExternalIP]; exist {
return ip
}
addressMaps = append(addressMaps, addressMap)
}
return addressMaps[0][corev1.NodeInternalIP]
}

View File

@@ -1,419 +0,0 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"context"
"fmt"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/oam/util"
querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
)
var _ = Describe("Test query endpoints", func() {
BeforeEach(func() {
})
Context("Test Generate Endpoints", func() {
It("Test endpoints with additional rules", func() {
err := k8sClient.Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "vela-system",
},
})
Expect(err).Should(SatisfyAny(BeNil(), util.AlreadyExistMatcher{}))
sts := common.AppStatus{
AppliedResources: []common.ClusterObjectReference{
{
Cluster: "",
ObjectReference: corev1.ObjectReference{
APIVersion: "machinelearning.seldon.io/v1",
Kind: "SeldonDeployment",
Namespace: "default",
Name: "sdep2",
},
},
},
}
testApp := &v1beta1.Application{
ObjectMeta: metav1.ObjectMeta{
Name: "endpoints-app-2",
Namespace: "default",
},
Spec: v1beta1.ApplicationSpec{
Components: []common.ApplicationComponent{
{
Name: "endpoints-test-2",
Type: "webservice",
},
},
},
Status: sts,
}
Expect(k8sClient.Create(context.TODO(), testApp)).Should(BeNil())
var gtapp v1beta1.Application
Expect(k8sClient.Get(context.TODO(), client.ObjectKey{Name: "endpoints-app-2", Namespace: "default"}, &gtapp)).Should(BeNil())
gtapp.Status = sts
Expect(k8sClient.Status().Update(ctx, &gtapp)).Should(BeNil())
var mr []v1beta1.ManagedResource
for _, ar := range sts.AppliedResources {
smr := v1beta1.ManagedResource{
ClusterObjectReference: ar,
}
smr.Component = "endpoints-test-2"
mr = append(mr, smr)
}
rt := &v1beta1.ResourceTracker{
ObjectMeta: metav1.ObjectMeta{
Name: "endpoints-app-2",
Namespace: "default",
Labels: map[string]string{
oam.LabelAppName: testApp.Name,
oam.LabelAppNamespace: testApp.Namespace,
},
},
Spec: v1beta1.ResourceTrackerSpec{
Type: v1beta1.ResourceTrackerTypeRoot,
ManagedResources: mr,
},
}
err = k8sClient.Create(context.TODO(), rt)
Expect(err).Should(BeNil())
By("Prepare configmap for relationship")
err = k8sClient.Create(context.TODO(), &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "rule-for-seldon-test",
Namespace: types.DefaultKubeVelaNS,
Labels: map[string]string{
oam.LabelResourceRules: "true",
oam.LabelResourceRuleFormat: oam.ResourceTopologyFormatJSON,
},
},
Data: map[string]string{
"rules": `[
{
"parentResourceType": {
"group": "machinelearning.seldon.io",
"kind": "SeldonDeployment"
},
"childrenResourceType": [
{
"apiVersion": "v1",
"kind": "Service"
}
]
}
]`,
},
})
Expect(err).Should(BeNil())
testServiceList := []map[string]interface{}{
{
"name": "clusterip-2",
"ports": []corev1.ServicePort{
{Port: 80, TargetPort: intstr.FromInt(80), Name: "80port"},
{Port: 81, TargetPort: intstr.FromInt(81), Name: "81port"},
},
"type": corev1.ServiceTypeClusterIP,
},
{
"name": "load-balancer",
"ports": []corev1.ServicePort{
{Port: 8080, TargetPort: intstr.FromInt(8080), Name: "8080port", NodePort: 30020},
},
"type": corev1.ServiceTypeLoadBalancer,
"status": corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{
{
IP: "2.2.2.2",
},
},
},
},
},
{
"name": "seldon-ambassador-2",
"ports": []corev1.ServicePort{
{Port: 80, TargetPort: intstr.FromInt(80), Name: "80port"},
},
"type": corev1.ServiceTypeLoadBalancer,
"status": corev1.ServiceStatus{
LoadBalancer: corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{
{
IP: "1.1.1.1",
},
},
},
},
},
}
abgvk := schema.GroupVersionKind{
Group: "machinelearning.seldon.io",
Version: "v1",
Kind: "SeldonDeployment",
}
obj := &unstructured.Unstructured{}
obj.SetName("sdep2")
obj.SetNamespace("default")
obj.SetAnnotations(map[string]string{
annoAmbassadorServiceName: "seldon-ambassador-2",
annoAmbassadorServiceNamespace: "default",
})
obj.SetGroupVersionKind(abgvk)
err = k8sClient.Create(context.TODO(), obj)
Expect(err).Should(BeNil())
abobj := &unstructured.Unstructured{}
abobj.SetGroupVersionKind(abgvk)
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: "sdep2", Namespace: "default"}, abobj)).Should(BeNil())
for _, s := range testServiceList {
ns := "default"
if s["namespace"] != nil {
ns = s["namespace"].(string)
}
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: s["name"].(string),
Namespace: ns,
OwnerReferences: []metav1.OwnerReference{
{APIVersion: "machinelearning.seldon.io/v1", Kind: "SeldonDeployment", Name: "sdep2", UID: abobj.GetUID()},
},
},
Spec: corev1.ServiceSpec{
Ports: s["ports"].([]corev1.ServicePort),
Type: s["type"].(corev1.ServiceType),
},
}
if s["labels"] != nil {
service.Labels = s["labels"].(map[string]string)
}
err := k8sClient.Create(context.TODO(), service)
Expect(err).Should(BeNil())
if s["status"] != nil {
service.Status = s["status"].(corev1.ServiceStatus)
err := k8sClient.Status().Update(context.TODO(), service)
Expect(err).Should(BeNil())
}
}
opt := `app: {
name: "endpoints-app-2"
namespace: "default"
filter: {
cluster: "",
clusterNamespace: "default",
}
withTree: true
}`
v, err := value.NewValue(opt, nil, "")
Expect(err).Should(BeNil())
pr := &provider{
cli: k8sClient,
}
logCtx := monitorContext.NewTraceContext(ctx, "")
err = pr.CollectServiceEndpoints(logCtx, nil, v, nil)
Expect(err).Should(BeNil())
urls := []string{
"http://1.1.1.1/seldon/default/sdep2",
"http://clusterip-2.default",
"clusterip-2.default:81",
"http://2.2.2.2:8080",
"http://1.1.1.1",
}
endValue, err := v.Field("list")
Expect(err).Should(BeNil())
var endpoints []querytypes.ServiceEndpoint
err = endValue.Decode(&endpoints)
Expect(err).Should(BeNil())
var edps []string
for _, e := range endpoints {
edps = append(edps, e.String())
}
Expect(edps).Should(BeEquivalentTo(urls))
})
It("Test select gateway IP", func() {
masterNode := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
Labels: map[string]string{
"node-role.kubernetes.io/master": "true",
},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: "node1-internal-ip",
},
},
},
}
workerNode1 := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
Labels: map[string]string{
"node-role.kubernetes.io/worker": "true",
},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: "node2-internal-ip",
},
{
Type: corev1.NodeExternalIP,
Address: "node2-external-ip",
},
},
},
}
workerNode2 := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-3",
Labels: map[string]string{
"node-role.kubernetes.io/worker": "true",
},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: "node3-internal-ip",
},
},
},
}
gatewayNode := corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-4",
Labels: map[string]string{
"node-role.kubernetes.io/gateway": "true",
},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: "node4-internal-ip",
},
{
Type: corev1.NodeExternalIP,
Address: "node4-external-ip",
},
},
},
}
testCase := []struct {
note string
nodes []corev1.Node
wantIP string
}{
{
note: "only master node",
nodes: []corev1.Node{masterNode},
wantIP: "node1-internal-ip",
},
{
note: "with worker node, select external ip first",
nodes: []corev1.Node{masterNode, workerNode1},
wantIP: "node2-external-ip",
},
{
note: "with worker node, select worker's internal ip",
nodes: []corev1.Node{masterNode, workerNode2},
wantIP: "node3-internal-ip",
},
{
note: "with gateway node, gateway node first",
nodes: []corev1.Node{masterNode, workerNode1, workerNode1, gatewayNode},
wantIP: "node4-external-ip",
},
}
for _, tc := range testCase {
By(tc.note)
ip := selectGatewayIP(tc.nodes)
Expect(ip).Should(Equal(tc.wantIP))
}
})
})
})
var _ = Describe("Test get ingress endpoint", func() {
It("Test get ingress endpoint with different apiVersion", func() {
ingress1 := v1.Ingress{}
Expect(yaml.Unmarshal([]byte(ingressYaml1), &ingress1)).Should(BeNil())
err := k8sClient.Create(ctx, &ingress1)
Expect(err).Should(BeNil())
gvk := schema.GroupVersionKind{Group: "networking.k8s.io", Version: "v1", Kind: "Ingress"}
Eventually(func() error {
eps := getServiceEndpoints(ctx, k8sClient, gvk, ingress1.Name, ingress1.Namespace, "", "", nil)
if len(eps) != 1 {
return fmt.Errorf("result length missmatch")
}
return nil
}, 2*time.Second, 500*time.Millisecond).Should(BeNil())
})
})
var ingressYaml1 = `
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: ingress-1
namespace: default
spec:
rules:
- http:
paths:
- path: /testpath
pathType: Prefix
backend:
service:
name: test
port:
number: 80
`

View File

@@ -1,312 +0,0 @@
/*
Copyright 2021. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
"time"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
monitorContext "github.com/kubevela/pkg/monitor/context"
pkgmulticluster "github.com/kubevela/pkg/multicluster"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/kubevela/workflow/pkg/types"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/pkg/multicluster"
querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
)
const (
// ProviderName is provider name for install.
ProviderName = "query"
// HelmReleaseKind is the kind of HelmRelease
HelmReleaseKind = "HelmRelease"
annoAmbassadorServiceName = "ambassador.service/name"
annoAmbassadorServiceNamespace = "ambassador.service/namespace"
)
type provider struct {
cli client.Client
cfg *rest.Config
}
// Resource refer to an object with cluster info
type Resource struct {
Cluster string `json:"cluster"`
Component string `json:"component"`
Revision string `json:"revision"`
Object *unstructured.Unstructured `json:"object"`
}
// Option is the query option
type Option struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Filter FilterOption `json:"filter,omitempty"`
// WithStatus means query the object from the cluster and get the latest status
// This field only suitable for ListResourcesInApp
WithStatus bool `json:"withStatus,omitempty"`
// WithTree means recursively query the resource tree.
WithTree bool `json:"withTree,omitempty"`
}
// FilterOption filter resource created by component
type FilterOption struct {
Cluster string `json:"cluster,omitempty"`
ClusterNamespace string `json:"clusterNamespace,omitempty"`
Components []string `json:"components,omitempty"`
APIVersion string `json:"apiVersion,omitempty"`
Kind string `json:"kind,omitempty"`
QueryNewest bool `json:"queryNewest,omitempty"`
}
// ListResourcesInApp lists CRs created by Application, this provider queries the object data.
func (h *provider) ListResourcesInApp(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, _ types.Action) error {
val, err := v.LookupValue("app")
if err != nil {
return err
}
opt := Option{}
if err = val.UnmarshalTo(&opt); err != nil {
return err
}
collector := NewAppCollector(h.cli, opt)
appResList, err := collector.CollectResourceFromApp(ctx)
if err != nil {
return v.FillObject(err.Error(), "err")
}
if appResList == nil {
appResList = make([]Resource, 0)
}
return fillQueryResult(v, appResList, "list")
}
// ListAppliedResources list applied resource from tracker, this provider only queries the metadata.
func (h *provider) ListAppliedResources(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, _ types.Action) error {
val, err := v.LookupValue("app")
if err != nil {
return err
}
opt := Option{}
if err = val.UnmarshalTo(&opt); err != nil {
return v.FillObject(err.Error(), "err")
}
collector := NewAppCollector(h.cli, opt)
app := new(v1beta1.Application)
appKey := client.ObjectKey{Name: opt.Name, Namespace: opt.Namespace}
if err = h.cli.Get(ctx, appKey, app); err != nil {
return v.FillObject(err.Error(), "err")
}
appResList, err := collector.ListApplicationResources(ctx, app)
if err != nil {
return v.FillObject(err.Error(), "err")
}
if appResList == nil {
appResList = make([]*querytypes.AppliedResource, 0)
}
return fillQueryResult(v, appResList, "list")
}
func (h *provider) CollectResources(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, _ types.Action) error {
val, err := v.LookupValue("app")
if err != nil {
return err
}
opt := Option{}
if err = val.UnmarshalTo(&opt); err != nil {
return v.FillObject(err.Error(), "err")
}
collector := NewAppCollector(h.cli, opt)
app := new(v1beta1.Application)
appKey := client.ObjectKey{Name: opt.Name, Namespace: opt.Namespace}
if err = h.cli.Get(ctx, appKey, app); err != nil {
return v.FillObject(err.Error(), "err")
}
appResList, err := collector.ListApplicationResources(ctx, app)
if err != nil {
return v.FillObject(err.Error(), "err")
}
var resources = make([]querytypes.ResourceItem, 0)
for _, res := range appResList {
if res.ResourceTree != nil {
resources = append(resources, buildResourceArray(*res, res.ResourceTree, res.ResourceTree, opt.Filter.Kind, opt.Filter.APIVersion)...)
} else if res.Kind == opt.Filter.Kind && res.APIVersion == opt.Filter.APIVersion {
var object unstructured.Unstructured
object.SetAPIVersion(opt.Filter.APIVersion)
object.SetKind(opt.Filter.Kind)
if err := h.cli.Get(ctx, apimachinerytypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, &object); err == nil {
resources = append(resources, buildResourceItem(*res, querytypes.Workload{
APIVersion: app.APIVersion,
Kind: app.Kind,
Name: app.Name,
Namespace: app.Namespace,
}, object))
} else {
klog.Errorf("failed to get the service:%s", err.Error())
}
}
}
return fillQueryResult(v, resources, "list")
}
func (h *provider) SearchEvents(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, _ types.Action) error {
val, err := v.LookupValue("value")
if err != nil {
return err
}
cluster, err := v.GetString("cluster")
if err != nil {
return err
}
obj := new(unstructured.Unstructured)
if err = val.UnmarshalTo(obj); err != nil {
return err
}
listCtx := multicluster.ContextWithClusterName(ctx, cluster)
fieldSelector := getEventFieldSelector(obj)
eventList := corev1.EventList{}
listOpts := []client.ListOption{
client.InNamespace(obj.GetNamespace()),
client.MatchingFieldsSelector{
Selector: fieldSelector,
},
}
if err := h.cli.List(listCtx, &eventList, listOpts...); err != nil {
return v.FillObject(err.Error(), "err")
}
return fillQueryResult(v, eventList.Items, "list")
}
func (h *provider) CollectLogsInPod(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, _ types.Action) error {
cluster, err := v.GetString("cluster")
if err != nil {
return errors.Wrapf(err, "invalid cluster")
}
namespace, err := v.GetString("namespace")
if err != nil {
return errors.Wrapf(err, "invalid namespace")
}
pod, err := v.GetString("pod")
if err != nil {
return errors.Wrapf(err, "invalid pod name")
}
val, err := v.LookupValue("options")
if err != nil {
return errors.Wrapf(err, "invalid log options")
}
opts := &corev1.PodLogOptions{}
if err = val.UnmarshalTo(opts); err != nil {
return errors.Wrapf(err, "invalid log options content")
}
cliCtx := multicluster.ContextWithClusterName(ctx, cluster)
h.cfg.Wrap(pkgmulticluster.NewTransportWrapper())
clientSet, err := kubernetes.NewForConfig(h.cfg)
if err != nil {
return errors.Wrapf(err, "failed to create kubernetes client")
}
var defaultOutputs = make(map[string]interface{})
var errMsg string
podInst, err := clientSet.CoreV1().Pods(namespace).Get(cliCtx, pod, v1.GetOptions{})
if err != nil {
errMsg += fmt.Sprintf("failed to get pod: %s; ", err.Error())
}
req := clientSet.CoreV1().Pods(namespace).GetLogs(pod, opts)
readCloser, err := req.Stream(cliCtx)
if err != nil {
errMsg += fmt.Sprintf("failed to get stream logs %s; ", err.Error())
}
if readCloser != nil && podInst != nil {
r := bufio.NewReader(readCloser)
buffer := bytes.NewBuffer(nil)
var readErr error
defer func() {
_ = readCloser.Close()
}()
for {
s, err := r.ReadString('\n')
buffer.WriteString(s)
if err != nil {
if !errors.Is(err, io.EOF) {
readErr = err
}
break
}
}
toDate := v1.Now()
var fromDate v1.Time
// nolint
if opts.SinceTime != nil {
fromDate = *opts.SinceTime
} else if opts.SinceSeconds != nil {
fromDate = v1.NewTime(toDate.Add(time.Duration(-(*opts.SinceSeconds) * int64(time.Second))))
} else {
fromDate = podInst.CreationTimestamp
}
// the cue string can not support the special characters
logs := base64.StdEncoding.EncodeToString(buffer.Bytes())
defaultOutputs = map[string]interface{}{
"logs": logs,
"info": map[string]interface{}{
"fromDate": fromDate,
"toDate": toDate,
},
}
if readErr != nil {
errMsg += readErr.Error()
}
}
if errMsg != "" {
klog.Warningf(errMsg)
defaultOutputs["err"] = errMsg
}
return v.FillObject(defaultOutputs, "outputs")
}
// Install register handlers to provider discover.
func Install(p types.Providers, cli client.Client, cfg *rest.Config) {
prd := &provider{
cli: cli,
cfg: cfg,
}
p.Register(ProviderName, map[string]types.Handler{
"listResourcesInApp": prd.ListResourcesInApp,
"listAppliedResources": prd.ListAppliedResources,
"collectResources": prd.CollectResources,
"searchEvents": prd.SearchEvents,
"collectLogsInPod": prd.CollectLogsInPod,
"collectServiceEndpoints": prd.CollectServiceEndpoints,
})
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,85 +0,0 @@
/*
Copyright 2021. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"context"
"testing"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/rest"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"github.com/oam-dev/kubevela/pkg/utils/common"
)
var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var ctx context.Context
var _ = BeforeSuite(func() {
By("bootstrapping test environment")
testEnv = &envtest.Environment{
ControlPlaneStartTimeout: time.Minute * 3,
ControlPlaneStopTimeout: time.Minute,
UseExistingCluster: pointer.Bool(false),
CRDDirectoryPaths: []string{
"./testdata/gateway/crds",
"../../../../charts/vela-core/crds",
"./testdata/machinelearning.seldon.io_seldondeployments.yaml",
"./testdata/helm-release-crd.yaml",
},
}
By("start kube test env")
var err error
cfg, err = testEnv.Start()
Expect(err).ShouldNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())
By("new kube client")
cfg.Timeout = time.Minute * 2
scheme := common.Scheme
batchv1.AddToScheme(scheme)
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme})
Expect(err).Should(BeNil())
Expect(k8sClient).ToNot(BeNil())
ctx = context.Background()
Expect(err).To(BeNil())
})
var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).ToNot(HaveOccurred())
})
func TestQueryProvider(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "VelaQL Suite")
}

View File

@@ -1,39 +0,0 @@
apiVersion: gateway.networking.k8s.io/v1beta1
kind: Gateway
metadata:
annotations:
oam.dev/kubevela-version: v1.5.0-alpha.2
ports-mapping: "8000:80,8443:443"
labels:
addons.oam.dev/name: velaux
addons.oam.dev/registry: KubeVela
addons.oam.dev/version: v1.5.0-alpha.3
app.oam.dev/app-revision-hash: 33e813ddfe9a34be
app.oam.dev/appRevision: addon-velaux-v36
app.oam.dev/cluster: ""
app.oam.dev/component: velaux
app.oam.dev/name: addon-velaux
app.oam.dev/namespace: vela-system
app.oam.dev/resourceType: TRAIT
app.oam.dev/revision: velaux-v16
oam.dev/render-hash: e7be271bfad2cb55
trait.oam.dev/resource: gateway
trait.oam.dev/type: https-route
name: velaux-gateway-tls
namespace: vela-system
spec:
gatewayClassName: traefik
listeners:
- allowedRoutes:
namespaces:
from: Same
name: kubevela
port: 8443
protocol: HTTPS
tls:
certificateRefs:
- group: ""
kind: Secret
name: kubevela
namespace: vela-system
mode: Terminate

View File

@@ -1,16 +0,0 @@
apiVersion: gateway.networking.k8s.io/v1beta1
kind: Gateway
metadata:
name: traefik-gateway
namespace: vela-system
annotations:
ports-mapping: "8000:80,8443:443"
spec:
gatewayClassName: traefik
listeners:
- allowedRoutes:
namespaces:
from: All
name: web
port: 8000
protocol: HTTP

View File

@@ -1,40 +0,0 @@
apiVersion: gateway.networking.k8s.io/v1beta1
kind: HTTPRoute
metadata:
name: http-test-route
namespace: default
spec:
hostnames:
- gateway.domain
parentRefs:
- group: gateway.networking.k8s.io
kind: Gateway
name: traefik-gateway
namespace: vela-system
sectionName: web
rules:
- backendRefs:
- group: ""
kind: Service
name: game2048
port: 80
weight: 1
matches:
- path:
type: PathPrefix
value: /
- group: ""
kind: Service
name: game2048
port: 80
weight: 1
- backendRefs:
- group: ""
kind: Service
name: game2048-2
port: 80
weight: 1
matches:
- path:
type: PathPrefix
value: /api

View File

@@ -1,41 +0,0 @@
apiVersion: gateway.networking.k8s.io/v1beta1
kind: HTTPRoute
metadata:
annotations:
oam.dev/kubevela-version: v1.5.0-alpha.2
labels:
addons.oam.dev/name: velaux
addons.oam.dev/registry: KubeVela
addons.oam.dev/version: v1.5.0-alpha.3
app.oam.dev/app-revision-hash: 33e813ddfe9a34be
app.oam.dev/appRevision: addon-velaux-v36
app.oam.dev/cluster: ""
app.oam.dev/component: velaux
app.oam.dev/name: addon-velaux
app.oam.dev/namespace: vela-system
app.oam.dev/resourceType: TRAIT
app.oam.dev/revision: velaux-v16
oam.dev/render-hash: 2e8aa179bec2b4ec
trait.oam.dev/resource: httpsRoute
trait.oam.dev/type: https-route
name: velaux-ssl
namespace: default
spec:
hostnames:
- demo.kubevela.net
parentRefs:
- group: gateway.networking.k8s.io
kind: Gateway
name: velaux-gateway-tls
namespace: vela-system
rules:
- backendRefs:
- group: ""
kind: Service
name: velaux
port: 80
weight: 1
matches:
- path:
type: PathPrefix
value: /

View File

@@ -1,736 +0,0 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: helmreleases.helm.toolkit.fluxcd.io
spec:
conversion:
strategy: None
group: helm.toolkit.fluxcd.io
names:
kind: HelmRelease
listKind: HelmReleaseList
plural: helmreleases
shortNames:
- hr
singular: helmrelease
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=="Ready")].status
name: Ready
type: string
- jsonPath: .status.conditions[?(@.type=="Ready")].message
name: Status
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v2beta1
schema:
openAPIV3Schema:
description: HelmRelease is the Schema for the helmreleases API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: HelmReleaseSpec defines the desired state of a Helm release.
properties:
chart:
description: Chart defines the template of the v1beta1.HelmChart that
should be created for this HelmRelease.
properties:
spec:
description: Spec holds the template for the v1beta1.HelmChartSpec
for this HelmRelease.
properties:
chart:
description: The name or path the Helm chart is available
at in the SourceRef.
type: string
interval:
description: Interval at which to check the v1beta1.Source
for updates. Defaults to 'HelmReleaseSpec.Interval'.
type: string
sourceRef:
description: The name and namespace of the v1beta1.Source
the chart is available at.
properties:
apiVersion:
description: APIVersion of the referent.
type: string
kind:
description: Kind of the referent.
enum:
- HelmRepository
- GitRepository
- Bucket
type: string
name:
description: Name of the referent.
maxLength: 253
minLength: 1
type: string
namespace:
description: Namespace of the referent.
maxLength: 63
minLength: 1
type: string
required:
- name
type: object
valuesFile:
description: Alternative values file to use as the default
chart values, expected to be a relative path in the SourceRef.
Deprecated in favor of ValuesFiles, for backwards compatibility
the file defined here is merged before the ValuesFiles items.
Ignored when omitted.
type: string
valuesFiles:
description: Alternative list of values files to use as the
chart values (values.yaml is not included by default), expected
to be a relative path in the SourceRef. Values files are
merged in the order of this list with the last file overriding
the first. Ignored when omitted.
items:
type: string
type: array
version:
default: '*'
description: Version semver expression, ignored for charts
from v1beta1.GitRepository and v1beta1.Bucket sources. Defaults
to latest when omitted.
type: string
required:
- chart
- sourceRef
type: object
required:
- spec
type: object
dependsOn:
description: DependsOn may contain a dependency.CrossNamespaceDependencyReference
slice with references to HelmRelease resources that must be ready
before this HelmRelease can be reconciled.
items:
description: CrossNamespaceDependencyReference holds the reference
to a dependency.
properties:
name:
description: Name holds the name reference of a dependency.
type: string
namespace:
description: Namespace holds the namespace reference of a dependency.
type: string
required:
- name
type: object
type: array
install:
description: Install holds the configuration for Helm install actions
for this HelmRelease.
properties:
crds:
description: "CRDs upgrade CRDs from the Helm Chart's crds directory
according to the CRD upgrade policy provided here. Valid values
are `Skip`, `Create` or `CreateReplace`. Default is `Create`
and if omitted CRDs are installed but not updated. \n Skip:
do neither install nor replace (update) any CRDs. \n Create:
new CRDs are created, existing CRDs are neither updated nor
deleted. \n CreateReplace: new CRDs are created, existing CRDs
are updated (replaced) but not deleted. \n By default, CRDs
are applied (installed) during Helm install action. With this
option users can opt-in to CRD replace existing CRDs on Helm
install actions, which is not (yet) natively supported by Helm.
https://helm.sh/docs/chart_best_practices/custom_resource_definitions."
enum:
- Skip
- Create
- CreateReplace
type: string
createNamespace:
description: CreateNamespace tells the Helm install action to
create the HelmReleaseSpec.TargetNamespace if it does not exist
yet. On uninstall, the namespace will not be garbage collected.
type: boolean
disableHooks:
description: DisableHooks prevents hooks from running during the
Helm install action.
type: boolean
disableOpenAPIValidation:
description: DisableOpenAPIValidation prevents the Helm install
action from validating rendered templates against the Kubernetes
OpenAPI Schema.
type: boolean
disableWait:
description: DisableWait disables the waiting for resources to
be ready after a Helm install has been performed.
type: boolean
disableWaitForJobs:
description: DisableWaitForJobs disables waiting for jobs to complete
after a Helm install has been performed.
type: boolean
remediation:
description: Remediation holds the remediation configuration for
when the Helm install action for the HelmRelease fails. The
default is to not perform any action.
properties:
ignoreTestFailures:
description: IgnoreTestFailures tells the controller to skip
remediation when the Helm tests are run after an install
action but fail. Defaults to 'Test.IgnoreFailures'.
type: boolean
remediateLastFailure:
description: RemediateLastFailure tells the controller to
remediate the last failure, when no retries remain. Defaults
to 'false'.
type: boolean
retries:
description: Retries is the number of retries that should
be attempted on failures before bailing. Remediation, using
an uninstall, is performed between each attempt. Defaults
to '0', a negative integer equals to unlimited retries.
type: integer
type: object
replace:
description: Replace tells the Helm install action to re-use the
'ReleaseName', but only if that name is a deleted release which
remains in the history.
type: boolean
skipCRDs:
description: "SkipCRDs tells the Helm install action to not install
any CRDs. By default, CRDs are installed if not already present.
\n Deprecated use CRD policy (`crds`) attribute with value `Skip`
instead."
type: boolean
timeout:
description: Timeout is the time to wait for any individual Kubernetes
operation (like Jobs for hooks) during the performance of a
Helm install action. Defaults to 'HelmReleaseSpec.Timeout'.
type: string
type: object
interval:
description: Interval at which to reconcile the Helm release.
type: string
kubeConfig:
description: KubeConfig for reconciling the HelmRelease on a remote
cluster. When specified, KubeConfig takes precedence over ServiceAccountName.
properties:
secretRef:
description: SecretRef holds the name to a secret that contains
a 'value' key with the kubeconfig file as the value. It must
be in the same namespace as the HelmRelease. It is recommended
that the kubeconfig is self-contained, and the secret is regularly
updated if credentials such as a cloud-access-token expire.
Cloud specific `cmd-path` auth helpers will not function without
adding binaries and credentials to the Pod that is responsible
for reconciling the HelmRelease.
properties:
name:
description: Name of the referent
type: string
required:
- name
type: object
type: object
maxHistory:
description: MaxHistory is the number of revisions saved by Helm for
this HelmRelease. Use '0' for an unlimited number of revisions;
defaults to '10'.
type: integer
postRenderers:
description: PostRenderers holds an array of Helm PostRenderers, which
will be applied in order of their definition.
items:
description: PostRenderer contains a Helm PostRenderer specification.
properties:
kustomize:
description: Kustomization to apply as PostRenderer.
properties:
images:
description: Images is a list of (image name, new name,
new tag or digest) for changing image names, tags or digests.
This can also be achieved with a patch, but this operator
is simpler to specify.
items:
description: Image contains an image name, a new name,
a new tag or digest, which will replace the original
name and tag.
properties:
digest:
description: Digest is the value used to replace the
original image tag. If digest is present NewTag
value is ignored.
type: string
name:
description: Name is a tag-less image name.
type: string
newName:
description: NewName is the value used to replace
the original name.
type: string
newTag:
description: NewTag is the value used to replace the
original tag.
type: string
required:
- name
type: object
type: array
patchesJson6902:
description: JSON 6902 patches, defined as inline YAML objects.
items:
description: JSON6902Patch contains a JSON6902 patch and
the target the patch should be applied to.
properties:
patch:
description: Patch contains the JSON6902 patch document
with an array of operation objects.
items:
description: JSON6902 is a JSON6902 operation object.
https://tools.ietf.org/html/rfc6902#section-4
properties:
from:
type: string
op:
enum:
- test
- remove
- add
- replace
- move
- copy
type: string
path:
type: string
value:
x-kubernetes-preserve-unknown-fields: true
required:
- op
- path
type: object
type: array
target:
description: Target points to the resources that the
patch document should be applied to.
properties:
annotationSelector:
description: AnnotationSelector is a string that
follows the label selection expression https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api
It matches with the resource annotations.
type: string
group:
description: Group is the API group to select
resources from. Together with Version and Kind
it is capable of unambiguously identifying and/or
selecting resources. https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/api-group.md
type: string
kind:
description: Kind of the API Group to select resources
from. Together with Group and Version it is
capable of unambiguously identifying and/or
selecting resources. https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/api-group.md
type: string
labelSelector:
description: LabelSelector is a string that follows
the label selection expression https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#api
It matches with the resource labels.
type: string
name:
description: Name to match resources with.
type: string
namespace:
description: Namespace to select resources from.
type: string
version:
description: Version of the API Group to select
resources from. Together with Group and Kind
it is capable of unambiguously identifying and/or
selecting resources. https://github.com/kubernetes/community/blob/master/contributors/design-proposals/api-machinery/api-group.md
type: string
type: object
required:
- patch
- target
type: object
type: array
patchesStrategicMerge:
description: Strategic merge patches, defined as inline
YAML objects.
items:
x-kubernetes-preserve-unknown-fields: true
type: array
type: object
type: object
type: array
releaseName:
description: ReleaseName used for the Helm release. Defaults to a
composition of '[TargetNamespace-]Name'.
maxLength: 53
minLength: 1
type: string
rollback:
description: Rollback holds the configuration for Helm rollback actions
for this HelmRelease.
properties:
cleanupOnFail:
description: CleanupOnFail allows deletion of new resources created
during the Helm rollback action when it fails.
type: boolean
disableHooks:
description: DisableHooks prevents hooks from running during the
Helm rollback action.
type: boolean
disableWait:
description: DisableWait disables the waiting for resources to
be ready after a Helm rollback has been performed.
type: boolean
disableWaitForJobs:
description: DisableWaitForJobs disables waiting for jobs to complete
after a Helm rollback has been performed.
type: boolean
force:
description: Force forces resource updates through a replacement
strategy.
type: boolean
recreate:
description: Recreate performs pod restarts for the resource if
applicable.
type: boolean
timeout:
description: Timeout is the time to wait for any individual Kubernetes
operation (like Jobs for hooks) during the performance of a
Helm rollback action. Defaults to 'HelmReleaseSpec.Timeout'.
type: string
type: object
serviceAccountName:
description: The name of the Kubernetes service account to impersonate
when reconciling this HelmRelease.
type: string
storageNamespace:
description: StorageNamespace used for the Helm storage. Defaults
to the namespace of the HelmRelease.
maxLength: 63
minLength: 1
type: string
suspend:
description: Suspend tells the controller to suspend reconciliation
for this HelmRelease, it does not apply to already started reconciliations.
Defaults to false.
type: boolean
targetNamespace:
description: TargetNamespace to target when performing operations
for the HelmRelease. Defaults to the namespace of the HelmRelease.
maxLength: 63
minLength: 1
type: string
test:
description: Test holds the configuration for Helm test actions for
this HelmRelease.
properties:
enable:
description: Enable enables Helm test actions for this HelmRelease
after an Helm install or upgrade action has been performed.
type: boolean
ignoreFailures:
description: IgnoreFailures tells the controller to skip remediation
when the Helm tests are run but fail. Can be overwritten for
tests run after install or upgrade actions in 'Install.IgnoreTestFailures'
and 'Upgrade.IgnoreTestFailures'.
type: boolean
timeout:
description: Timeout is the time to wait for any individual Kubernetes
operation during the performance of a Helm test action. Defaults
to 'HelmReleaseSpec.Timeout'.
type: string
type: object
timeout:
description: Timeout is the time to wait for any individual Kubernetes
operation (like Jobs for hooks) during the performance of a Helm
action. Defaults to '5m0s'.
type: string
uninstall:
description: Uninstall holds the configuration for Helm uninstall
actions for this HelmRelease.
properties:
disableHooks:
description: DisableHooks prevents hooks from running during the
Helm rollback action.
type: boolean
keepHistory:
description: KeepHistory tells Helm to remove all associated resources
and mark the release as deleted, but retain the release history.
type: boolean
timeout:
description: Timeout is the time to wait for any individual Kubernetes
operation (like Jobs for hooks) during the performance of a
Helm uninstall action. Defaults to 'HelmReleaseSpec.Timeout'.
type: string
type: object
upgrade:
description: Upgrade holds the configuration for Helm upgrade actions
for this HelmRelease.
properties:
cleanupOnFail:
description: CleanupOnFail allows deletion of new resources created
during the Helm upgrade action when it fails.
type: boolean
crds:
description: "CRDs upgrade CRDs from the Helm Chart's crds directory
according to the CRD upgrade policy provided here. Valid values
are `Skip`, `Create` or `CreateReplace`. Default is `Skip` and
if omitted CRDs are neither installed nor upgraded. \n Skip:
do neither install nor replace (update) any CRDs. \n Create:
new CRDs are created, existing CRDs are neither updated nor
deleted. \n CreateReplace: new CRDs are created, existing CRDs
are updated (replaced) but not deleted. \n By default, CRDs
are not applied during Helm upgrade action. With this option
users can opt-in to CRD upgrade, which is not (yet) natively
supported by Helm. https://helm.sh/docs/chart_best_practices/custom_resource_definitions."
enum:
- Skip
- Create
- CreateReplace
type: string
disableHooks:
description: DisableHooks prevents hooks from running during the
Helm upgrade action.
type: boolean
disableOpenAPIValidation:
description: DisableOpenAPIValidation prevents the Helm upgrade
action from validating rendered templates against the Kubernetes
OpenAPI Schema.
type: boolean
disableWait:
description: DisableWait disables the waiting for resources to
be ready after a Helm upgrade has been performed.
type: boolean
disableWaitForJobs:
description: DisableWaitForJobs disables waiting for jobs to complete
after a Helm upgrade has been performed.
type: boolean
force:
description: Force forces resource updates through a replacement
strategy.
type: boolean
preserveValues:
description: PreserveValues will make Helm reuse the last release's
values and merge in overrides from 'Values'. Setting this flag
makes the HelmRelease non-declarative.
type: boolean
remediation:
description: Remediation holds the remediation configuration for
when the Helm upgrade action for the HelmRelease fails. The
default is to not perform any action.
properties:
ignoreTestFailures:
description: IgnoreTestFailures tells the controller to skip
remediation when the Helm tests are run after an upgrade
action but fail. Defaults to 'Test.IgnoreFailures'.
type: boolean
remediateLastFailure:
description: RemediateLastFailure tells the controller to
remediate the last failure, when no retries remain. Defaults
to 'false' unless 'Retries' is greater than 0.
type: boolean
retries:
description: Retries is the number of retries that should
be attempted on failures before bailing. Remediation, using
'Strategy', is performed between each attempt. Defaults
to '0', a negative integer equals to unlimited retries.
type: integer
strategy:
description: Strategy to use for failure remediation. Defaults
to 'rollback'.
enum:
- rollback
- uninstall
type: string
type: object
timeout:
description: Timeout is the time to wait for any individual Kubernetes
operation (like Jobs for hooks) during the performance of a
Helm upgrade action. Defaults to 'HelmReleaseSpec.Timeout'.
type: string
type: object
values:
description: Values holds the values for this Helm release.
x-kubernetes-preserve-unknown-fields: true
valuesFrom:
description: ValuesFrom holds references to resources containing Helm
values for this HelmRelease, and information about how they should
be merged.
items:
description: ValuesReference contains a reference to a resource
containing Helm values, and optionally the key they can be found
at.
properties:
kind:
description: Kind of the values referent, valid values are ('Secret',
'ConfigMap').
enum:
- Secret
- ConfigMap
type: string
name:
description: Name of the values referent. Should reside in the
same namespace as the referring resource.
maxLength: 253
minLength: 1
type: string
optional:
description: Optional marks this ValuesReference as optional.
When set, a not found error for the values reference is ignored,
but any ValuesKey, TargetPath or transient error will still
result in a reconciliation failure.
type: boolean
targetPath:
description: TargetPath is the YAML dot notation path the value
should be merged at. When set, the ValuesKey is expected to
be a single flat value. Defaults to 'None', which results
in the values getting merged at the root.
type: string
valuesKey:
description: ValuesKey is the data key where the values.yaml
or a specific value can be found at. Defaults to 'values.yaml'.
type: string
required:
- kind
- name
type: object
type: array
required:
- chart
- interval
type: object
status:
description: HelmReleaseStatus defines the observed state of a HelmRelease.
properties:
conditions:
description: Conditions holds the conditions for the HelmRelease.
items:
description: "Condition contains details for one aspect of the current
state of this API Resource. --- This struct is intended for direct
use as an array at the field path .status.conditions. For example,
type FooStatus struct{ // Represents the observations of a
foo's current state. // Known .status.conditions.type are:
\"Available\", \"Progressing\", and \"Degraded\" // +patchMergeKey=type
\ // +patchStrategy=merge // +listType=map // +listMapKey=type
\ Conditions []metav1.Condition `json:\"conditions,omitempty\"
patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`
\n // other fields }"
properties:
lastTransitionTime:
description: lastTransitionTime is the last time the condition
transitioned from one status to another. This should be when
the underlying condition changed. If that is not known, then
using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: message is a human readable message indicating
details about the transition. This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: observedGeneration represents the .metadata.generation
that the condition was set based upon. For instance, if .metadata.generation
is currently 12, but the .status.conditions[x].observedGeneration
is 9, the condition is out of date with respect to the current
state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: reason contains a programmatic identifier indicating
the reason for the condition's last transition. Producers
of specific condition types may define expected values and
meanings for this field, and whether the values are considered
a guaranteed API. The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
--- Many .condition.type values are consistent across resources
like Available, but because arbitrary conditions can be useful
(see .node.status.conditions), the ability to deconflict is
important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
failures:
description: Failures is the reconciliation failure count against
the latest desired state. It is reset after a successful reconciliation.
format: int64
type: integer
helmChart:
description: HelmChart is the namespaced name of the HelmChart resource
created by the controller for the HelmRelease.
type: string
installFailures:
description: InstallFailures is the install failure count against
the latest desired state. It is reset after a successful reconciliation.
format: int64
type: integer
lastAppliedRevision:
description: LastAppliedRevision is the revision of the last successfully
applied source.
type: string
lastAttemptedRevision:
description: LastAttemptedRevision is the revision of the last reconciliation
attempt.
type: string
lastAttemptedValuesChecksum:
description: LastAttemptedValuesChecksum is the SHA1 checksum of the
values of the last reconciliation attempt.
type: string
lastHandledReconcileAt:
description: LastHandledReconcileAt holds the value of the most recent
reconcile request value, so a change can be detected.
type: string
lastReleaseRevision:
description: LastReleaseRevision is the revision of the last successful
Helm release.
type: integer
observedGeneration:
description: ObservedGeneration is the last observed generation.
format: int64
type: integer
upgradeFailures:
description: UpgradeFailures is the upgrade failure count against
the latest desired state. It is reset after a successful reconciliation.
format: int64
type: integer
type: object
type: object
served: true
storage: true
subresources:
status: {}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,38 +0,0 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
// HealthStatusCode Represents resource health status
type HealthStatusCode string
const (
// HealthStatusHealthy resource is healthy
HealthStatusHealthy HealthStatusCode = "Healthy"
// HealthStatusUnHealthy resource is unhealthy
HealthStatusUnHealthy HealthStatusCode = "UnHealthy"
// HealthStatusProgressing resource is still progressing
HealthStatusProgressing HealthStatusCode = "Progressing"
// HealthStatusUnKnown health status is unknown
HealthStatusUnKnown HealthStatusCode = "UnKnown"
)
// HealthStatus the resource health status
type HealthStatus struct {
Status HealthStatusCode `json:"statusCode"`
Reason string `json:"reason"`
Message string `json:"message"`
}

View File

@@ -1,189 +0,0 @@
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"fmt"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)
const (
// HTTPS https protocol name
HTTPS = "https"
// HTTP http protocol name
HTTP = "http"
// Mysql mysql protocol name
Mysql = "mysql"
// Redis redis protocol name
Redis = "redis"
)
// ServiceEndpoint record the access endpoints of the application services
type ServiceEndpoint struct {
Endpoint Endpoint `json:"endpoint"`
Ref corev1.ObjectReference `json:"ref"`
Cluster string `json:"cluster"`
Component string `json:"component"`
}
// String return endpoint URL
func (s *ServiceEndpoint) String() string {
if s.Endpoint.Host == "" && s.Endpoint.Port == 0 {
return "-"
}
protocol := strings.ToLower(string(s.Endpoint.Protocol))
if s.Endpoint.AppProtocol != nil && *s.Endpoint.AppProtocol != "" {
protocol = *s.Endpoint.AppProtocol
}
path := s.Endpoint.Path
if s.Endpoint.Path == "/" {
path = ""
}
if (protocol == HTTPS && s.Endpoint.Port == 443) || (protocol == HTTP && s.Endpoint.Port == 80) {
return fmt.Sprintf("%s://%s%s", protocol, s.Endpoint.Host, path)
}
if protocol == "tcp" {
return fmt.Sprintf("%s:%d%s", s.Endpoint.Host, s.Endpoint.Port, path)
}
if s.Endpoint.Port == 0 {
return fmt.Sprintf("%s://%s%s", protocol, s.Endpoint.Host, path)
}
return fmt.Sprintf("%s://%s:%d%s", protocol, s.Endpoint.Host, s.Endpoint.Port, path)
}
// Endpoint create by ingress or service
type Endpoint struct {
// The protocol for this endpoint. Supports "TCP", "UDP", and "SCTP".
// Default is TCP.
// +default="TCP"
// +optional
Protocol corev1.Protocol `json:"protocol,omitempty"`
// The protocol for this endpoint.
// Un-prefixed names are reserved for IANA standard service names (as per
// RFC-6335 and http://www.iana.org/assignments/service-names).
// +optional
AppProtocol *string `json:"appProtocol,omitempty"`
// the host for the endpoint, it could be IP or domain
Host string `json:"host"`
// the port for the endpoint
// Default is 80.
Port int `json:"port"`
// +optional
// the name of the port
PortName string `json:"portName,omitempty"`
// the path for the endpoint
Path string `json:"path,omitempty"`
// Inner means the endpoint is only accessible within the cluster.
Inner bool `json:"inner,omitempty"`
}
// AppliedResource resource metadata
type AppliedResource struct {
Cluster string `json:"cluster"`
Component string `json:"component"`
Trait string `json:"trait"`
Kind string `json:"kind"`
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
UID types.UID `json:"uid,omitempty"`
APIVersion string `json:"apiVersion,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty"`
DeployVersion string `json:"deployVersion,omitempty"`
PublishVersion string `json:"publishVersion,omitempty"`
Revision string `json:"revision,omitempty"`
Latest bool `json:"latest"`
ResourceTree *ResourceTreeNode `json:"resourceTree,omitempty"`
}
// ResourceTreeNode is the tree node of every resource
type ResourceTreeNode struct {
Cluster string `json:"cluster"`
APIVersion string `json:"apiVersion,omitempty"`
Kind string `json:"kind"`
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
UID types.UID `json:"uid,omitempty"`
HealthStatus HealthStatus `json:"healthStatus,omitempty"`
DeletionTimestamp time.Time `json:"deletionTimestamp,omitempty"`
CreationTimestamp time.Time `json:"creationTimestamp,omitempty"`
LeafNodes []*ResourceTreeNode `json:"leafNodes,omitempty"`
AdditionalInfo map[string]interface{} `json:"additionalInfo,omitempty"`
Object unstructured.Unstructured `json:"-"`
}
// GroupVersionKind returns the stored group, version, and kind from AppliedResource
func (obj *AppliedResource) GroupVersionKind() schema.GroupVersionKind {
return schema.FromAPIVersionAndKind(obj.APIVersion, obj.Kind)
}
// GroupVersionKind returns the stored group, version, and kind from ResourceTreeNode
func (rtn *ResourceTreeNode) GroupVersionKind() schema.GroupVersionKind {
return schema.FromAPIVersionAndKind(rtn.APIVersion, rtn.Kind)
}
// ResourceItem the resource base info struct
type ResourceItem struct {
Cluster string `json:"cluster"`
Workload Workload `json:"workload"`
Component string `json:"component"`
Object unstructured.Unstructured `json:"object"`
PublishVersion string `json:"publishVersion"`
DeployVersion string `json:"deployVersion"`
}
// Workload workload resource base info
type Workload struct {
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}
// PodBase the struct of pod list
type PodBase struct {
Cluster string `json:"cluster"`
Workload Workload `json:"workload"`
Component string `json:"component"`
Metadata struct {
CreationTime string `json:"creationTime"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Version struct {
PublishVersion string `json:"publishVersion"`
DeployVersion string `json:"deployVersion"`
} `json:"version"`
Labels map[string]string `json:"labels"`
} `json:"metadata"`
Status struct {
HostIP string `json:"hostIP"`
NodeName string `json:"nodeName"`
Phase string `json:"phase"`
PodIP string `json:"podIP"`
} `json:"status"`
}

View File

@@ -1,83 +0,0 @@
/*
Copyright 2022. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"encoding/json"
cuejson "cuelang.org/go/pkg/encoding/json"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/oam-dev/kubevela/pkg/oam"
querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
)
// fillQueryResult help fill query result which contains k8s object to *value.Value
func fillQueryResult(v *value.Value, res interface{}, paths ...string) error {
b, err := json.Marshal(res)
if err != nil {
return v.FillObject(err, "err")
}
expr, err := cuejson.Unmarshal(b)
if err != nil {
return v.FillObject(err, "err")
}
err = v.FillObject(expr, paths...)
if err != nil {
return err
}
return v.Error()
}
func buildResourceArray(res querytypes.AppliedResource, parent, node *querytypes.ResourceTreeNode, kind string, apiVersion string) (pods []querytypes.ResourceItem) {
if node.LeafNodes != nil {
for _, subNode := range node.LeafNodes {
pods = append(pods, buildResourceArray(res, node, subNode, kind, apiVersion)...)
}
} else if node.Kind == kind && node.APIVersion == apiVersion {
pods = append(pods, buildResourceItem(res, querytypes.Workload{
APIVersion: parent.APIVersion,
Kind: parent.Kind,
Name: parent.Name,
Namespace: parent.Namespace,
}, node.Object))
}
return
}
func buildResourceItem(res querytypes.AppliedResource, workload querytypes.Workload, object unstructured.Unstructured) querytypes.ResourceItem {
return querytypes.ResourceItem{
Cluster: res.Cluster,
Workload: workload,
Component: res.Component,
Object: object,
PublishVersion: func() string {
if object.GetAnnotations()[oam.AnnotationPublishVersion] != "" {
return object.GetAnnotations()[oam.AnnotationPublishVersion]
}
return res.PublishVersion
}(),
DeployVersion: func() string {
if object.GetAnnotations()[oam.AnnotationDeployVersion] != "" {
return object.GetAnnotations()[oam.AnnotationDeployVersion]
}
return res.DeployVersion
}(),
}
}

View File

@@ -1,83 +0,0 @@
/*
Copyright 2022. The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package query
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/kubevela/workflow/pkg/cue/model/value"
)
func TestFillQueryResult(t *testing.T) {
testcases := map[string]struct {
queryRes interface{}
json string
}{
"test fill query result which contains *unstructured.Unstructured": {
queryRes: []Resource{
{
Cluster: "local",
Component: "web",
Revision: "v1",
Object: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"spec": map[string]interface{}{
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"creationTimestamp": nil,
},
},
},
},
},
},
{
Cluster: "ap-southeast-1",
Component: "web",
Revision: "v2",
Object: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"creationTimestamp": "2022-05-25T12:07:02Z",
},
},
},
},
},
json: `{"list":[{"cluster":"local","component":"web","revision":"v1","object":{"apiVersion":"apps/v1","kind":"Deployment","spec":{"template":{"metadata":{"creationTimestamp":null}}}}},{"cluster":"ap-southeast-1","component":"web","revision":"v2","object":{"apiVersion":"apps/v1","kind":"Deployment","metadata":{"creationTimestamp":"2022-05-25T12:07:02Z"}}}]}`,
},
}
for name, testcase := range testcases {
t.Run(name, func(t *testing.T) {
value, err := value.NewValue("", nil, "")
assert.NoError(t, err)
err = fillQueryResult(value, testcase.queryRes, "list")
assert.NoError(t, err)
json, err := value.CueValue().MarshalJSON()
assert.NoError(t, err)
assert.Equal(t, testcase.json, string(json))
})
}
}

View File

@@ -22,6 +22,7 @@ import (
"testing"
"time"
"github.com/kubevela/pkg/util/singleton"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -31,8 +32,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"github.com/kubevela/workflow/pkg/cue/packages"
"github.com/oam-dev/kubevela/pkg/utils/common"
)
@@ -52,7 +51,9 @@ var _ = BeforeSuite(func() {
ControlPlaneStartTimeout: time.Minute * 3,
ControlPlaneStopTimeout: time.Minute,
UseExistingCluster: pointer.Bool(false),
CRDDirectoryPaths: []string{"../../charts/vela-core/crds"},
CRDDirectoryPaths: []string{
"../../../../charts/vela-core/crds",
},
}
By("start kube test env")
@@ -67,11 +68,9 @@ var _ = BeforeSuite(func() {
Expect(err).Should(BeNil())
Expect(k8sClient).ToNot(BeNil())
By("new kube client success")
singleton.KubeClient.Set(k8sClient)
pd, err := packages.NewPackageDiscover(cfg)
Expect(err).To(BeNil())
viewHandler = NewViewHandler(k8sClient, cfg, pd)
viewHandler = NewViewHandler(k8sClient, cfg)
ctx := context.Background()
ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "vela-system"}}

View File

@@ -20,9 +20,10 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,37 +31,23 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
pkgtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
monitorContext "github.com/kubevela/pkg/monitor/context"
"github.com/kubevela/pkg/cue/cuex"
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
"github.com/kubevela/workflow/pkg/cue/model/sets"
"github.com/kubevela/workflow/pkg/cue/model/value"
"github.com/kubevela/workflow/pkg/cue/packages"
"github.com/kubevela/workflow/pkg/executor"
"github.com/kubevela/workflow/pkg/generator"
"github.com/kubevela/workflow/pkg/providers"
"github.com/kubevela/workflow/pkg/providers/kube"
wfTypes "github.com/kubevela/workflow/pkg/types"
providertypes "github.com/kubevela/workflow/pkg/providers/types"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/cue/process"
"github.com/oam-dev/kubevela/pkg/multicluster"
oamutil "github.com/oam-dev/kubevela/pkg/oam/util"
"github.com/oam-dev/kubevela/pkg/stdlib"
"github.com/oam-dev/kubevela/pkg/utils"
"github.com/oam-dev/kubevela/pkg/utils/apply"
"github.com/oam-dev/kubevela/pkg/velaql/providers/query"
"github.com/oam-dev/kubevela/pkg/workflow/providers"
oamprovidertypes "github.com/oam-dev/kubevela/pkg/workflow/providers/legacy/types"
"github.com/oam-dev/kubevela/pkg/workflow/template"
)
func init() {
if err := stdlib.SetupBuiltinImports(); err != nil {
klog.ErrorS(err, "Unable to set up builtin imports on package initialization")
os.Exit(1)
}
}
const (
qlNs = "vela-system"
@@ -72,94 +59,50 @@ const (
type ViewHandler struct {
cli client.Client
cfg *rest.Config
viewTask workflowv1alpha1.WorkflowStep
pd *packages.PackageDiscover
namespace string
}
// NewViewHandler new view handler
func NewViewHandler(cli client.Client, cfg *rest.Config, pd *packages.PackageDiscover) *ViewHandler {
func NewViewHandler(cli client.Client, cfg *rest.Config) *ViewHandler {
return &ViewHandler{
cli: cli,
cfg: cfg,
pd: pd,
namespace: qlNs,
}
}
// QueryView generate view step
func (handler *ViewHandler) QueryView(ctx context.Context, qv QueryView) (*value.Value, error) {
func (handler *ViewHandler) QueryView(ctx context.Context, qv QueryView) (cue.Value, error) {
outputsTemplate := fmt.Sprintf(OutputsTemplate, qv.Export, qv.Export)
queryKey := QueryParameterKey{}
if err := json.Unmarshal([]byte(outputsTemplate), &queryKey); err != nil {
return nil, errors.Errorf("unmarhsal query template: %v", err)
return cue.Value{}, errors.Errorf("unmarhsal query template: %v", err)
}
handler.viewTask = workflowv1alpha1.WorkflowStep{
WorkflowStepBase: workflowv1alpha1.WorkflowStepBase{
Name: fmt.Sprintf("%s-%s", qv.View, qv.Export),
Type: qv.View,
Properties: oamutil.Object2RawExtension(qv.Parameter),
Outputs: queryKey.Outputs,
},
}
instance := &wfTypes.WorkflowInstance{
WorkflowMeta: wfTypes.WorkflowMeta{
Name: fmt.Sprintf("%s-%s", qv.View, qv.Export),
},
Steps: []workflowv1alpha1.WorkflowStep{
{
WorkflowStepBase: workflowv1alpha1.WorkflowStepBase{
Name: fmt.Sprintf("%s-%s", qv.View, qv.Export),
Type: qv.View,
Properties: oamutil.Object2RawExtension(qv.Parameter),
Outputs: queryKey.Outputs,
},
},
},
}
executor.InitializeWorkflowInstance(instance)
handlerProviders := providers.NewProviders()
kube.Install(handlerProviders, handler.cli, nil, &kube.Handlers{
Apply: handler.dispatch,
Delete: handler.delete,
ctx = oamprovidertypes.WithRuntimeParams(ctx, oamprovidertypes.RuntimeParams{
KubeClient: handler.cli,
KubeConfig: handler.cfg,
KubeHandlers: &providertypes.KubeHandlers{Apply: handler.dispatch, Delete: handler.delete},
})
query.Install(handlerProviders, handler.cli, handler.cfg)
loader := template.NewViewTemplateLoader(handler.cli, handler.namespace)
if len(strings.Split(qv.View, "\n")) > 2 {
loader = &template.EchoLoader{}
}
logCtx := monitorContext.NewTraceContext(ctx, "").AddTag("velaql")
runners, err := generator.GenerateRunners(logCtx, instance, wfTypes.StepGeneratorOptions{
Providers: handlerProviders,
PackageDiscover: handler.pd,
ProcessCtx: process.NewContext(process.ContextData{}),
TemplateLoader: loader,
Client: handler.cli,
LogLevel: 3,
})
temp, err := loader.LoadTemplate(ctx, qv.View)
if err != nil {
return nil, err
return cue.Value{}, fmt.Errorf("failed to load query templates: %w", err)
}
viewCtx, err := NewViewContext()
v, err := providers.Compiler.Get().CompileStringWithOptions(ctx, temp, cuex.WithExtraData("parameter", qv.Parameter))
if err != nil {
return nil, errors.Errorf("new view context: %v", err)
return cue.Value{}, fmt.Errorf("failed to compile query: %w", err)
}
for _, runner := range runners {
status, _, err := runner.Run(viewCtx, &wfTypes.TaskRunOptions{})
if err != nil {
return nil, errors.Errorf("run query view: %v", err)
}
if string(status.Phase) != ViewTaskPhaseSucceeded {
return nil, errors.Errorf("failed to query the view %s %s", status.Message, status.Reason)
}
res := v.LookupPath(value.FieldPath(qv.Export))
if !res.Exists() {
return cuecontext.New().CompileString("null"), nil
}
return viewCtx.GetVar(qv.Export)
return res, res.Err()
}
func (handler *ViewHandler) dispatch(ctx context.Context, cluster string, _ string, manifests ...*unstructured.Unstructured) error {
func (handler *ViewHandler) dispatch(ctx context.Context, _ client.Client, cluster string, _ string, manifests ...*unstructured.Unstructured) error {
ctx = multicluster.ContextWithClusterName(ctx, cluster)
applicator := apply.NewAPIApplicator(handler.cli)
for _, manifest := range manifests {
@@ -170,30 +113,32 @@ func (handler *ViewHandler) dispatch(ctx context.Context, cluster string, _ stri
return nil
}
func (handler *ViewHandler) delete(ctx context.Context, _ string, _ string, manifest *unstructured.Unstructured) error {
func (handler *ViewHandler) delete(ctx context.Context, _ client.Client, _ string, _ string, manifest *unstructured.Unstructured) error {
return handler.cli.Delete(ctx, manifest)
}
// ValidateView makes sure the cue provided can use as view.
//
// For now, we only check 1. cue is valid 2. `status` or `view` field exists
func ValidateView(viewStr string) error {
val, err := value.NewValue(viewStr, nil, "")
func ValidateView(ctx context.Context, viewStr string) error {
val, err := providers.Compiler.Get().CompileStringWithOptions(ctx, viewStr, cuex.DisableResolveProviderFunctions{})
if err != nil {
return errors.Errorf("error when parsing view: %v", err)
}
// Make sure `status` or `export` field exists
vStatus, errStatus := val.LookupValue(DefaultExportValue)
vExport, errExport := val.LookupValue(KeyWordExport)
vStatus := val.LookupPath(cue.ParsePath(DefaultExportValue))
errStatus := vStatus.Err()
vExport := val.LookupPath(cue.ParsePath(KeyWordExport))
errExport := vExport.Err()
if errStatus != nil && errExport != nil {
return errors.Errorf("no `status` or `export` field found in view: %v, %v", errStatus, errExport)
}
if errStatus == nil {
_, errStatus = vStatus.String()
_, errStatus = sets.ToString(vStatus)
}
if errExport == nil {
_, errExport = vExport.String()
_, errExport = sets.ToString(vExport)
}
if errStatus != nil && errExport != nil {
return errors.Errorf("connot get string from` status` or `export`: %v, %v", errStatus, errExport)
@@ -204,8 +149,8 @@ func ValidateView(viewStr string) error {
// ParseViewIntoConfigMap parses a CUE string (representing a view) into a ConfigMap
// ready to be stored into etcd.
func ParseViewIntoConfigMap(viewStr, name string) (*v1.ConfigMap, error) {
err := ValidateView(viewStr)
func ParseViewIntoConfigMap(ctx context.Context, viewStr, name string) (*v1.ConfigMap, error) {
err := ValidateView(ctx, viewStr)
if err != nil {
return nil, err
}
@@ -239,7 +184,7 @@ func StoreViewFromFile(ctx context.Context, c client.Client, path, viewName stri
return errors.Errorf("cannot load cue file: %v", err)
}
cm, err := ParseViewIntoConfigMap(string(content), viewName)
cm, err := ParseViewIntoConfigMap(ctx, string(content), viewName)
if err != nil {
return err
}

View File

@@ -24,6 +24,8 @@ import (
"testing"
"time"
"github.com/kubevela/workflow/pkg/cue/model/sets"
"github.com/kubevela/workflow/pkg/cue/model/value"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
@@ -53,7 +55,7 @@ var _ = Describe("Test VelaQL View", func() {
Expect(err).Should(BeNil())
podStatus := corev1.PodStatus{}
Expect(queryValue.UnmarshalTo(&podStatus)).Should(BeNil())
Expect(value.UnmarshalTo(queryValue, &podStatus)).Should(BeNil())
})
It("Test query view with wrong request", func() {
@@ -69,7 +71,7 @@ var _ = Describe("Test VelaQL View", func() {
Expect(err).ShouldNot(HaveOccurred())
v, err := viewHandler.QueryView(context.Background(), query)
Expect(err).ShouldNot(HaveOccurred())
s, err := v.String()
s, err := sets.ToString(v)
Expect(err).ShouldNot(HaveOccurred())
Expect(s).Should(Equal("null\n"))
@@ -136,7 +138,7 @@ export: something`,
},
}
for _, c := range cases {
cm, err := ParseViewIntoConfigMap(c.cueStr, "name")
cm, err := ParseViewIntoConfigMap(context.Background(), c.cueStr, "name")
assert.Equal(t, c.succeed, err == nil, err)
if err == nil {
assert.Equal(t, c.cueStr, cm.Data["template"])