mirror of
https://github.com/kubevela/kubevela.git
synced 2026-05-06 01:17:09 +00:00
This PR spells out the rollout states (#972)
* add rollout state transition: * address comments
This commit is contained in:
28
pkg/controller/common/rollout/rollout_plan_controller.go
Normal file
28
pkg/controller/common/rollout/rollout_plan_controller.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package rollout
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
"github.com/oam-dev/kubevela/pkg/controller/common/rollout/workloads"
|
||||
)
|
||||
|
||||
// ReconcileRolloutPlan generates the rollout plan and reconcile it
|
||||
func ReconcileRolloutPlan(ctx context.Context, client client.Client, rolloutSpec *v1alpha1.RolloutPlan,
|
||||
targetWorkload, sourceWorkload *unstructured.Unstructured, rolloutStatus *v1alpha1.RolloutStatus) (v1alpha1.RolloutStatus, error) {
|
||||
klog.InfoS("generate the rollout plan", "rollout Spec", rolloutSpec,
|
||||
"target workload", klog.KObj(targetWorkload))
|
||||
if sourceWorkload != nil {
|
||||
klog.InfoS("we will do rolling upgrades", "source workload", klog.KObj(sourceWorkload))
|
||||
}
|
||||
klog.Info("check the rollout status ", "rollout state", rolloutStatus.RollingState, "batch rolling state",
|
||||
rolloutStatus.BatchRollingState)
|
||||
|
||||
wf := workloads.NewWorkloadControllerFactory(ctx, client, rolloutSpec, targetWorkload, sourceWorkload)
|
||||
wf.GetController(targetWorkload.GroupVersionKind())
|
||||
return *rolloutStatus, nil
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package rollout
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
)
|
||||
|
||||
// ReconcileRolloutPlan generates the rollout plan and reconcile it
|
||||
func ReconcileRolloutPlan(ctx context.Context, client client.Client, rolloutSpec *v1alpha1.RolloutPlan,
|
||||
targetWorkload, sourceWorkload *unstructured.Unstructured) error {
|
||||
klog.InfoS("generate the rollout plan", "rollout Spec", rolloutSpec,
|
||||
"target workload", klog.KObj(targetWorkload))
|
||||
return nil
|
||||
}
|
||||
210
pkg/controller/common/rollout/rollout_state.go
Normal file
210
pkg/controller/common/rollout/rollout_state.go
Normal file
@@ -0,0 +1,210 @@
|
||||
package rollout
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
)
|
||||
|
||||
type rolloutEvent string
|
||||
|
||||
const (
|
||||
// rollingSpecVerifiedEvent indicates that we have successfully verified that the rollout spec
|
||||
rollingSpecVerifiedEvent rolloutEvent = "rollingSpecVerifiedEvent"
|
||||
|
||||
// rollingInitializedEvent indicates that we have finished initializing all the workload resources
|
||||
rollingInitializedEvent rolloutEvent = "rollingInitializedEvent"
|
||||
|
||||
// allBatchFinishedEvent indicates that all batches are upgraded
|
||||
allBatchFinishedEvent rolloutEvent = "allBatchFinishedEvent"
|
||||
|
||||
// rollingFailedEvent indicates that the rolling is paused
|
||||
rollingPausedEvent rolloutEvent = "rollingFailedEvent"
|
||||
|
||||
// rollingResumedEvent indicates that the rolling is resumed
|
||||
rollingResumedEvent rolloutEvent = "rollingResumedEvent"
|
||||
|
||||
// rollingFinalizedEvent indicates that we have finalized the rollout which includes but not
|
||||
// limited to the resource garbage collection
|
||||
rollingFinalizedEvent rolloutEvent = "allBatchFinishedEvent"
|
||||
|
||||
// rollingFailedEvent indicates that we encountered an unexpected error during upgrading
|
||||
rollingFailedEvent rolloutEvent = "rollingFailedEvent"
|
||||
|
||||
// initializedOneBatchEvent indicates that we have successfully rolled out one batch
|
||||
initializedOneBatchEvent rolloutEvent = "initializedOneBatchEvent"
|
||||
|
||||
// finishedOneBatchEvent indicates that we have successfully rolled out one batch
|
||||
finishedOneBatchEvent rolloutEvent = "finishedOneBatchEvent"
|
||||
|
||||
// oneBatchAvailableEvent indicates that the batch resource is considered available
|
||||
// this events comes after we have examine the pod readiness check and traffic shifting if needed
|
||||
oneBatchAvailableEvent rolloutEvent = "OneBatchAvailable"
|
||||
|
||||
// batchRolloutContinueEvent indicates that we need to continue to upgrade the pods in the batch
|
||||
batchRolloutContinueEvent rolloutEvent = "batchRolloutContinueEvent"
|
||||
|
||||
// batchRolloutWaitingEvent indicates that we are waiting for the approval of resume one batch
|
||||
batchRolloutWaitingEvent rolloutEvent = "batchWaitRolloutEvent"
|
||||
|
||||
// batchRolloutApprovedEvent indicates that we are waiting for the approval of the
|
||||
batchRolloutApprovedEvent rolloutEvent = "batchWaitRolloutEvent"
|
||||
|
||||
// batchRolloutFailedEvent indicates that we are waiting for the approval of the
|
||||
batchRolloutFailedEvent rolloutEvent = "batchRolloutFailedEvent"
|
||||
|
||||
// workloadModifiedEvent indicates that the res
|
||||
workloadModifiedEvent rolloutEvent = "workloadModifiedEvent"
|
||||
)
|
||||
|
||||
const invalidRollingStateTransition = "the rollout state transition from `%s` state with `%s` is invalid"
|
||||
|
||||
const invalidBatchRollingStateTransition = "the batch rolling state transition from `%s` state with `%s` is invalid"
|
||||
|
||||
// StateMachineTransition is the center place to do rollout state transition
|
||||
// it returns an error if the transition is invalid
|
||||
// it changes the coming rollout state if it's valid
|
||||
func StateMachineTransition(rolloutStatus *v1alpha1.RolloutStatus, event rolloutEvent) error {
|
||||
rollingState := rolloutStatus.RollingState
|
||||
batchRollingState := rolloutStatus.BatchRollingState
|
||||
defer klog.InfoS("try to execute a rollout state transition",
|
||||
"pre rolling state", rollingState,
|
||||
"pre batch rolling state", batchRollingState,
|
||||
"post rolling state", rolloutStatus.RollingState,
|
||||
"post batch rolling state", rolloutStatus.BatchRollingState)
|
||||
|
||||
// we first process the global event
|
||||
if event == rollingFailedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.RolloutFailedState
|
||||
return nil
|
||||
}
|
||||
if event == rollingPausedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.PausedState
|
||||
return nil
|
||||
}
|
||||
|
||||
switch rollingState {
|
||||
case v1alpha1.VerifyingState:
|
||||
if event == rollingSpecVerifiedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.InitializingState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidRollingStateTransition, rollingState, event)
|
||||
|
||||
case v1alpha1.InitializingState:
|
||||
if event == rollingInitializedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.RollingInBatchesState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidRollingStateTransition, rollingState, event)
|
||||
|
||||
case v1alpha1.PausedState:
|
||||
if event == rollingResumedEvent {
|
||||
// we don't know where it was last time, need to start from beginning
|
||||
// since we don't change the batch rolling state when we pause
|
||||
// we should be able to resume if it was rolling before paused
|
||||
rolloutStatus.RollingState = v1alpha1.VerifyingState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidBatchRollingStateTransition, rollingState, event)
|
||||
|
||||
case v1alpha1.RollingInBatchesState:
|
||||
return batchStateTransition(rolloutStatus, batchRollingState, event)
|
||||
|
||||
case v1alpha1.FinalisingState:
|
||||
if event == rollingFinalizedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.RolloutSucceedState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidRollingStateTransition, rollingState, event)
|
||||
|
||||
case v1alpha1.RolloutSucceedState:
|
||||
if event == workloadModifiedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.VerifyingState
|
||||
return nil
|
||||
}
|
||||
if event == rollingFinalizedEvent {
|
||||
// no op
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidRollingStateTransition, rollingState, event)
|
||||
|
||||
case v1alpha1.RolloutFailedState:
|
||||
if event == workloadModifiedEvent {
|
||||
rolloutStatus.RollingState = v1alpha1.VerifyingState
|
||||
return nil
|
||||
}
|
||||
if event == rollingFailedEvent {
|
||||
// no op
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidRollingStateTransition, rollingState, event)
|
||||
|
||||
default:
|
||||
return fmt.Errorf("invalid rolling state %s", rollingState)
|
||||
}
|
||||
}
|
||||
|
||||
// batchStateTransition handles the state transition when the rollout is in action
|
||||
func batchStateTransition(rolloutStatus *v1alpha1.RolloutStatus,
|
||||
batchRollingState v1alpha1.BatchRollingState, event rolloutEvent) error {
|
||||
switch batchRollingState {
|
||||
case v1alpha1.BatchInitializingState:
|
||||
if event == initializedOneBatchEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchInRollingState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidBatchRollingStateTransition, batchRollingState, event)
|
||||
|
||||
case v1alpha1.BatchInRollingState:
|
||||
if event == batchRolloutWaitingEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchVerifyingState
|
||||
return nil
|
||||
}
|
||||
if event == batchRolloutContinueEvent {
|
||||
// no op
|
||||
return nil
|
||||
}
|
||||
if event == batchRolloutApprovedEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchReadyState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidBatchRollingStateTransition, batchRollingState, event)
|
||||
|
||||
case v1alpha1.BatchVerifyingState:
|
||||
if event == batchRolloutApprovedEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchReadyState
|
||||
return nil
|
||||
}
|
||||
if event == batchRolloutFailedEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchVerifyFailedState
|
||||
rolloutStatus.RollingState = v1alpha1.RolloutFailedState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidBatchRollingStateTransition, batchRollingState, event)
|
||||
|
||||
case v1alpha1.BatchReadyState:
|
||||
if event == oneBatchAvailableEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchAvailableState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidBatchRollingStateTransition, batchRollingState, event)
|
||||
|
||||
case v1alpha1.BatchAvailableState:
|
||||
if event == finishedOneBatchEvent {
|
||||
rolloutStatus.BatchRollingState = v1alpha1.BatchInitializingState
|
||||
return nil
|
||||
}
|
||||
if event == allBatchFinishedEvent {
|
||||
// transition out of the batch loop
|
||||
rolloutStatus.RollingState = v1alpha1.FinalisingState
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(invalidBatchRollingStateTransition, batchRollingState, event)
|
||||
|
||||
default:
|
||||
return fmt.Errorf("invalid batch rolling state %s", batchRollingState)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package workloads
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
)
|
||||
|
||||
// CloneSetController is responsible for handle Cloneset type of workloads
|
||||
type CloneSetController struct {
|
||||
client client.Client
|
||||
rolloutSpec *v1alpha1.RolloutPlan
|
||||
targetWorkload *unstructured.Unstructured
|
||||
}
|
||||
|
||||
// Initialize first verify that the cloneset status is compatible with the rollout spec
|
||||
// it then set the cloneset partition the same as the replicas (no new pod) and add an annotation
|
||||
func (c *CloneSetController) Initialize() (int32, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// RolloutPods calculates the number of pods we can upgrade once according to the rollout spec
|
||||
// and then set the partition accordingly
|
||||
func (c *CloneSetController) RolloutPods() (int32, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Finalize makes sure the Cloneset is all upgraded and
|
||||
func (c *CloneSetController) Finalize() error {
|
||||
return nil
|
||||
}
|
||||
28
pkg/controller/common/rollout/workloads/controller.go
Normal file
28
pkg/controller/common/rollout/workloads/controller.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package workloads
|
||||
|
||||
// WorkloadController is the interface that all type of workload controller implements
|
||||
type WorkloadController interface {
|
||||
// Initialize makes sure that the resources can be upgraded according to the rollout plan
|
||||
// it returns the number of available pods that are upgrade (with the new spec)
|
||||
Initialize() (int32, error)
|
||||
|
||||
// RolloutPods tries to upgrade pods in the resources following the rollout plan
|
||||
// it will upgrade as many pods as the rollout plan allows at once, the routine does not block on any operations.
|
||||
// Instead, we rely on the go-client's requeue mechanism to drive this towards the spec goal
|
||||
// it returns the number of pods upgraded in this round
|
||||
RolloutPods() (int32, error)
|
||||
|
||||
/*
|
||||
GetMetadata() (string, map[string]int32, error)
|
||||
|
||||
SyncStatus() error
|
||||
|
||||
SetStatusFailedChecks() error
|
||||
|
||||
ScaleToZero() error
|
||||
*/
|
||||
// Finalize makes sure the resources are in a good final state.
|
||||
// For example, we may remove the source object to prevent scalar traits to ever work
|
||||
// or we may add an annotation to indicate the upgrade finished time
|
||||
Finalize() error
|
||||
}
|
||||
47
pkg/controller/common/rollout/workloads/factory.go
Normal file
47
pkg/controller/common/rollout/workloads/factory.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package workloads
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
)
|
||||
|
||||
// WorkloadControllerFactory is the factory that creates controllers for different types of workload
|
||||
type WorkloadControllerFactory struct {
|
||||
client client.Client
|
||||
rolloutSpec *v1alpha1.RolloutPlan
|
||||
targetWorkload *unstructured.Unstructured
|
||||
sourceWorkload *unstructured.Unstructured
|
||||
}
|
||||
|
||||
// NewWorkloadControllerFactory creates a WorkloadControllerFactory
|
||||
func NewWorkloadControllerFactory(ctx context.Context, client client.Client, rolloutSpec *v1alpha1.RolloutPlan,
|
||||
targetWorkload, sourceWorkload *unstructured.Unstructured) *WorkloadControllerFactory {
|
||||
return &WorkloadControllerFactory{
|
||||
client: client,
|
||||
rolloutSpec: rolloutSpec,
|
||||
targetWorkload: targetWorkload,
|
||||
sourceWorkload: sourceWorkload,
|
||||
}
|
||||
}
|
||||
|
||||
// GetController generates the controller depends on the workload type
|
||||
func (f *WorkloadControllerFactory) GetController(kind schema.GroupVersionKind) WorkloadController {
|
||||
cloneSetCtrl := &CloneSetController{
|
||||
client: f.client,
|
||||
rolloutSpec: f.rolloutSpec,
|
||||
targetWorkload: f.targetWorkload,
|
||||
}
|
||||
|
||||
switch kind.Kind {
|
||||
case "CloneSet":
|
||||
return cloneSetCtrl
|
||||
|
||||
default:
|
||||
return cloneSetCtrl
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
corev1alpha2 "github.com/oam-dev/kubevela/apis/core.oam.dev/v1alpha2"
|
||||
"github.com/oam-dev/kubevela/apis/standard.oam.dev/v1alpha1"
|
||||
"github.com/oam-dev/kubevela/pkg/controller/common/rollout"
|
||||
controller "github.com/oam-dev/kubevela/pkg/controller/core.oam.dev"
|
||||
"github.com/oam-dev/kubevela/pkg/controller/core.oam.dev/v1alpha2/application"
|
||||
@@ -42,6 +43,7 @@ type Reconciler struct {
|
||||
// Reconcile is the main logic of applicationdeployment controller
|
||||
func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
var appDeploy corev1alpha2.ApplicationDeployment
|
||||
requeueAfterTime := 5 * time.Second
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), reconcileTimeOut)
|
||||
defer cancel()
|
||||
if err := r.Get(ctx, req.NamespacedName, &appDeploy); err != nil {
|
||||
@@ -52,6 +54,7 @@ func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
}
|
||||
klog.InfoS("Start to reconcile ", "application deployment", klog.KObj(&appDeploy))
|
||||
|
||||
// TODO: check if the target/source has changed
|
||||
r.handleFinalizer(&appDeploy)
|
||||
|
||||
// Get the target application
|
||||
@@ -88,7 +91,7 @@ func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
klog.ErrorS(err, "cannot fetch the workloads to upgrade", "workload Type", workloadType,
|
||||
"workload GVK", *workloadGVK, "target application", klog.KRef(req.Namespace, targetAppName),
|
||||
"source application", klog.KRef(req.Namespace, sourceAppName))
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
return ctrl.Result{RequeueAfter: requeueAfterTime}, client.IgnoreNotFound(err)
|
||||
}
|
||||
klog.InfoS("get the target workload we need to work on", "targetWorkload", klog.KObj(targetWorkload))
|
||||
|
||||
@@ -108,13 +111,20 @@ func (r *Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
}
|
||||
|
||||
// reconcile the rollout part of the spec given the target and source workload
|
||||
err = rollout.ReconcileRolloutPlan(ctx, r, &appDeploy.Spec.RolloutPlan, targetWorkload, sourceWorkload)
|
||||
rolloutStatus, err := rollout.ReconcileRolloutPlan(ctx, r, &appDeploy.Spec.RolloutPlan, targetWorkload,
|
||||
sourceWorkload, &appDeploy.Status)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "cannot reconcile the rollout plan", "rollout spec", appDeploy.Spec.RolloutPlan)
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
appDeploy.Status = rolloutStatus
|
||||
if rolloutStatus.RollingState == v1alpha1.RolloutFailedState ||
|
||||
rolloutStatus.RollingState == v1alpha1.RolloutSucceedState {
|
||||
// we don't need to keep checking the rollout too frequently if the rollout is at a terminal state
|
||||
requeueAfterTime = 30 * time.Second
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: requeueAfterTime}, r.Update(ctx, &appDeploy)
|
||||
}
|
||||
|
||||
func (r *Reconciler) handleFinalizer(appDeploy *corev1alpha2.ApplicationDeployment) {
|
||||
|
||||
Reference in New Issue
Block a user