Feat: remove workflow step logs to vela workflow logs (#4883)

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
This commit is contained in:
Tianxin Dong
2022-10-20 11:18:48 +08:00
committed by GitHub
parent 7fc3d7c23b
commit 95b3b31b11
6 changed files with 302 additions and 138 deletions

View File

@@ -244,9 +244,6 @@ func GetCUEParameterValue(cueStr string, pd *packages.PackageDiscover) (cue.Valu
if err != nil {
return cue.Value{}, err
}
if template.Error() != nil {
return cue.Value{}, template.Error()
}
val, err := template.LookupValue(process.ParameterFieldName)
if err != nil || !val.CueValue().Exists() {
return cue.Value{}, velacue.ErrParameterNotExist

View File

@@ -293,7 +293,7 @@ func TestGetCUEParameterValue4RareCases(t *testing.T) {
reason: "cue string is invalid",
cueStr: `name`,
want: want{
errMsg: "reference \"name\" not found",
errMsg: "parameter not exist",
},
},
}

View File

@@ -227,14 +227,9 @@ func (d *debugOpts) getDebugOptions(app *v1beta1.Application) (string, []string,
switch {
case app.Status.Workflow != nil:
for _, step := range app.Status.Workflow.Steps {
stepName := step.Name
switch step.Phase {
case workflowv1alpha1.WorkflowStepPhaseSucceeded:
stepName = emojiSucceed + step.Name
case workflowv1alpha1.WorkflowStepPhaseFailed:
stepName = emojiFail + step.Name
stepName := wrapStepName(step.StepStatus)
if strings.HasPrefix(stepName, emojiFail) {
errMap[step.Name] = step.Message
default:
}
stepList = append(stepList, stepName)
}
@@ -250,14 +245,34 @@ func (d *debugOpts) getDebugOptions(app *v1beta1.Application) (string, []string,
return s, stepList, errMap
}
func wrapStepName(step workflowv1alpha1.StepStatus) string {
var stepName string
switch step.Phase {
case workflowv1alpha1.WorkflowStepPhaseSucceeded:
stepName = emojiSucceed + step.Name
case workflowv1alpha1.WorkflowStepPhaseFailed:
stepName = emojiFail + step.Name
case workflowv1alpha1.WorkflowStepPhaseSkipped:
stepName = emojiSkip + step.Name
default:
stepName = emojiExecuting + step.Name
}
return stepName
}
func unwrapStepName(step string) string {
if strings.HasPrefix(step, emojiSucceed) {
switch {
case strings.HasPrefix(step, emojiSucceed):
return strings.TrimPrefix(step, emojiSucceed)
}
if strings.HasPrefix(step, emojiFail) {
case strings.HasPrefix(step, emojiFail):
return strings.TrimPrefix(step, emojiFail)
case strings.HasPrefix(step, emojiSkip):
return strings.TrimPrefix(step, emojiSkip)
case strings.HasPrefix(step, emojiExecuting):
return strings.TrimPrefix(step, emojiExecuting)
default:
return step
}
return step
}
func (d *debugOpts) getDebugRawValue(ctx context.Context, cli client.Client, pd *packages.PackageDiscover, app *v1beta1.Application) (*value.Value, string, error) {

View File

@@ -20,13 +20,11 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"regexp"
"strings"
"text/template"
"time"
"github.com/AlecAivazis/survey/v2"
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@@ -34,11 +32,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
wfTypes "github.com/kubevela/workflow/pkg/types"
wfUtils "github.com/kubevela/workflow/pkg/utils"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
@@ -65,18 +58,6 @@ func NewLogsCommand(c common.Args, order string, ioStreams util.IOStreams) *cobr
}
largs.Name = args[0]
ctx := context.Background()
if largs.StepName != "" {
cli, err := c.GetClient()
if err != nil {
return err
}
ctxName, label, err := getContextFromInstance(ctx, cli, largs.Namespace, largs.Name)
if err != nil {
return err
}
largs.CtxName = ctxName
return largs.printStepLogs(ctx, ioStreams, label)
}
app, err := appfile.LoadApplication(largs.Namespace, args[0], c)
if err != nil {
return err
@@ -98,7 +79,6 @@ func NewLogsCommand(c common.Args, order string, ioStreams util.IOStreams) *cobr
cmd.Flags().StringVarP(&largs.ClusterName, "cluster", "", "", "filter the pod by the cluster name")
cmd.Flags().StringVarP(&largs.PodName, "pod", "p", "", "specify the pod name")
cmd.Flags().StringVarP(&largs.ContainerName, "container", "", "", "specify the container name")
cmd.Flags().StringVarP(&largs.StepName, "step", "s", "", "specify the step name, note that this flag cannot be used together with the pod, container or component flags")
addNamespaceAndEnvArg(cmd)
return cmd
}
@@ -118,88 +98,6 @@ type Args struct {
App *v1beta1.Application
}
func (l *Args) printStepLogs(ctx context.Context, ioStreams util.IOStreams, label map[string]string) error {
cli, err := l.Args.GetClient()
if err != nil {
return err
}
logConfig, err := wfUtils.GetLogConfigFromStep(ctx, cli, l.CtxName, l.Name, l.Namespace, l.StepName)
if err != nil {
return err
}
if err := selectStepLogSource(logConfig); err != nil {
return err
}
switch {
case logConfig.Data:
return l.printResourceLogs(ctx, cli, ioStreams, []wfTypes.Resource{{
Namespace: types.DefaultKubeVelaNS,
LabelSelector: label,
}}, []string{fmt.Sprintf(`step_name="%s"`, l.StepName), fmt.Sprintf("%s/%s", l.Namespace, l.Name)})
case logConfig.Source != nil:
if len(logConfig.Source.Resources) > 0 {
return l.printResourceLogs(ctx, cli, ioStreams, logConfig.Source.Resources, nil)
}
if logConfig.Source.URL != "" {
readCloser, err := wfUtils.GetLogsFromURL(ctx, logConfig.Source.URL)
if err != nil {
return err
}
//nolint:errcheck
defer readCloser.Close()
if _, err := io.Copy(ioStreams.Out, readCloser); err != nil {
return err
}
}
}
return nil
}
func selectStepLogSource(logConfig *wfTypes.LogConfig) error {
var source string
if logConfig.Data && logConfig.Source != nil {
prompt := &survey.Select{
Message: "Select logs from data or source",
Options: []string{"data", "source"},
}
err := survey.AskOne(prompt, &source, survey.WithValidator(survey.Required))
if err != nil {
return fmt.Errorf("failed to select %s: %w", source, err)
}
if source != "data" {
logConfig.Data = false
}
}
return nil
}
func (l *Args) printResourceLogs(ctx context.Context, cli client.Client, ioStreams util.IOStreams, resources []wfTypes.Resource, filters []string) error {
pods, err := wfUtils.GetPodListFromResources(ctx, cli, resources)
if err != nil {
return err
}
podList := make([]querytypes.PodBase, 0)
for _, pod := range pods {
podBase := querytypes.PodBase{}
podBase.Metadata.Name = pod.Name
podBase.Metadata.Namespace = pod.Namespace
podList = append(podList, podBase)
}
if len(pods) == 0 {
return errors.New("no pod found")
}
var selectPod *querytypes.PodBase
if len(pods) > 1 {
selectPod, err = AskToChooseOnePod(podList)
if err != nil {
return err
}
} else {
selectPod = &podList[0]
}
return l.printPodLogs(ctx, ioStreams, selectPod, filters)
}
func (l *Args) printPodLogs(ctx context.Context, ioStreams util.IOStreams, selectPod *querytypes.PodBase, filters []string) error {
pod, err := regexp.Compile(selectPod.Metadata.Name + ".*")
if err != nil {
@@ -365,21 +263,3 @@ func (l *Args) Run(ctx context.Context, ioStreams util.IOStreams) error {
}
return l.printPodLogs(ctx, ioStreams, selectPod, nil)
}
func getContextFromInstance(ctx context.Context, cli client.Client, namespace, name string) (string, map[string]string, error) {
app := &v1beta1.Application{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, app); err == nil {
if app.Status.Workflow != nil && app.Status.Workflow.ContextBackend != nil {
return app.Status.Workflow.ContextBackend.Name, map[string]string{"app.kubernetes.io/name": "vela-core"}, nil
}
return "", nil, fmt.Errorf("no context found in application %s", name)
}
wr := &workflowv1alpha1.WorkflowRun{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, wr); err == nil {
if wr.Status.ContextBackend != nil {
return wr.Status.ContextBackend.Name, map[string]string{"app.kubernetes.io/name": "vela-workflow"}, nil
}
return "", nil, fmt.Errorf("no context found in workflowrun %s", name)
}
return "", nil, fmt.Errorf("no context found in application %s", name)
}

View File

@@ -35,8 +35,10 @@ var (
// emoji used in vela cmd for printing
var (
emojiSucceed = emoji.Sprint(":check_mark_button:")
emojiFail = emoji.Sprint(":cross_mark:")
emojiSucceed = emoji.Sprint(":check_mark_button:")
emojiFail = emoji.Sprint(":cross_mark:")
emojiExecuting = emoji.Sprint(":hourglass:")
emojiSkip = emoji.Sprint(":no_entry:")
)
// newUITable creates a new table with fixed MaxColWidth

View File

@@ -19,12 +19,19 @@ package cli
import (
"context"
"fmt"
"io"
"os"
"github.com/AlecAivazis/survey/v2"
"github.com/pkg/errors"
"github.com/spf13/cobra"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
pkgmulticluster "github.com/kubevela/pkg/multicluster"
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
wfTypes "github.com/kubevela/workflow/pkg/types"
wfUtils "github.com/kubevela/workflow/pkg/utils"
common2 "github.com/oam-dev/kubevela/apis/core.oam.dev/common"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
@@ -32,6 +39,7 @@ import (
"github.com/oam-dev/kubevela/pkg/oam"
"github.com/oam-dev/kubevela/pkg/utils/common"
cmdutil "github.com/oam-dev/kubevela/pkg/utils/util"
querytypes "github.com/oam-dev/kubevela/pkg/velaql/providers/query/types"
"github.com/oam-dev/kubevela/pkg/workflow/operation"
"github.com/oam-dev/kubevela/references/appfile"
)
@@ -41,7 +49,7 @@ func NewWorkflowCommand(c common.Args, ioStreams cmdutil.IOStreams) *cobra.Comma
cmd := &cobra.Command{
Use: "workflow",
Short: "Operate application delivery workflow.",
Long: "Operate the Workflow during Application Delivery.",
Long: "Operate the Workflow during Application Delivery. Note that workflow command is both valid for Application Workflow and WorkflowRun. The command will try to find the Application first, if not found, it will try to find WorkflowRun. You can also specify the resource type by using --type flag.",
Annotations: map[string]string{
types.TagCommandType: types.TypeCD,
},
@@ -52,6 +60,7 @@ func NewWorkflowCommand(c common.Args, ioStreams cmdutil.IOStreams) *cobra.Comma
NewWorkflowTerminateCommand(c, ioStreams),
NewWorkflowRestartCommand(c, ioStreams),
NewWorkflowRollbackCommand(c, ioStreams),
NewWorkflowLogsCommand(c, ioStreams),
)
return cmd
}
@@ -264,6 +273,267 @@ func NewWorkflowRollbackCommand(c common.Args, ioStream cmdutil.IOStreams) *cobr
return cmd
}
// NewWorkflowLogsCommand create workflow logs command
func NewWorkflowLogsCommand(c common.Args, ioStream cmdutil.IOStreams) *cobra.Command {
wargs := &WorkflowArgs{Args: c}
cmd := &cobra.Command{
Use: "logs",
Short: "Tail logs for workflow steps",
Long: "Tail logs for workflow steps, note that you need to use op.#Logs in step definition to set the log config of the step.",
Example: "vela workflow logs <application-name>",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) < 1 {
return fmt.Errorf("must specify Application or WorkflowRun name")
}
namespace, err := GetFlagNamespaceOrEnv(cmd, c)
if err != nil {
return err
}
cli, err := c.GetClient()
if err != nil {
return err
}
ctx := context.Background()
if err := wargs.getWorkflowInstance(ctx, cli, namespace, args[0]); err != nil {
return err
}
return wargs.printStepLogs(ctx, cli, ioStream)
},
}
cmd.Flags().StringVarP(&wargs.StepName, "step", "s", "", "specify the step name in the workflow")
cmd.Flags().StringVarP(&wargs.Output, "output", "o", "default", "output format for logs, support: [default, raw, json]")
cmd.Flags().StringVarP(&wargs.Type, "type", "t", "", "the type of the resource, support: [app, workflow]")
addNamespaceAndEnvArg(cmd)
return cmd
}
// WorkflowArgs is the args for workflow command
type WorkflowArgs struct {
Type string
Output string
ControllerLabels map[string]string
Args common.Args
StepName string
App *v1beta1.Application
WorkflowRun *workflowv1alpha1.WorkflowRun
WorkflowInstance *wfTypes.WorkflowInstance
}
const (
instanceTypeApplication string = "app"
instanceTypeWorkflowRun string = "workflow"
)
func (w *WorkflowArgs) getWorkflowInstance(ctx context.Context, cli client.Client, namespace, name string) error {
switch w.Type {
case "":
app := &v1beta1.Application{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, app); err == nil {
w.Type = instanceTypeApplication
w.App = app
} else {
wr := &workflowv1alpha1.WorkflowRun{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, wr); err == nil {
w.Type = instanceTypeWorkflowRun
w.WorkflowRun = wr
}
}
if w.Type == "" {
return fmt.Errorf("can't find application or workflowrun %s", name)
}
case instanceTypeApplication:
app := &v1beta1.Application{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, app); err != nil {
return err
}
w.App = app
case instanceTypeWorkflowRun:
wr := &workflowv1alpha1.WorkflowRun{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, wr); err != nil {
return err
}
w.WorkflowRun = wr
default:
}
return w.generateWorkflowInstance(ctx, cli)
}
func (w *WorkflowArgs) generateWorkflowInstance(ctx context.Context, cli client.Client) error {
switch w.Type {
case instanceTypeApplication:
if w.App.Status.Workflow == nil {
return fmt.Errorf("the workflow in application %s is not start", w.App.Name)
}
status := w.App.Status.Workflow
w.WorkflowInstance = &wfTypes.WorkflowInstance{
WorkflowMeta: wfTypes.WorkflowMeta{
Name: w.App.Name,
Namespace: w.App.Namespace,
},
Steps: w.App.Spec.Workflow.Steps,
Status: workflowv1alpha1.WorkflowRunStatus{
Phase: status.Phase,
Message: status.Message,
Suspend: status.Suspend,
SuspendState: status.SuspendState,
Terminated: status.Terminated,
Finished: status.Finished,
ContextBackend: status.ContextBackend,
Steps: status.Steps,
StartTime: status.StartTime,
EndTime: status.EndTime,
},
}
w.ControllerLabels = map[string]string{"app.kubernetes.io/name": "vela-core"}
case instanceTypeWorkflowRun:
var steps []workflowv1alpha1.WorkflowStep
if w.WorkflowRun.Spec.WorkflowRef != "" {
workflow := &workflowv1alpha1.Workflow{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: w.WorkflowRun.Namespace, Name: w.WorkflowRun.Spec.WorkflowRef}, workflow); err != nil {
return err
}
steps = workflow.Steps
} else {
steps = w.WorkflowRun.Spec.WorkflowSpec.Steps
}
w.WorkflowInstance = &wfTypes.WorkflowInstance{
WorkflowMeta: wfTypes.WorkflowMeta{
Name: w.WorkflowRun.Name,
Namespace: w.WorkflowRun.Namespace,
},
Steps: steps,
Status: w.WorkflowRun.Status,
}
w.ControllerLabels = map[string]string{"app.kubernetes.io/name": "vela-workflow"}
default:
return fmt.Errorf("unknown workflow instance type %s", w.Type)
}
return nil
}
func (w *WorkflowArgs) printStepLogs(ctx context.Context, cli client.Client, ioStreams cmdutil.IOStreams) error {
if w.StepName == "" {
if err := w.selectWorkflowStep(); err != nil {
return err
}
}
if w.WorkflowInstance.Status.ContextBackend == nil {
return fmt.Errorf("the workflow context backend is not set")
}
logConfig, err := wfUtils.GetLogConfigFromStep(ctx, cli, w.WorkflowInstance.Status.ContextBackend.Name, w.WorkflowInstance.Name, w.WorkflowInstance.Namespace, w.StepName)
if err != nil {
return err
}
if err := selectStepLogSource(logConfig); err != nil {
return err
}
switch {
case logConfig.Data:
return w.printResourceLogs(ctx, cli, ioStreams, []wfTypes.Resource{{
Namespace: types.DefaultKubeVelaNS,
LabelSelector: w.ControllerLabels,
}}, []string{fmt.Sprintf(`step_name="%s"`, w.StepName), fmt.Sprintf("%s/%s", w.WorkflowInstance.Namespace, w.WorkflowInstance.Name), "cue logs"})
case logConfig.Source != nil:
if len(logConfig.Source.Resources) > 0 {
return w.printResourceLogs(ctx, cli, ioStreams, logConfig.Source.Resources, nil)
}
if logConfig.Source.URL != "" {
readCloser, err := wfUtils.GetLogsFromURL(ctx, logConfig.Source.URL)
if err != nil {
return err
}
//nolint:errcheck
defer readCloser.Close()
if _, err := io.Copy(ioStreams.Out, readCloser); err != nil {
return err
}
}
}
return nil
}
func (w *WorkflowArgs) selectWorkflowStep() error {
steps := make(map[string]workflowv1alpha1.WorkflowStepStatus)
stepsKey := make([]string, 0)
for _, step := range w.WorkflowInstance.Status.Steps {
stepsKey = append(stepsKey, wrapStepName(step.StepStatus))
}
prompt := &survey.Select{
Message: "Select a step to show logs:",
Options: stepsKey,
}
var stepName string
err := survey.AskOne(prompt, &stepName, survey.WithValidator(survey.Required))
if err != nil {
return fmt.Errorf("failed to select %s: %w", w.StepName, err)
}
if step := steps[w.StepName]; step.Type == wfTypes.WorkflowStepTypeStepGroup {
stepsKey := make([]string, 0)
for _, sub := range step.SubStepsStatus {
stepsKey = append(stepsKey, wrapStepName(sub))
}
prompt := &survey.Select{
Message: "Select a sub step to show logs:",
Options: stepsKey,
}
err := survey.AskOne(prompt, &stepName, survey.WithValidator(survey.Required))
if err != nil {
return fmt.Errorf("failed to select %s: %w", w.StepName, err)
}
}
w.StepName = unwrapStepName(stepName)
return nil
}
func selectStepLogSource(logConfig *wfTypes.LogConfig) error {
var source string
if logConfig.Data && logConfig.Source != nil {
prompt := &survey.Select{
Message: "Select logs from data or source",
Options: []string{"data", "source"},
}
err := survey.AskOne(prompt, &source, survey.WithValidator(survey.Required))
if err != nil {
return fmt.Errorf("failed to select %s: %w", source, err)
}
if source != "data" {
logConfig.Data = false
}
}
return nil
}
func (w *WorkflowArgs) printResourceLogs(ctx context.Context, cli client.Client, ioStreams cmdutil.IOStreams, resources []wfTypes.Resource, filters []string) error {
pods, err := wfUtils.GetPodListFromResources(ctx, cli, resources)
if err != nil {
return err
}
podList := make([]querytypes.PodBase, 0)
for _, pod := range pods {
podBase := querytypes.PodBase{}
podBase.Metadata.Name = pod.Name
podBase.Metadata.Namespace = pod.Namespace
podList = append(podList, podBase)
}
if len(pods) == 0 {
return errors.New("no pod found")
}
var selectPod *querytypes.PodBase
if len(pods) > 1 {
selectPod, err = AskToChooseOnePod(podList)
if err != nil {
return err
}
} else {
selectPod = &podList[0]
}
l := Args{
Args: w.Args,
Output: w.Output,
}
return l.printPodLogs(ctx, ioStreams, selectPod, filters)
}
func checkApplicationNotRunning(c common.Args) func(cmd *cobra.Command, args []string) {
return func(cmd *cobra.Command, args []string) {
// Any error will be returned to let the normal execution report the error