mirror of
https://github.com/kubevela/kubevela.git
synced 2026-02-14 10:00:06 +00:00
Feat: 7019 Support re-running workflows and ensure passed data is updated during dispatch (#7025)
Signed-off-by: Brian Kane <briankane1@gmail.com>
This commit is contained in:
@@ -225,6 +225,12 @@ type AppStatus struct {
|
|||||||
// Workflow record the status of workflow
|
// Workflow record the status of workflow
|
||||||
Workflow *WorkflowStatus `json:"workflow,omitempty"`
|
Workflow *WorkflowStatus `json:"workflow,omitempty"`
|
||||||
|
|
||||||
|
// WorkflowRestartScheduledAt schedules a workflow restart at the specified time.
|
||||||
|
// This field is automatically set when the app.oam.dev/restart-workflow annotation is present,
|
||||||
|
// and is cleared after the restart is triggered. Use RFC3339 format or set to current time for immediate restart.
|
||||||
|
// +optional
|
||||||
|
WorkflowRestartScheduledAt *metav1.Time `json:"workflowRestartScheduledAt,omitempty"`
|
||||||
|
|
||||||
// LatestRevision of the application configuration it generates
|
// LatestRevision of the application configuration it generates
|
||||||
// +optional
|
// +optional
|
||||||
LatestRevision *Revision `json:"latestRevision,omitempty"`
|
LatestRevision *Revision `json:"latestRevision,omitempty"`
|
||||||
|
|||||||
@@ -48,6 +48,10 @@ func (in *AppStatus) DeepCopyInto(out *AppStatus) {
|
|||||||
*out = new(WorkflowStatus)
|
*out = new(WorkflowStatus)
|
||||||
(*in).DeepCopyInto(*out)
|
(*in).DeepCopyInto(*out)
|
||||||
}
|
}
|
||||||
|
if in.WorkflowRestartScheduledAt != nil {
|
||||||
|
in, out := &in.WorkflowRestartScheduledAt, &out.WorkflowRestartScheduledAt
|
||||||
|
*out = (*in).DeepCopy()
|
||||||
|
}
|
||||||
if in.LatestRevision != nil {
|
if in.LatestRevision != nil {
|
||||||
in, out := &in.LatestRevision, &out.LatestRevision
|
in, out := &in.LatestRevision, &out.LatestRevision
|
||||||
*out = new(Revision)
|
*out = new(Revision)
|
||||||
|
|||||||
@@ -817,6 +817,13 @@ spec:
|
|||||||
- suspend
|
- suspend
|
||||||
- terminated
|
- terminated
|
||||||
type: object
|
type: object
|
||||||
|
workflowRestartScheduledAt:
|
||||||
|
description: |-
|
||||||
|
WorkflowRestartScheduledAt schedules a workflow restart at the specified time.
|
||||||
|
This field is automatically set when the app.oam.dev/restart-workflow annotation is present,
|
||||||
|
and is cleared after the restart is triggered. Use RFC3339 format or set to current time for immediate restart.
|
||||||
|
format: date-time
|
||||||
|
type: string
|
||||||
type: object
|
type: object
|
||||||
type: object
|
type: object
|
||||||
componentDefinitions:
|
componentDefinitions:
|
||||||
|
|||||||
@@ -760,6 +760,13 @@ spec:
|
|||||||
- suspend
|
- suspend
|
||||||
- terminated
|
- terminated
|
||||||
type: object
|
type: object
|
||||||
|
workflowRestartScheduledAt:
|
||||||
|
description: |-
|
||||||
|
WorkflowRestartScheduledAt schedules a workflow restart at the specified time.
|
||||||
|
This field is automatically set when the app.oam.dev/restart-workflow annotation is present,
|
||||||
|
and is cleared after the restart is triggered. Use RFC3339 format or set to current time for immediate restart.
|
||||||
|
format: date-time
|
||||||
|
type: string
|
||||||
type: object
|
type: object
|
||||||
type: object
|
type: object
|
||||||
served: true
|
served: true
|
||||||
|
|||||||
115
charts/vela-core/templates/defwithtemplate/restart-workflow.yaml
Normal file
115
charts/vela-core/templates/defwithtemplate/restart-workflow.yaml
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
# Code generated by KubeVela templates. DO NOT EDIT. Please edit the original cue file.
|
||||||
|
# Definition source cue file: vela-templates/definitions/internal/restart-workflow.cue
|
||||||
|
apiVersion: core.oam.dev/v1beta1
|
||||||
|
kind: WorkflowStepDefinition
|
||||||
|
metadata:
|
||||||
|
annotations:
|
||||||
|
custom.definition.oam.dev/category: Workflow Control
|
||||||
|
definition.oam.dev/description: Schedule the current Application's workflow to restart at a specific time, after a duration, or at recurring intervals
|
||||||
|
labels:
|
||||||
|
custom.definition.oam.dev/scope: Application
|
||||||
|
name: restart-workflow
|
||||||
|
namespace: {{ include "systemDefinitionNamespace" . }}
|
||||||
|
spec:
|
||||||
|
schematic:
|
||||||
|
cue:
|
||||||
|
template: |
|
||||||
|
import "vela/kube"
|
||||||
|
import "vela/builtin"
|
||||||
|
|
||||||
|
// Count how many parameters are provided
|
||||||
|
_paramCount: len([
|
||||||
|
if parameter.at != _|_ {1},
|
||||||
|
if parameter.after != _|_ {1},
|
||||||
|
if parameter.every != _|_ {1},
|
||||||
|
])
|
||||||
|
|
||||||
|
// Fail if not exactly one parameter is provided
|
||||||
|
if _paramCount != 1 {
|
||||||
|
validateParams: builtin.#Fail & {
|
||||||
|
$params: message: "Exactly one of 'at', 'after', or 'every' parameters must be specified (found \(_paramCount))"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the bash script to calculate annotation value
|
||||||
|
_script: string
|
||||||
|
if parameter.at != _|_ {
|
||||||
|
// Fixed timestamp mode - use as-is
|
||||||
|
_script: """
|
||||||
|
VALUE="\(parameter.at)"
|
||||||
|
kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
if parameter.after != _|_ {
|
||||||
|
// Relative time mode - calculate timestamp using date
|
||||||
|
// Convert duration format (5m, 1h, 2d) to seconds, then calculate
|
||||||
|
_script: """
|
||||||
|
DURATION="\(parameter.after)"
|
||||||
|
|
||||||
|
# Convert duration to seconds
|
||||||
|
SECONDS=0
|
||||||
|
if [[ "$DURATION" =~ ^([0-9]+)m$ ]]; then
|
||||||
|
SECONDS=$((${BASH_REMATCH[1]} * 60))
|
||||||
|
elif [[ "$DURATION" =~ ^([0-9]+)h$ ]]; then
|
||||||
|
SECONDS=$((${BASH_REMATCH[1]} * 3600))
|
||||||
|
elif [[ "$DURATION" =~ ^([0-9]+)d$ ]]; then
|
||||||
|
SECONDS=$((${BASH_REMATCH[1]} * 86400))
|
||||||
|
else
|
||||||
|
echo "ERROR: Invalid duration format: $DURATION (expected format: 5m, 1h, or 2d)"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Calculate future timestamp using seconds offset
|
||||||
|
VALUE=$(date -u -d "@$(($(date +%s) + SECONDS))" +%Y-%m-%dT%H:%M:%SZ)
|
||||||
|
echo "Calculated timestamp for after '$DURATION' ($SECONDS seconds): $VALUE"
|
||||||
|
kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
if parameter.every != _|_ {
|
||||||
|
// Recurring interval mode - pass duration directly
|
||||||
|
_script: """
|
||||||
|
VALUE="\(parameter.every)"
|
||||||
|
kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run kubectl to annotate the Application
|
||||||
|
job: kube.#Apply & {
|
||||||
|
$params: value: {
|
||||||
|
apiVersion: "batch/v1"
|
||||||
|
kind: "Job"
|
||||||
|
metadata: {
|
||||||
|
name: "\(context.name)-restart-workflow-\(context.stepSessionID)"
|
||||||
|
namespace: "vela-system"
|
||||||
|
}
|
||||||
|
spec: {
|
||||||
|
backoffLimit: 3
|
||||||
|
template: spec: {
|
||||||
|
containers: [{
|
||||||
|
name: "kubectl-annotate"
|
||||||
|
image: "bitnami/kubectl:latest"
|
||||||
|
command: ["/bin/sh", "-c"]
|
||||||
|
args: [_script]
|
||||||
|
}]
|
||||||
|
restartPolicy: "Never"
|
||||||
|
serviceAccountName: "kubevela-vela-core"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wait: builtin.#ConditionalWait & {
|
||||||
|
if job.$returns.value.status != _|_ if job.$returns.value.status.succeeded != _|_ {
|
||||||
|
$params: continue: job.$returns.value.status.succeeded > 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
parameter: {
|
||||||
|
// +usage=Schedule restart at a specific RFC3339 timestamp (e.g., "2025-01-15T14:30:00Z")
|
||||||
|
at?: string & =~"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})$"
|
||||||
|
// +usage=Schedule restart after a relative duration from now (e.g., "5m", "1h", "2d")
|
||||||
|
after?: string & =~"^[0-9]+(m|h|d)$"
|
||||||
|
// +usage=Schedule recurring restarts every specified duration (e.g., "5m", "1h", "24h")
|
||||||
|
every?: string & =~"^[0-9]+(m|h|d)$"
|
||||||
|
}
|
||||||
|
|
||||||
@@ -106,7 +106,7 @@ tidy:
|
|||||||
.PHONY: sync-crds
|
.PHONY: sync-crds
|
||||||
PKG_MODULE = github.com/kubevela/pkg # fetch common crds from the pkg repo instead of generating locally
|
PKG_MODULE = github.com/kubevela/pkg # fetch common crds from the pkg repo instead of generating locally
|
||||||
sync-crds: ## Copy CRD from pinned module version in go.mod
|
sync-crds: ## Copy CRD from pinned module version in go.mod
|
||||||
@moddir=$$(go list -m -f '{{.Dir}}' $(PKG_MODULE) 2>/dev/null); \
|
@moddir=$$(go mod download -json $(PKG_MODULE) 2>/dev/null | grep '"Dir"' | cut -d'"' -f4); \
|
||||||
mkdir -p config/crd/base; \
|
mkdir -p config/crd/base; \
|
||||||
for file in $(COMMON_CRD_FILES); do \
|
for file in $(COMMON_CRD_FILES); do \
|
||||||
src="$$moddir/crds/$$file"; \
|
src="$$moddir/crds/$$file"; \
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
/*
|
||||||
/*
|
/*
|
||||||
Copyright 2021 The KubeVela Authors.
|
Copyright 2021 The KubeVela Authors.
|
||||||
|
|
||||||
@@ -142,6 +143,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting)
|
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle workflow restart requests - converts annotation to status field
|
||||||
|
r.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
|
endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if app.GetDeletionTimestamp() == nil {
|
if app.GetDeletionTimestamp() == nil {
|
||||||
@@ -190,7 +195,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
|
|||||||
app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String()))
|
app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String()))
|
||||||
r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated))
|
r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated))
|
||||||
|
|
||||||
handler.CheckWorkflowRestart(logCtx, app)
|
// Check if workflow needs restart (combines scheduled restart + revision-based restart)
|
||||||
|
r.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
workflowInstance, runners, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile)
|
workflowInstance, runners, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -18,13 +18,17 @@ package application
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
|
||||||
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
|
||||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||||
velatypes "github.com/oam-dev/kubevela/apis/types"
|
velatypes "github.com/oam-dev/kubevela/apis/types"
|
||||||
|
"github.com/oam-dev/kubevela/pkg/appfile"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Test dispatch stage", func() {
|
var _ = Describe("Test dispatch stage", func() {
|
||||||
@@ -73,3 +77,217 @@ var _ = Describe("Test dispatch stage", func() {
|
|||||||
Expect(stage).Should(BeEquivalentTo(DefaultDispatch))
|
Expect(stage).Should(BeEquivalentTo(DefaultDispatch))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Test componentPropertiesChanged", func() {
|
||||||
|
It("should return true when component not in revision (first deployment)", func() {
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Params: map[string]interface{}{
|
||||||
|
"image": "nginx:latest",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return false when component properties unchanged", func() {
|
||||||
|
properties := map[string]interface{}{
|
||||||
|
"image": "nginx:latest",
|
||||||
|
"port": 80,
|
||||||
|
}
|
||||||
|
propertiesJSON, _ := json.Marshal(properties)
|
||||||
|
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Params: properties,
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Properties: &runtime.RawExtension{
|
||||||
|
Raw: propertiesJSON,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return true when component properties changed", func() {
|
||||||
|
oldProperties := map[string]interface{}{
|
||||||
|
"image": "nginx:1.0",
|
||||||
|
"port": 80,
|
||||||
|
}
|
||||||
|
oldPropertiesJSON, _ := json.Marshal(oldProperties)
|
||||||
|
|
||||||
|
newProperties := map[string]interface{}{
|
||||||
|
"image": "nginx:2.0",
|
||||||
|
"port": 80,
|
||||||
|
}
|
||||||
|
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Params: newProperties,
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Properties: &runtime.RawExtension{
|
||||||
|
Raw: oldPropertiesJSON,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return true when component type changed", func() {
|
||||||
|
properties := map[string]interface{}{
|
||||||
|
"image": "nginx:latest",
|
||||||
|
}
|
||||||
|
propertiesJSON, _ := json.Marshal(properties)
|
||||||
|
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "worker",
|
||||||
|
Params: properties,
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Properties: &runtime.RawExtension{
|
||||||
|
Raw: propertiesJSON,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return false when both properties are nil", func() {
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Params: nil,
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Properties: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return true when properties removed (nil current, non-empty previous)", func() {
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Params: nil, // Properties removed
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Properties: &runtime.RawExtension{
|
||||||
|
Raw: []byte(`{"image":"nginx:1.0","port":80}`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return true on JSON unmarshal error (conservative)", func() {
|
||||||
|
comp := &appfile.Component{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Params: map[string]interface{}{
|
||||||
|
"image": "nginx:latest",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
appRev := &v1beta1.ApplicationRevision{
|
||||||
|
Spec: v1beta1.ApplicationRevisionSpec{
|
||||||
|
ApplicationRevisionCompressibleFields: v1beta1.ApplicationRevisionCompressibleFields{
|
||||||
|
Application: v1beta1.Application{
|
||||||
|
Spec: v1beta1.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "test-component",
|
||||||
|
Type: "webservice",
|
||||||
|
Properties: &runtime.RawExtension{
|
||||||
|
Raw: []byte("invalid json"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
Expect(componentPropertiesChanged(comp, appRev)).Should(BeTrue())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@@ -18,10 +18,12 @@ package application
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
||||||
@@ -144,7 +146,19 @@ func (h *AppHandler) generateDispatcher(appRev *v1beta1.ApplicationRevision, rea
|
|||||||
isAutoUpdateEnabled = true
|
isAutoUpdateEnabled = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if isHealth, err := dispatcher.healthCheck(ctx, comp, appRev); !isHealth || err != nil || (!comp.SkipApplyWorkload && isAutoUpdateEnabled) {
|
isHealth, err := dispatcher.healthCheck(ctx, comp, appRev)
|
||||||
|
|
||||||
|
// Check if component properties have changed (only for healthy components)
|
||||||
|
// Note: componentPropertiesChanged handles nil comp.Params correctly, so we don't check it here
|
||||||
|
propertiesChanged := false
|
||||||
|
if isHealth && err == nil {
|
||||||
|
propertiesChanged = componentPropertiesChanged(comp, appRev)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dispatch if: unhealthy, health error, properties changed, or auto-update enabled
|
||||||
|
requiresDispatch := !isHealth || err != nil || propertiesChanged || (!comp.SkipApplyWorkload && isAutoUpdateEnabled)
|
||||||
|
|
||||||
|
if requiresDispatch {
|
||||||
if err := h.Dispatch(ctx, h.Client, clusterName, common.WorkflowResourceCreator, dispatchManifests...); err != nil {
|
if err := h.Dispatch(ctx, h.Client, clusterName, common.WorkflowResourceCreator, dispatchManifests...); err != nil {
|
||||||
return false, errors.WithMessage(err, "Dispatch")
|
return false, errors.WithMessage(err, "Dispatch")
|
||||||
}
|
}
|
||||||
@@ -235,3 +249,45 @@ func getTraitDispatchStage(client client.Client, traitType string, appRev *v1bet
|
|||||||
}
|
}
|
||||||
return stageType, nil
|
return stageType, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// componentPropertiesChanged compares current component properties with the last
|
||||||
|
// applied version in ApplicationRevision. Returns true if properties have changed
|
||||||
|
func componentPropertiesChanged(comp *appfile.Component, appRev *v1beta1.ApplicationRevision) bool {
|
||||||
|
var revComponent *common.ApplicationComponent
|
||||||
|
for i := range appRev.Spec.Application.Spec.Components {
|
||||||
|
if appRev.Spec.Application.Spec.Components[i].Name == comp.Name {
|
||||||
|
revComponent = &appRev.Spec.Application.Spec.Components[i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// First deployment or new component
|
||||||
|
if revComponent == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type changed
|
||||||
|
if revComponent.Type != comp.Type {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compare properties as JSON to handle type normalization (e.g. int vs float64)
|
||||||
|
currentProperties := comp.Params
|
||||||
|
if currentProperties == nil {
|
||||||
|
currentProperties = make(map[string]interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
currentJSON, err := json.Marshal(currentProperties)
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
var revJSON []byte
|
||||||
|
if revComponent.Properties != nil && len(revComponent.Properties.Raw) > 0 {
|
||||||
|
revJSON = revComponent.Properties.Raw
|
||||||
|
} else {
|
||||||
|
revJSON, _ = json.Marshal(map[string]interface{}{})
|
||||||
|
}
|
||||||
|
|
||||||
|
return !equality.Semantic.DeepEqual(currentJSON, revJSON)
|
||||||
|
}
|
||||||
|
|||||||
@@ -40,7 +40,6 @@ import (
|
|||||||
wfTypes "github.com/kubevela/workflow/pkg/types"
|
wfTypes "github.com/kubevela/workflow/pkg/types"
|
||||||
|
|
||||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
|
||||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/condition"
|
|
||||||
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||||
"github.com/oam-dev/kubevela/apis/types"
|
"github.com/oam-dev/kubevela/apis/types"
|
||||||
"github.com/oam-dev/kubevela/pkg/appfile"
|
"github.com/oam-dev/kubevela/pkg/appfile"
|
||||||
@@ -129,60 +128,28 @@ func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
|
|||||||
return instance, runners, nil
|
return instance, runners, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckWorkflowRestart check if application workflow need restart and return the desired
|
// copyWorkflowStatusToInstance copies Application workflow status to WorkflowInstance status.
|
||||||
// rev to be set in status
|
// Returns a WorkflowRunStatus with Mode set and other fields copied from app.Status.Workflow if it exists.
|
||||||
// 1. If workflow status is empty, it means no previous running record, the
|
func copyWorkflowStatusToInstance(app *v1beta1.Application, mode *workflowv1alpha1.WorkflowExecuteMode) workflowv1alpha1.WorkflowRunStatus {
|
||||||
// workflow will restart (cold start)
|
status := workflowv1alpha1.WorkflowRunStatus{
|
||||||
// 2. If workflow status is not empty, and publishVersion is set, the desired
|
Mode: *mode,
|
||||||
// rev will be the publishVersion
|
|
||||||
// 3. If workflow status is not empty, the desired rev will be the
|
|
||||||
// ApplicationRevision name. For backward compatibility, the legacy style
|
|
||||||
// <rev>:<hash> will be recognized and reduced into <rev>
|
|
||||||
func (h *AppHandler) CheckWorkflowRestart(ctx monitorContext.Context, app *v1beta1.Application) {
|
|
||||||
desiredRev, currentRev := h.currentAppRev.Name, ""
|
|
||||||
if app.Status.Workflow != nil {
|
|
||||||
currentRev = app.Status.Workflow.AppRevision
|
|
||||||
}
|
|
||||||
if metav1.HasAnnotation(app.ObjectMeta, oam.AnnotationPublishVersion) {
|
|
||||||
desiredRev = app.GetAnnotations()[oam.AnnotationPublishVersion]
|
|
||||||
} else { // nolint
|
|
||||||
// backward compatibility
|
|
||||||
// legacy versions use <rev>:<hash> as currentRev, extract <rev>
|
|
||||||
if idx := strings.LastIndexAny(currentRev, ":"); idx >= 0 {
|
|
||||||
currentRev = currentRev[:idx]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if currentRev != "" && desiredRev == currentRev {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// record in revision
|
|
||||||
if h.latestAppRev != nil && h.latestAppRev.Status.Workflow == nil && app.Status.Workflow != nil {
|
|
||||||
app.Status.Workflow.Terminated = true
|
|
||||||
app.Status.Workflow.Finished = true
|
|
||||||
if app.Status.Workflow.EndTime.IsZero() {
|
|
||||||
app.Status.Workflow.EndTime = metav1.Now()
|
|
||||||
}
|
|
||||||
h.UpdateApplicationRevisionStatus(ctx, h.latestAppRev, app.Status.Workflow)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// clean recorded resources info.
|
// Copy status fields if workflow status exists (may be nil on first run)
|
||||||
app.Status.Services = nil
|
if wfStatus := app.Status.Workflow; wfStatus != nil {
|
||||||
app.Status.AppliedResources = nil
|
status.Phase = wfStatus.Phase
|
||||||
|
status.Message = wfStatus.Message
|
||||||
|
status.Suspend = wfStatus.Suspend
|
||||||
|
status.SuspendState = wfStatus.SuspendState
|
||||||
|
status.Terminated = wfStatus.Terminated
|
||||||
|
status.Finished = wfStatus.Finished
|
||||||
|
status.ContextBackend = wfStatus.ContextBackend
|
||||||
|
status.Steps = wfStatus.Steps
|
||||||
|
status.StartTime = wfStatus.StartTime
|
||||||
|
status.EndTime = wfStatus.EndTime
|
||||||
|
}
|
||||||
|
|
||||||
// clean conditions after render
|
return status
|
||||||
var reservedConditions []condition.Condition
|
|
||||||
for i, cond := range app.Status.Conditions {
|
|
||||||
condTpy, err := common.ParseApplicationConditionType(string(cond.Type))
|
|
||||||
if err == nil {
|
|
||||||
if condTpy <= common.RenderCondition {
|
|
||||||
reservedConditions = append(reservedConditions, app.Status.Conditions[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
app.Status.Conditions = reservedConditions
|
|
||||||
app.Status.Workflow = &common.WorkflowStatus{
|
|
||||||
AppRevision: desiredRev,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateWorkflowInstance(af *appfile.Appfile, app *v1beta1.Application) *wfTypes.WorkflowInstance {
|
func generateWorkflowInstance(af *appfile.Appfile, app *v1beta1.Application) *wfTypes.WorkflowInstance {
|
||||||
@@ -207,20 +174,7 @@ func generateWorkflowInstance(af *appfile.Appfile, app *v1beta1.Application) *wf
|
|||||||
Steps: af.WorkflowSteps,
|
Steps: af.WorkflowSteps,
|
||||||
Mode: af.WorkflowMode,
|
Mode: af.WorkflowMode,
|
||||||
}
|
}
|
||||||
status := app.Status.Workflow
|
instance.Status = copyWorkflowStatusToInstance(app, af.WorkflowMode)
|
||||||
instance.Status = workflowv1alpha1.WorkflowRunStatus{
|
|
||||||
Mode: *af.WorkflowMode,
|
|
||||||
Phase: status.Phase,
|
|
||||||
Message: status.Message,
|
|
||||||
Suspend: status.Suspend,
|
|
||||||
SuspendState: status.SuspendState,
|
|
||||||
Terminated: status.Terminated,
|
|
||||||
Finished: status.Finished,
|
|
||||||
ContextBackend: status.ContextBackend,
|
|
||||||
Steps: status.Steps,
|
|
||||||
StartTime: status.StartTime,
|
|
||||||
EndTime: status.EndTime,
|
|
||||||
}
|
|
||||||
switch app.Status.Phase {
|
switch app.Status.Phase {
|
||||||
case common.ApplicationRunning:
|
case common.ApplicationRunning:
|
||||||
instance.Status.Phase = workflowv1alpha1.WorkflowStateSucceeded
|
instance.Status.Phase = workflowv1alpha1.WorkflowStateSucceeded
|
||||||
|
|||||||
@@ -118,7 +118,6 @@ var _ = Describe("Test Application workflow generator", func() {
|
|||||||
|
|
||||||
logCtx := monitorContext.NewTraceContext(ctx, "")
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
||||||
handler.CheckWorkflowRestart(logCtx, app)
|
|
||||||
|
|
||||||
_, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
_, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
@@ -162,7 +161,6 @@ var _ = Describe("Test Application workflow generator", func() {
|
|||||||
|
|
||||||
logCtx := monitorContext.NewTraceContext(ctx, "")
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
||||||
handler.CheckWorkflowRestart(logCtx, app)
|
|
||||||
_, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
_, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(len(taskRunner)).Should(BeEquivalentTo(2))
|
Expect(len(taskRunner)).Should(BeEquivalentTo(2))
|
||||||
@@ -204,7 +202,6 @@ var _ = Describe("Test Application workflow generator", func() {
|
|||||||
|
|
||||||
logCtx := monitorContext.NewTraceContext(ctx, "")
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
||||||
handler.CheckWorkflowRestart(logCtx, app)
|
|
||||||
_, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
_, taskRunner, err := handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(len(taskRunner)).Should(BeEquivalentTo(2))
|
Expect(len(taskRunner)).Should(BeEquivalentTo(2))
|
||||||
@@ -246,7 +243,6 @@ var _ = Describe("Test Application workflow generator", func() {
|
|||||||
|
|
||||||
logCtx := monitorContext.NewTraceContext(ctx, "")
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
||||||
handler.CheckWorkflowRestart(logCtx, app)
|
|
||||||
_, _, err = handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
_, _, err = handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
||||||
Expect(err).NotTo(BeNil())
|
Expect(err).NotTo(BeNil())
|
||||||
})
|
})
|
||||||
@@ -285,7 +281,6 @@ var _ = Describe("Test Application workflow generator", func() {
|
|||||||
|
|
||||||
logCtx := monitorContext.NewTraceContext(ctx, "")
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
handler.currentAppRev = &oamcore.ApplicationRevision{}
|
||||||
handler.CheckWorkflowRestart(logCtx, app)
|
|
||||||
_, _, err = handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
_, _, err = handler.GenerateApplicationSteps(logCtx, app, appParser, af)
|
||||||
Expect(err).NotTo(BeNil())
|
Expect(err).NotTo(BeNil())
|
||||||
})
|
})
|
||||||
@@ -316,4 +311,410 @@ var _ = Describe("Test Application workflow generator", func() {
|
|||||||
Expect(ctxData.AppLabels).To(BeNil())
|
Expect(ctxData.AppLabels).To(BeNil())
|
||||||
Expect(ctxData.AppAnnotations).To(BeNil())
|
Expect(ctxData.AppAnnotations).To(BeNil())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// NOTE: Workflow restart tests have been migrated to workflow_restart_test.go
|
||||||
|
// They test the new annotation-based restart functionality:
|
||||||
|
// - handleWorkflowRestartAnnotation() - parses annotation and sets status field
|
||||||
|
// - checkWorkflowRestart() - triggers restart based on status field
|
||||||
|
|
||||||
|
/*
|
||||||
|
// Original tests commented out below for reference - DO NOT UNCOMMENT
|
||||||
|
// See workflow_restart_test.go for the new tests
|
||||||
|
/*
|
||||||
|
It("Test workflow restart via annotation with immediate restart", func() {
|
||||||
|
// Use a past timestamp for immediate restart
|
||||||
|
pastTime := time.Now().Add(-1 * time.Hour)
|
||||||
|
pastTimeStr := pastTime.Format(time.RFC3339)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-restart-annotation",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
oam.AnnotationWorkflowRestart: pastTimeStr, // Past timestamp = immediate
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "myweb",
|
||||||
|
Type: "worker-with-health",
|
||||||
|
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-with-restart-annotation-v1",
|
||||||
|
Finished: true,
|
||||||
|
},
|
||||||
|
Services: []common.ApplicationComponentStatus{
|
||||||
|
{Name: "myweb", Healthy: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||||
|
Expect(err).Should(Succeed())
|
||||||
|
|
||||||
|
appRev := &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-restart-annotation-v2",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
handler.currentAppRev = appRev
|
||||||
|
handler.latestAppRev = appRev
|
||||||
|
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
|
||||||
|
// Before annotation handling
|
||||||
|
Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
Expect(app.Status.Workflow).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue())
|
||||||
|
Expect(app.Status.Services).To(HaveLen(1))
|
||||||
|
|
||||||
|
// Simulate controller processing annotation - sets status field (annotation removed by controller)
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: pastTime}
|
||||||
|
delete(app.Annotations, oam.AnnotationWorkflowRestart)
|
||||||
|
|
||||||
|
// Status field set, annotation removed (done by controller)
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", pastTime, 1*time.Second))
|
||||||
|
|
||||||
|
// Check workflow restart - should trigger restart because time has passed
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
|
||||||
|
// After restart - status field cleared, workflow restarted
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Status field cleared
|
||||||
|
Expect(app.Status.Workflow).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-with-restart-annotation-v2"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeFalse()) // Workflow reset
|
||||||
|
Expect(app.Status.Services).To(BeNil()) // Services cleared
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart via annotation with past timestamp", func() {
|
||||||
|
pastTime := time.Now().Add(-1 * time.Hour)
|
||||||
|
pastTimeStr := pastTime.Format(time.RFC3339)
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-past-timestamp",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
oam.AnnotationWorkflowRestart: pastTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "myweb",
|
||||||
|
Type: "worker-with-health",
|
||||||
|
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||||
|
Expect(err).Should(Succeed())
|
||||||
|
|
||||||
|
appRev := &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName},
|
||||||
|
}
|
||||||
|
handler.currentAppRev = appRev
|
||||||
|
handler.latestAppRev = appRev
|
||||||
|
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
|
||||||
|
// Simulate controller processing annotation
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: pastTime}
|
||||||
|
delete(app.Annotations, oam.AnnotationWorkflowRestart)
|
||||||
|
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation removed
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field set
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", pastTime, 1*time.Second))
|
||||||
|
|
||||||
|
// Trigger restart - should restart because time has passed
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
|
||||||
|
// Status field cleared, workflow restarted
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Cleared after restart
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v2"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart via annotation with future timestamp", func() {
|
||||||
|
futureTime := time.Now().Add(1 * time.Hour)
|
||||||
|
futureTimeStr := futureTime.Format(time.RFC3339)
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-future-timestamp",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
oam.AnnotationWorkflowRestart: futureTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "myweb",
|
||||||
|
Type: "worker-with-health",
|
||||||
|
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
},
|
||||||
|
Services: []common.ApplicationComponentStatus{
|
||||||
|
{Name: "myweb", Healthy: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||||
|
Expect(err).Should(Succeed())
|
||||||
|
|
||||||
|
appRev := &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName},
|
||||||
|
}
|
||||||
|
handler.currentAppRev = appRev
|
||||||
|
handler.latestAppRev = appRev
|
||||||
|
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
|
||||||
|
// Simulate controller processing annotation
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: futureTime}
|
||||||
|
delete(app.Annotations, oam.AnnotationWorkflowRestart)
|
||||||
|
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation removed
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field set
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", futureTime, 1*time.Second))
|
||||||
|
|
||||||
|
// Trigger check - should NOT restart because time hasn't arrived
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
|
||||||
|
// Workflow NOT restarted - status field still present
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field remains (time not arrived)
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) // Still old revision
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue()) // Still finished
|
||||||
|
Expect(app.Status.Services).To(HaveLen(1)) // Services not cleared
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart via annotation with duration", func() {
|
||||||
|
// Workflow finished 2 minutes ago
|
||||||
|
workflowEndTime := time.Now().Add(-2 * time.Minute)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-duration",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
oam.AnnotationWorkflowRestart: "5m", // Restart 5 minutes after last completion
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "myweb",
|
||||||
|
Type: "worker-with-health",
|
||||||
|
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: workflowEndTime},
|
||||||
|
},
|
||||||
|
Services: []common.ApplicationComponentStatus{
|
||||||
|
{Name: "myweb", Healthy: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||||
|
Expect(err).Should(Succeed())
|
||||||
|
|
||||||
|
appRev := &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName},
|
||||||
|
}
|
||||||
|
handler.currentAppRev = appRev
|
||||||
|
handler.latestAppRev = appRev
|
||||||
|
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
|
||||||
|
// Before annotation handling
|
||||||
|
Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
|
||||||
|
// Simulate controller processing duration annotation
|
||||||
|
expectedTime := workflowEndTime.Add(5 * time.Minute) // Last end + 5m = 3m from now
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: expectedTime}
|
||||||
|
// For durations, annotation persists (not removed by controller)
|
||||||
|
|
||||||
|
// For durations, annotation PERSISTS (recurring behavior), status field set
|
||||||
|
Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation KEPT for recurring
|
||||||
|
Expect(app.Annotations[oam.AnnotationWorkflowRestart]).To(Equal("5m"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", expectedTime, 1*time.Second))
|
||||||
|
|
||||||
|
// Check workflow restart - should NOT restart yet (time not arrived)
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
|
||||||
|
// Status field still present, workflow NOT restarted
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) // Still old revision
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue()) // Still finished
|
||||||
|
Expect(app.Status.Services).To(HaveLen(1)) // Services not cleared
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with duration recurs after completion", func() {
|
||||||
|
// Initial workflow finished 10 minutes ago
|
||||||
|
firstEndTime := time.Now().Add(-10 * time.Minute)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-recurring-duration",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
oam.AnnotationWorkflowRestart: "5m", // Recurring every 5m
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "myweb",
|
||||||
|
Type: "worker-with-health",
|
||||||
|
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: firstEndTime},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||||
|
Expect(err).Should(Succeed())
|
||||||
|
|
||||||
|
appRev := &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName},
|
||||||
|
}
|
||||||
|
handler.currentAppRev = appRev
|
||||||
|
handler.latestAppRev = appRev
|
||||||
|
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
|
||||||
|
// Simulate controller processing first scheduling: firstEndTime + 5m (5 minutes ago, ready to trigger)
|
||||||
|
firstScheduledTime := firstEndTime.Add(5 * time.Minute)
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: firstScheduledTime}
|
||||||
|
// Duration annotation persists
|
||||||
|
Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation persists
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", firstScheduledTime, 1*time.Second))
|
||||||
|
|
||||||
|
// Trigger restart (time has passed)
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Cleared after restart
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v2"))
|
||||||
|
|
||||||
|
// Simulate workflow completing again (new EndTime)
|
||||||
|
secondEndTime := time.Now().Add(-2 * time.Minute)
|
||||||
|
app.Status.Workflow.Finished = true
|
||||||
|
app.Status.Workflow.EndTime = metav1.Time{Time: secondEndTime}
|
||||||
|
|
||||||
|
// Simulate controller processing second scheduling: should recalculate based on NEW EndTime
|
||||||
|
secondScheduledTime := secondEndTime.Add(5 * time.Minute) // 2 min ago + 5m = 3m from now
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: secondScheduledTime}
|
||||||
|
Expect(app.Annotations).To(HaveKey(oam.AnnotationWorkflowRestart)) // Still persists
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Rescheduled
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", secondScheduledTime, 1*time.Second))
|
||||||
|
|
||||||
|
// This time it shouldn't trigger yet (time not arrived)
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Still scheduled
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v2")) // No change
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart ignored when workflow is not finished", func() {
|
||||||
|
pastTime := time.Now().Add(-1 * time.Hour)
|
||||||
|
pastTimeStr := pastTime.Format(time.RFC3339)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-running-workflow",
|
||||||
|
Namespace: namespaceName,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
oam.AnnotationWorkflowRestart: pastTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{
|
||||||
|
Name: "myweb",
|
||||||
|
Type: "worker-with-health",
|
||||||
|
Properties: &runtime.RawExtension{Raw: []byte(`{"cmd":["sleep","1000"],"image":"busybox"}`)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: false, // Workflow is still running
|
||||||
|
},
|
||||||
|
Services: []common.ApplicationComponentStatus{
|
||||||
|
{Name: "myweb", Healthy: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := NewAppHandler(ctx, reconciler, app)
|
||||||
|
Expect(err).Should(Succeed())
|
||||||
|
|
||||||
|
appRev := &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v2", Namespace: namespaceName},
|
||||||
|
}
|
||||||
|
handler.currentAppRev = appRev
|
||||||
|
handler.latestAppRev = appRev
|
||||||
|
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
|
||||||
|
// Simulate controller processing annotation
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: pastTime}
|
||||||
|
delete(app.Annotations, oam.AnnotationWorkflowRestart)
|
||||||
|
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey(oam.AnnotationWorkflowRestart)) // Annotation removed
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil()) // Status field set
|
||||||
|
|
||||||
|
// Check workflow restart - should be IGNORED because workflow not finished
|
||||||
|
handler.CheckWorkflowRestart(logCtx, app)
|
||||||
|
|
||||||
|
// Restart ignored - status field cleared but workflow NOT restarted
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil()) // Status field cleared (consumed)
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1")) // Still old revision
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeFalse()) // Still not finished
|
||||||
|
Expect(app.Status.Services).To(HaveLen(1)) // Services NOT cleared
|
||||||
|
})
|
||||||
|
*/
|
||||||
})
|
})
|
||||||
|
|||||||
206
pkg/controller/core.oam.dev/v1beta1/application/workflow.go
Normal file
206
pkg/controller/core.oam.dev/v1beta1/application/workflow.go
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2021 The KubeVela Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package application
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
|
monitorContext "github.com/kubevela/pkg/monitor/context"
|
||||||
|
|
||||||
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/common"
|
||||||
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/condition"
|
||||||
|
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
|
||||||
|
"github.com/oam-dev/kubevela/pkg/oam"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handleWorkflowRestartAnnotation processes the app.oam.dev/restart-workflow annotation
|
||||||
|
// and converts it to status.workflowRestartScheduledAt for GitOps safety.
|
||||||
|
// For timestamps, it deletes the annotation after copying to status (persisted via Client.Update).
|
||||||
|
// For durations, it keeps the annotation and reschedules after each execution based on time comparison.
|
||||||
|
func (r *Reconciler) handleWorkflowRestartAnnotation(ctx context.Context, app *v1beta1.Application) {
|
||||||
|
if !metav1.HasAnnotation(app.ObjectMeta, oam.AnnotationWorkflowRestart) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
restartValue := app.Annotations[oam.AnnotationWorkflowRestart]
|
||||||
|
|
||||||
|
var scheduledTime time.Time
|
||||||
|
var isDuration bool
|
||||||
|
var statusFieldNeedsUpdate bool
|
||||||
|
|
||||||
|
if restartValue == "true" {
|
||||||
|
// "true" is a convenience value supplied for an immediate restart
|
||||||
|
scheduledTime = time.Now()
|
||||||
|
isDuration = false
|
||||||
|
statusFieldNeedsUpdate = true
|
||||||
|
} else if parsedTime, err := time.Parse(time.RFC3339, restartValue); err == nil {
|
||||||
|
// explicit timestamp - restart on first reconcile > time
|
||||||
|
scheduledTime = parsedTime
|
||||||
|
isDuration = false
|
||||||
|
statusFieldNeedsUpdate = true
|
||||||
|
} else if duration, err := time.ParseDuration(restartValue); err == nil {
|
||||||
|
// recurring duration - calculate relative to last successful workflow completion
|
||||||
|
baseTime := time.Now()
|
||||||
|
if app.Status.Workflow != nil && app.Status.Workflow.Finished && !app.Status.Workflow.EndTime.IsZero() {
|
||||||
|
baseTime = app.Status.Workflow.EndTime.Time
|
||||||
|
}
|
||||||
|
scheduledTime = baseTime.Add(duration)
|
||||||
|
isDuration = true
|
||||||
|
|
||||||
|
// Only update if status is nil OR the calculated value differs from current status
|
||||||
|
statusFieldNeedsUpdate = app.Status.WorkflowRestartScheduledAt == nil ||
|
||||||
|
!app.Status.WorkflowRestartScheduledAt.Time.Equal(scheduledTime)
|
||||||
|
} else {
|
||||||
|
klog.Warningf("Invalid workflow restart annotation value for Application %s/%s: %q. Expected 'true', RFC3339 timestamp, or duration (e.g., '5m', '1h')",
|
||||||
|
app.Namespace, app.Name, restartValue)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if statusFieldNeedsUpdate {
|
||||||
|
app.Status.WorkflowRestartScheduledAt = &metav1.Time{Time: scheduledTime}
|
||||||
|
if err := r.Status().Update(ctx, app); err != nil {
|
||||||
|
klog.Errorf("Failed to update workflow restart status for Application %s/%s: %v. Will retry on next reconcile.",
|
||||||
|
app.Namespace, app.Name, err)
|
||||||
|
// Don't fail reconciliation - will retry naturally on next reconcile
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// For timestamps, delete the annotation (one-time behavior)
|
||||||
|
// For durations, keep the annotation (recurring behavior)
|
||||||
|
if !isDuration {
|
||||||
|
delete(app.Annotations, oam.AnnotationWorkflowRestart)
|
||||||
|
if err := r.Client.Update(ctx, app); err != nil {
|
||||||
|
klog.Errorf("Failed to remove workflow restart annotation for Application %s/%s: %v. Will retry on next reconcile.",
|
||||||
|
app.Namespace, app.Name, err)
|
||||||
|
// Don't fail reconciliation - will retry naturally on next reconcile
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkWorkflowRestart checks if application workflow needs restart.
|
||||||
|
// Handles three restart scenarios:
|
||||||
|
// 1. Scheduled restart (via workflowRestartScheduledAt status field)
|
||||||
|
// 2. PublishVersion annotation change
|
||||||
|
// 3. Application revision change
|
||||||
|
func (r *Reconciler) checkWorkflowRestart(ctx monitorContext.Context, app *v1beta1.Application, handler *AppHandler) {
|
||||||
|
// Check for scheduled restart in status field
|
||||||
|
if app.Status.WorkflowRestartScheduledAt != nil {
|
||||||
|
restartTime := app.Status.WorkflowRestartScheduledAt.Time
|
||||||
|
|
||||||
|
if time.Now().Before(restartTime) {
|
||||||
|
// Not yet time to restart, skip for now
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if app.Status.Workflow == nil || !app.Status.Workflow.Finished {
|
||||||
|
// Workflow is still running or hasn't started - don't restart yet
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if app.Status.Workflow != nil && !app.Status.Workflow.EndTime.IsZero() {
|
||||||
|
lastEndTime := app.Status.Workflow.EndTime.Time
|
||||||
|
if !restartTime.After(lastEndTime) {
|
||||||
|
// Restart time is not after last execution, skip
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// All conditions met: time arrived, workflow finished, and restart time > last execution
|
||||||
|
// Clear the status field and proceed with restart
|
||||||
|
app.Status.WorkflowRestartScheduledAt = nil
|
||||||
|
if err := r.Status().Update(ctx, app); err != nil {
|
||||||
|
ctx.Error(err, "failed to clear workflow restart scheduled time")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if app.Status.Workflow != nil {
|
||||||
|
if handler.latestAppRev != nil && handler.latestAppRev.Status.Workflow == nil {
|
||||||
|
app.Status.Workflow.Terminated = true
|
||||||
|
app.Status.Workflow.Finished = true
|
||||||
|
if app.Status.Workflow.EndTime.IsZero() {
|
||||||
|
app.Status.Workflow.EndTime = metav1.Now()
|
||||||
|
}
|
||||||
|
handler.UpdateApplicationRevisionStatus(ctx, handler.latestAppRev, app.Status.Workflow)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
app.Status.Services = nil
|
||||||
|
app.Status.AppliedResources = nil
|
||||||
|
var reservedConditions []condition.Condition
|
||||||
|
for i, cond := range app.Status.Conditions {
|
||||||
|
condTpy, err := common.ParseApplicationConditionType(string(cond.Type))
|
||||||
|
if err == nil {
|
||||||
|
if condTpy <= common.RenderCondition {
|
||||||
|
reservedConditions = append(reservedConditions, app.Status.Conditions[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
app.Status.Conditions = reservedConditions
|
||||||
|
app.Status.Workflow = &common.WorkflowStatus{
|
||||||
|
AppRevision: handler.currentAppRev.Name,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for revision-based restart (publishVersion or normal revision change)
|
||||||
|
desiredRev, currentRev := handler.currentAppRev.Name, ""
|
||||||
|
if app.Status.Workflow != nil {
|
||||||
|
currentRev = app.Status.Workflow.AppRevision
|
||||||
|
}
|
||||||
|
if metav1.HasAnnotation(app.ObjectMeta, oam.AnnotationPublishVersion) {
|
||||||
|
desiredRev = app.GetAnnotations()[oam.AnnotationPublishVersion]
|
||||||
|
} else { // nolint
|
||||||
|
// backward compatibility
|
||||||
|
// legacy versions use <rev>:<hash> as currentRev, extract <rev>
|
||||||
|
if idx := strings.LastIndexAny(currentRev, ":"); idx >= 0 {
|
||||||
|
currentRev = currentRev[:idx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if currentRev != "" && desiredRev == currentRev {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restart needed - record in revision and clean up
|
||||||
|
if app.Status.Workflow != nil {
|
||||||
|
if handler.latestAppRev != nil && handler.latestAppRev.Status.Workflow == nil {
|
||||||
|
app.Status.Workflow.Terminated = true
|
||||||
|
app.Status.Workflow.Finished = true
|
||||||
|
if app.Status.Workflow.EndTime.IsZero() {
|
||||||
|
app.Status.Workflow.EndTime = metav1.Now()
|
||||||
|
}
|
||||||
|
handler.UpdateApplicationRevisionStatus(ctx, handler.latestAppRev, app.Status.Workflow)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
app.Status.Services = nil
|
||||||
|
app.Status.AppliedResources = nil
|
||||||
|
var reservedConditions []condition.Condition
|
||||||
|
for i, cond := range app.Status.Conditions {
|
||||||
|
condTpy, err := common.ParseApplicationConditionType(string(cond.Type))
|
||||||
|
if err == nil {
|
||||||
|
if condTpy <= common.RenderCondition {
|
||||||
|
reservedConditions = append(reservedConditions, app.Status.Conditions[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
app.Status.Conditions = reservedConditions
|
||||||
|
app.Status.Workflow = &common.WorkflowStatus{
|
||||||
|
AppRevision: desiredRev,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -33,9 +33,11 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||||
"sigs.k8s.io/yaml"
|
"sigs.k8s.io/yaml"
|
||||||
|
|
||||||
|
monitorContext "github.com/kubevela/pkg/monitor/context"
|
||||||
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
|
workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
|
||||||
wfTypes "github.com/kubevela/workflow/pkg/types"
|
wfTypes "github.com/kubevela/workflow/pkg/types"
|
||||||
|
|
||||||
@@ -836,3 +838,468 @@ spec:
|
|||||||
}
|
}
|
||||||
`
|
`
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Test workflow restart annotation functionality", func() {
|
||||||
|
var ctx context.Context
|
||||||
|
var namespace string
|
||||||
|
var reconciler *Reconciler
|
||||||
|
var scheme *runtime.Scheme
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
ctx = context.Background()
|
||||||
|
namespace = "test-workflow-restart"
|
||||||
|
|
||||||
|
scheme = runtime.NewScheme()
|
||||||
|
Expect(corev1.AddToScheme(scheme)).Should(Succeed())
|
||||||
|
Expect(oamcore.AddToScheme(scheme)).Should(Succeed())
|
||||||
|
|
||||||
|
fakeClient := fake.NewClientBuilder().
|
||||||
|
WithScheme(scheme).
|
||||||
|
WithStatusSubresource(&oamcore.Application{}).
|
||||||
|
Build()
|
||||||
|
|
||||||
|
reconciler = &Reconciler{
|
||||||
|
Client: fakeClient,
|
||||||
|
Scheme: scheme,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart when scheduled time is past but newer than last execution", func() {
|
||||||
|
pastTime := time.Now().Add(-1 * time.Hour)
|
||||||
|
pastTimeStr := pastTime.Format(time.RFC3339)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-restart-annotation",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": pastTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
// EndTime is 2 hours ago - BEFORE the restart time (1 hour ago)
|
||||||
|
EndTime: metav1.Time{Time: time.Now().Add(-2 * time.Hour)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", pastTime, 1*time.Second))
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
latestAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
Status: oamcore.ApplicationRevisionStatus{
|
||||||
|
// Set non-nil to avoid UpdateApplicationRevisionStatus call in tests
|
||||||
|
Workflow: &common.WorkflowStatus{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeFalse())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with 'true' annotation triggers immediate restart", func() {
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-true-restart",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": "true",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: time.Now().Add(-10 * time.Minute)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", time.Now(), 2*time.Second))
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
latestAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
Status: oamcore.ApplicationRevisionStatus{
|
||||||
|
// Set non-nil to avoid UpdateApplicationRevisionStatus call in tests
|
||||||
|
Workflow: &common.WorkflowStatus{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with future timestamp (should NOT restart)", func() {
|
||||||
|
futureTime := time.Now().Add(1 * time.Hour)
|
||||||
|
futureTimeStr := futureTime.Format(time.RFC3339)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-future-timestamp",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": futureTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: time.Now().Add(-10 * time.Minute)},
|
||||||
|
},
|
||||||
|
Services: []common.ApplicationComponentStatus{
|
||||||
|
{Name: "myweb", Healthy: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
|
Expect(app.Annotations).NotTo(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue())
|
||||||
|
Expect(app.Status.Services).To(HaveLen(1))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with duration (not yet time)", func() {
|
||||||
|
// Workflow finished 2 minutes ago
|
||||||
|
workflowEndTime := time.Now().Add(-2 * time.Minute)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-duration",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": "5m", // Restart 5 minutes after completion
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: workflowEndTime},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
|
Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Annotations["app.oam.dev/restart-workflow"]).To(Equal("5m"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
expectedTime := workflowEndTime.Add(5 * time.Minute)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", expectedTime, 1*time.Second))
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with duration (recurring)", func() {
|
||||||
|
// Initial workflow finished 10 minutes ago
|
||||||
|
firstEndTime := time.Now().Add(-10 * time.Minute)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-recurring-duration",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": "5m",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: firstEndTime},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
// First scheduling: 10 min ago + 5m = 5 min ago (ready to trigger)
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
latestAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
Status: oamcore.ApplicationRevisionStatus{
|
||||||
|
// Set non-nil to avoid UpdateApplicationRevisionStatus call in tests
|
||||||
|
Workflow: &common.WorkflowStatus{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
|
||||||
|
// Simulate workflow completing again (2 minutes ago)
|
||||||
|
secondEndTime := time.Now().Add(-2 * time.Minute)
|
||||||
|
app.Status.Workflow.Finished = true
|
||||||
|
app.Status.Workflow.EndTime = metav1.Time{Time: secondEndTime}
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
secondScheduledTime := secondEndTime.Add(5 * time.Minute)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt.Time).To(BeTemporally("~", secondScheduledTime, 1*time.Second))
|
||||||
|
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart ignored when workflow not finished", func() {
|
||||||
|
pastTime := time.Now().Add(-1 * time.Hour)
|
||||||
|
pastTimeStr := pastTime.Format(time.RFC3339)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-running-workflow",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": pastTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: false, // Workflow still running
|
||||||
|
},
|
||||||
|
Services: []common.ApplicationComponentStatus{
|
||||||
|
{Name: "myweb", Healthy: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeFalse())
|
||||||
|
Expect(app.Status.Services).To(HaveLen(1))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart prevents duplicate restarts (restartTime <= lastEndTime)", func() {
|
||||||
|
// Workflow finished 5 minutes ago
|
||||||
|
lastEndTime := time.Now().Add(-5 * time.Minute)
|
||||||
|
// Restart scheduled for 10 minutes ago (already passed, but < lastEndTime)
|
||||||
|
restartTime := time.Now().Add(-10 * time.Minute)
|
||||||
|
restartTimeStr := restartTime.Format(time.RFC3339)
|
||||||
|
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-old-restart-time",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": restartTimeStr,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
EndTime: metav1.Time{Time: lastEndTime},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
|
||||||
|
handler := &AppHandler{
|
||||||
|
currentAppRev: &oamcore.ApplicationRevision{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "app-v1"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logCtx := monitorContext.NewTraceContext(ctx, "")
|
||||||
|
reconciler.checkWorkflowRestart(logCtx, app, handler)
|
||||||
|
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).NotTo(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with invalid timestamp format (should be ignored with warning)", func() {
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-invalid-timestamp",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": "2025-13-45T99:99:99Z", // Invalid timestamp
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
|
Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Annotations["app.oam.dev/restart-workflow"]).To(Equal("2025-13-45T99:99:99Z"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("Test workflow restart with invalid duration format (should be ignored with warning)", func() {
|
||||||
|
app := &oamcore.Application{
|
||||||
|
TypeMeta: metav1.TypeMeta{Kind: "Application", APIVersion: "core.oam.dev/v1beta1"},
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "app-with-invalid-duration",
|
||||||
|
Namespace: namespace,
|
||||||
|
Annotations: map[string]string{
|
||||||
|
"app.oam.dev/restart-workflow": "5xyz", // Invalid duration
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: oamcore.ApplicationSpec{
|
||||||
|
Components: []common.ApplicationComponent{
|
||||||
|
{Name: "myweb", Type: "worker"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: common.AppStatus{
|
||||||
|
Workflow: &common.WorkflowStatus{
|
||||||
|
AppRevision: "app-v1",
|
||||||
|
Finished: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(reconciler.Client.Create(ctx, app)).Should(Succeed())
|
||||||
|
|
||||||
|
reconciler.handleWorkflowRestartAnnotation(ctx, app)
|
||||||
|
|
||||||
|
Expect(app.Annotations).To(HaveKey("app.oam.dev/restart-workflow"))
|
||||||
|
Expect(app.Annotations["app.oam.dev/restart-workflow"]).To(Equal("5xyz"))
|
||||||
|
Expect(app.Status.WorkflowRestartScheduledAt).To(BeNil())
|
||||||
|
Expect(app.Status.Workflow.AppRevision).To(Equal("app-v1"))
|
||||||
|
Expect(app.Status.Workflow.Finished).To(BeTrue())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@@ -162,6 +162,16 @@ const (
|
|||||||
// AnnotationWorkflowName specifies the workflow name for execution.
|
// AnnotationWorkflowName specifies the workflow name for execution.
|
||||||
AnnotationWorkflowName = "app.oam.dev/workflowName"
|
AnnotationWorkflowName = "app.oam.dev/workflowName"
|
||||||
|
|
||||||
|
// AnnotationWorkflowRestart triggers a workflow restart when set. Supported values:
|
||||||
|
// - "true": Immediate restart (sets restart time to current time + 1 second).
|
||||||
|
// Annotation is automatically removed after being processed.
|
||||||
|
// - RFC3339 timestamp (e.g., "2025-01-15T14:30:00Z"): One-time restart at specified time.
|
||||||
|
// Annotation is automatically removed after being processed.
|
||||||
|
// - Duration (e.g., "5m", "1h", "30s"): Recurring restart with minimum interval after each completion.
|
||||||
|
// Annotation persists; automatically reschedules after each workflow completion.
|
||||||
|
// All modes are GitOps-safe: the schedule is stored in status.workflowRestartScheduledAt.
|
||||||
|
AnnotationWorkflowRestart = "app.oam.dev/restart-workflow"
|
||||||
|
|
||||||
// AnnotationAppName specifies the name for application in db.
|
// AnnotationAppName specifies the name for application in db.
|
||||||
// Note: the annotation is only created by velaUX, please don't use it in other Source of Truth.
|
// Note: the annotation is only created by velaUX, please don't use it in other Source of Truth.
|
||||||
AnnotationAppName = "app.oam.dev/appName"
|
AnnotationAppName = "app.oam.dev/appName"
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
Schedule workflow restarts to enable periodic tasks, delayed execution, or time-based orchestration. The step uses exactly one of three timing modes: `at` for a specific timestamp, `after` for a relative delay, or `every` for recurring intervals.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# Example 1: Fixed timestamp - restart at specific time
|
||||||
|
apiVersion: core.oam.dev/v1beta1
|
||||||
|
kind: Application
|
||||||
|
metadata:
|
||||||
|
name: scheduled-app
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
components:
|
||||||
|
- name: my-component
|
||||||
|
type: webservice
|
||||||
|
properties:
|
||||||
|
image: nginx:latest
|
||||||
|
port: 80
|
||||||
|
workflow:
|
||||||
|
steps:
|
||||||
|
- name: deploy
|
||||||
|
type: apply-component
|
||||||
|
properties:
|
||||||
|
component: my-component
|
||||||
|
- name: schedule-restart
|
||||||
|
type: restart-workflow
|
||||||
|
properties:
|
||||||
|
at: "2025-01-20T15:00:00Z"
|
||||||
|
---
|
||||||
|
# Example 2: Relative delay - restart after duration
|
||||||
|
apiVersion: core.oam.dev/v1beta1
|
||||||
|
kind: Application
|
||||||
|
metadata:
|
||||||
|
name: delayed-restart-app
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
components:
|
||||||
|
- name: batch-processor
|
||||||
|
type: webservice
|
||||||
|
properties:
|
||||||
|
image: myapp/batch-processor:v1
|
||||||
|
port: 8080
|
||||||
|
workflow:
|
||||||
|
steps:
|
||||||
|
- name: deploy
|
||||||
|
type: apply-component
|
||||||
|
properties:
|
||||||
|
component: batch-processor
|
||||||
|
- name: schedule-restart-after
|
||||||
|
type: restart-workflow
|
||||||
|
properties:
|
||||||
|
after: "1h"
|
||||||
|
---
|
||||||
|
# Example 3: Recurring - restart every interval
|
||||||
|
apiVersion: core.oam.dev/v1beta1
|
||||||
|
kind: Application
|
||||||
|
metadata:
|
||||||
|
name: periodic-sync-app
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
components:
|
||||||
|
- name: data-sync
|
||||||
|
type: webservice
|
||||||
|
properties:
|
||||||
|
image: myapp/data-sync:v1
|
||||||
|
port: 8080
|
||||||
|
workflow:
|
||||||
|
steps:
|
||||||
|
- name: deploy
|
||||||
|
type: apply-component
|
||||||
|
properties:
|
||||||
|
component: data-sync
|
||||||
|
- name: schedule-recurring-restart
|
||||||
|
type: restart-workflow
|
||||||
|
properties:
|
||||||
|
every: "24h"
|
||||||
|
```
|
||||||
|
|
||||||
|
**Use cases:**
|
||||||
|
|
||||||
|
- **Periodic tasks**: Schedule recurring workflow execution for data synchronization, batch processing, or scheduled maintenance
|
||||||
|
- **Delayed deployment**: Add a delay after initial deployment before triggering workflow restart
|
||||||
|
- **Time-based orchestration**: Coordinate workflows to run at specific times across multiple applications
|
||||||
@@ -0,0 +1,118 @@
|
|||||||
|
import (
|
||||||
|
"vela/kube"
|
||||||
|
"vela/builtin"
|
||||||
|
)
|
||||||
|
|
||||||
|
"restart-workflow": {
|
||||||
|
type: "workflow-step"
|
||||||
|
annotations: {
|
||||||
|
"category": "Workflow Control"
|
||||||
|
}
|
||||||
|
labels: {
|
||||||
|
"scope": "Application"
|
||||||
|
}
|
||||||
|
description: "Schedule the current Application's workflow to restart at a specific time, after a duration, or at recurring intervals"
|
||||||
|
}
|
||||||
|
template: {
|
||||||
|
// Count how many parameters are provided
|
||||||
|
_paramCount: len([
|
||||||
|
if parameter.at != _|_ {1},
|
||||||
|
if parameter.after != _|_ {1},
|
||||||
|
if parameter.every != _|_ {1},
|
||||||
|
])
|
||||||
|
|
||||||
|
// Fail if not exactly one parameter is provided
|
||||||
|
if _paramCount != 1 {
|
||||||
|
validateParams: builtin.#Fail & {
|
||||||
|
$params: {
|
||||||
|
message: "Exactly one of 'at', 'after', or 'every' parameters must be specified (found \(_paramCount))"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the bash script to calculate annotation value
|
||||||
|
_script: string
|
||||||
|
if parameter.at != _|_ {
|
||||||
|
// Fixed timestamp mode - use as-is
|
||||||
|
_script: """
|
||||||
|
VALUE="\(parameter.at)"
|
||||||
|
kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
if parameter.after != _|_ {
|
||||||
|
// Relative time mode - calculate timestamp using date
|
||||||
|
// Convert duration format (5m, 1h, 2d) to seconds, then calculate
|
||||||
|
_script: """
|
||||||
|
DURATION="\(parameter.after)"
|
||||||
|
|
||||||
|
# Convert duration to seconds
|
||||||
|
SECONDS=0
|
||||||
|
if [[ "$DURATION" =~ ^([0-9]+)m$ ]]; then
|
||||||
|
SECONDS=$((${BASH_REMATCH[1]} * 60))
|
||||||
|
elif [[ "$DURATION" =~ ^([0-9]+)h$ ]]; then
|
||||||
|
SECONDS=$((${BASH_REMATCH[1]} * 3600))
|
||||||
|
elif [[ "$DURATION" =~ ^([0-9]+)d$ ]]; then
|
||||||
|
SECONDS=$((${BASH_REMATCH[1]} * 86400))
|
||||||
|
else
|
||||||
|
echo "ERROR: Invalid duration format: $DURATION (expected format: 5m, 1h, or 2d)"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Calculate future timestamp using seconds offset
|
||||||
|
VALUE=$(date -u -d "@$(($(date +%s) + SECONDS))" +%Y-%m-%dT%H:%M:%SZ)
|
||||||
|
echo "Calculated timestamp for after '$DURATION' ($SECONDS seconds): $VALUE"
|
||||||
|
kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
if parameter.every != _|_ {
|
||||||
|
// Recurring interval mode - pass duration directly
|
||||||
|
_script: """
|
||||||
|
VALUE="\(parameter.every)"
|
||||||
|
kubectl annotate application \(context.name) -n \(context.namespace) app.oam.dev/restart-workflow="$VALUE" --overwrite
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run kubectl to annotate the Application
|
||||||
|
job: kube.#Apply & {
|
||||||
|
$params: {
|
||||||
|
value: {
|
||||||
|
apiVersion: "batch/v1"
|
||||||
|
kind: "Job"
|
||||||
|
metadata: {
|
||||||
|
name: "\(context.name)-restart-workflow-\(context.stepSessionID)"
|
||||||
|
namespace: "vela-system"
|
||||||
|
}
|
||||||
|
spec: {
|
||||||
|
backoffLimit: 3
|
||||||
|
template: {
|
||||||
|
spec: {
|
||||||
|
containers: [{
|
||||||
|
name: "kubectl-annotate"
|
||||||
|
image: "bitnami/kubectl:latest"
|
||||||
|
command: ["/bin/sh", "-c"]
|
||||||
|
args: [_script]
|
||||||
|
}]
|
||||||
|
restartPolicy: "Never"
|
||||||
|
serviceAccountName: "kubevela-vela-core"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wait: builtin.#ConditionalWait & {
|
||||||
|
if job.$returns.value.status != _|_ if job.$returns.value.status.succeeded != _|_ {
|
||||||
|
$params: continue: job.$returns.value.status.succeeded > 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
parameter: {
|
||||||
|
// +usage=Schedule restart at a specific RFC3339 timestamp (e.g., "2025-01-15T14:30:00Z")
|
||||||
|
at?: string & =~"^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})$"
|
||||||
|
// +usage=Schedule restart after a relative duration from now (e.g., "5m", "1h", "2d")
|
||||||
|
after?: string & =~"^[0-9]+(m|h|d)$"
|
||||||
|
// +usage=Schedule recurring restarts every specified duration (e.g., "5m", "1h", "24h")
|
||||||
|
every?: string & =~"^[0-9]+(m|h|d)$"
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user