mirror of
https://github.com/kubevela/kubevela.git
synced 2026-02-14 10:00:06 +00:00
faster refresh capabilities
Signed-off-by: roywang <seiwy2010@gmail.com>
This commit is contained in:
@@ -103,6 +103,10 @@ linters-settings:
|
||||
rangeValCopy:
|
||||
sizeThreshold: 32
|
||||
|
||||
makezero:
|
||||
# Allow only slices initialized with a length of zero. Default is false.
|
||||
always: false
|
||||
|
||||
linters:
|
||||
enable:
|
||||
- megacheck
|
||||
|
||||
1
go.mod
1
go.mod
@@ -24,6 +24,7 @@ require (
|
||||
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174
|
||||
github.com/kyokomi/emoji v2.2.4+incompatible
|
||||
github.com/mholt/archiver/v3 v3.3.0
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.1
|
||||
github.com/oam-dev/trait-injector v0.0.0-20200331033130-0a27b176ffc4
|
||||
github.com/onsi/ginkgo v1.13.0
|
||||
github.com/onsi/gomega v1.10.3
|
||||
|
||||
3
go.sum
3
go.sum
@@ -1321,7 +1321,10 @@ github.com/mitchellh/gox v0.4.0 h1:lfGJxY7ToLJQjHHwi0EX6uYBdK78egf954SQl13PQJc=
|
||||
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
|
||||
github.com/mitchellh/gox v1.0.1 h1:x0jD3dcHk9a9xPSDN6YEL4xL6Qz0dvNYm8yZqui5chI=
|
||||
github.com/mitchellh/gox v1.0.1/go.mod h1:ED6BioOGXMswlXa2zxfh/xdd5QhwYliBFn9V18Ap4z4=
|
||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 h1:hOY53G+kBFhbYFpRVxHl5eS7laP6B1+Cq+Z9Dry1iMU=
|
||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.1 h1:L60q1+q7cXE4JeEJJKMnh2brFIe3rZxCihYAB61ypAY=
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.1/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
|
||||
github.com/mitchellh/iochan v1.0.0 h1:C+X3KsSTLFVBr/tK1eYN/vs4rJcvsiLU338UhYPJWeY=
|
||||
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
|
||||
github.com/mitchellh/ioprogress v0.0.0-20180201004757-6a23b12fa88e/go.mod h1:waEya8ee1Ro/lgxpVhkJI4BVASzkm3UZqkx/cFJiYHM=
|
||||
|
||||
@@ -70,7 +70,7 @@ func NewEnvInitCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Command
|
||||
return err
|
||||
}
|
||||
if syncCluster {
|
||||
if err := RefreshDefinitions(ctx, c, ioStreams, true); err != nil {
|
||||
if err := RefreshDefinitions(ctx, c, ioStreams, true, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,16 @@ package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/fatih/color"
|
||||
"github.com/gosuri/uitable"
|
||||
hashstructure "github.com/mitchellh/hashstructure/v2"
|
||||
|
||||
"github.com/oam-dev/kubevela/apis/types"
|
||||
cmdutil "github.com/oam-dev/kubevela/pkg/commands/util"
|
||||
@@ -21,14 +28,27 @@ const (
|
||||
deleted refreshStatus = "Deleted"
|
||||
)
|
||||
|
||||
// RefreshDefinitions will sync local capabilities with cluster installed ones
|
||||
func RefreshDefinitions(ctx context.Context, c types.Args, ioStreams cmdutil.IOStreams, silentOutput bool) error {
|
||||
dir, _ := system.GetCapabilityDir()
|
||||
const (
|
||||
refreshInterval = 5 * time.Minute
|
||||
)
|
||||
|
||||
// RefreshDefinitions will sync local capabilities with cluster installed ones
|
||||
func RefreshDefinitions(ctx context.Context, c types.Args, ioStreams cmdutil.IOStreams, silentOutput, enforceRefresh bool) error {
|
||||
dir, _ := system.GetCapabilityDir()
|
||||
oldCaps, err := plugins.LoadAllInstalledCapability()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
useCached, err := useCacheInsteadRefresh(dir, refreshInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !enforceRefresh && useCached {
|
||||
// use local capabilities instead of fetching from cluster
|
||||
printRefreshReport(nil, oldCaps, ioStreams, silentOutput, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
var syncedTemplates []types.Capability
|
||||
|
||||
templates, templateErrors, err := plugins.GetWorkloadsFromCluster(ctx, types.DefaultKubeVelaNS, c, dir, nil)
|
||||
@@ -56,13 +76,23 @@ func RefreshDefinitions(ctx context.Context, c types.Args, ioStreams cmdutil.IOS
|
||||
plugins.SinkTemp2Local(templates, dir)
|
||||
plugins.RemoveLegacyTemps(syncedTemplates, dir)
|
||||
|
||||
printRefreshReport(syncedTemplates, oldCaps, ioStreams, silentOutput)
|
||||
printRefreshReport(syncedTemplates, oldCaps, ioStreams, silentOutput, false)
|
||||
return nil
|
||||
}
|
||||
|
||||
// silent indicates whether output existing caps if no change occurs. If false, output all existing caps.
|
||||
func printRefreshReport(newCaps, oldCaps []types.Capability, io cmdutil.IOStreams, silent bool) {
|
||||
report := refreshResultReport(newCaps, oldCaps)
|
||||
func printRefreshReport(newCaps, oldCaps []types.Capability, io cmdutil.IOStreams, silent, useCached bool) {
|
||||
var report map[refreshStatus][]types.Capability
|
||||
if useCached {
|
||||
report = map[refreshStatus][]types.Capability{
|
||||
added: make([]types.Capability, 0),
|
||||
updated: make([]types.Capability, 0),
|
||||
unchanged: oldCaps,
|
||||
deleted: make([]types.Capability, 0),
|
||||
}
|
||||
} else {
|
||||
report = refreshResultReport(newCaps, oldCaps)
|
||||
}
|
||||
table := newUITable()
|
||||
table.MaxColWidth = 80
|
||||
table.AddRow("TYPE", "CATEGORY", "DESCRIPTION")
|
||||
@@ -130,6 +160,17 @@ func addStsRow(sts refreshStatus, report map[refreshStatus][]types.Capability, t
|
||||
}
|
||||
|
||||
func refreshResultReport(newCaps, oldCaps []types.Capability) map[refreshStatus][]types.Capability {
|
||||
dir, _ := system.GetCapabilityDir()
|
||||
cachedHash := readCapDefHashFromLocal(dir)
|
||||
newHash := map[string]string{}
|
||||
for _, newCap := range newCaps {
|
||||
h, err := hashstructure.Hash(newCap, hashstructure.FormatV2, nil)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
newHash[newCap.Name] = strconv.FormatUint(h, 10)
|
||||
}
|
||||
|
||||
report := map[refreshStatus][]types.Capability{
|
||||
added: make([]types.Capability, 0),
|
||||
updated: make([]types.Capability, 0),
|
||||
@@ -153,6 +194,16 @@ func refreshResultReport(newCaps, oldCaps []types.Capability) map[refreshStatus]
|
||||
for _, newCap := range newCaps {
|
||||
if oldCap.Name == newCap.Name {
|
||||
found = true
|
||||
// use cached hash to determine whether the cap is changed
|
||||
if h, ok := cachedHash[newCap.Name]; ok {
|
||||
if h == newHash[newCap.Name] {
|
||||
report[unchanged] = append(report[unchanged], newCap)
|
||||
} else {
|
||||
report[updated] = append(report[updated], newCap)
|
||||
}
|
||||
break
|
||||
}
|
||||
// in case of missing cache, use Equal func to compare
|
||||
if types.EqualCapability(oldCap, newCap) {
|
||||
report[unchanged] = append(report[unchanged], newCap)
|
||||
} else {
|
||||
@@ -165,5 +216,89 @@ func refreshResultReport(newCaps, oldCaps []types.Capability) map[refreshStatus]
|
||||
report[deleted] = append(report[deleted], oldCap)
|
||||
}
|
||||
}
|
||||
_ = writeCapDefHashIntoLocal(dir, newHash)
|
||||
return report
|
||||
}
|
||||
|
||||
// useCacheInsteadRefresh checks whether use cached capabilities instead of refresh from cluster
|
||||
// a timestamp records the time when refresh from cluster last time
|
||||
// if duration since last time refresh DOES NOT exceed `cacheExpiredDuration`
|
||||
// use cached capabilities instead of refresh from cluster
|
||||
// else refresh from cluster and refresh the timestamp
|
||||
func useCacheInsteadRefresh(capDir string, cacheExpiredDuration time.Duration) (bool, error) {
|
||||
currentTimestamp := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
tmpDir := filepath.Join(capDir, ".tmp")
|
||||
timeFilePath := filepath.Join(tmpDir, ".lasttimerefresh")
|
||||
exist, _ := system.CreateIfNotExist(tmpDir)
|
||||
if !exist {
|
||||
// file saving timestamp is not created yet, create and refresh the timestamp
|
||||
if err := ioutil.WriteFile(timeFilePath, []byte(currentTimestamp), 0600); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
r, err := ioutil.ReadFile(filepath.Clean(timeFilePath))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
// tmpDir exists but `.lasttimerefresh` file doesn't
|
||||
if err := ioutil.WriteFile(timeFilePath, []byte(currentTimestamp), 0600); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
i, err := strconv.ParseInt(string(r), 10, 64)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
lt := time.Unix(i, 0)
|
||||
if time.Since(lt) > cacheExpiredDuration {
|
||||
// cache is expired, refresh the timestamp
|
||||
if err := ioutil.WriteFile(timeFilePath, []byte(currentTimestamp), 0600); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
// cache is not expired
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// each capability has a hash value cached in local capability dir
|
||||
// hash value is used to compare local capability with one from cluster
|
||||
// refresh report will show all changed capabilities
|
||||
func readCapDefHashFromLocal(capDir string) map[string]string {
|
||||
r := map[string]string{}
|
||||
tmpDir := filepath.Join(capDir, ".tmp")
|
||||
hashFilePath := filepath.Join(tmpDir, ".capabilityhash")
|
||||
if exist, err := system.CreateIfNotExist(tmpDir); !exist || err != nil {
|
||||
return r
|
||||
}
|
||||
if _, err := os.Stat(hashFilePath); os.IsNotExist(err) {
|
||||
return r
|
||||
}
|
||||
hashData, err := ioutil.ReadFile(filepath.Clean(hashFilePath))
|
||||
if err != nil {
|
||||
return r
|
||||
}
|
||||
if err := json.Unmarshal(hashData, &r); err != nil {
|
||||
return r
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func writeCapDefHashIntoLocal(capDir string, hashData map[string]string) error {
|
||||
tmpDir := filepath.Join(capDir, ".tmp")
|
||||
hashFilePath := filepath.Join(tmpDir, ".capabilityhash")
|
||||
if _, err := system.CreateIfNotExist(tmpDir); err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := json.Marshal(hashData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ioutil.WriteFile(hashFilePath, data, 0600); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
49
pkg/commands/refresh_test.go
Normal file
49
pkg/commands/refresh_test.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCheckAndUpdateRefreshInterval(t *testing.T) {
|
||||
testdir := "testdir-refresh-interval"
|
||||
testMaxInterval := time.Second
|
||||
|
||||
err := os.MkdirAll(testdir, 0755)
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
r, err := useCacheInsteadRefresh(testdir, testMaxInterval)
|
||||
assert.Equal(t, r, false, "should not use cache for tmp file is not created")
|
||||
assert.NoError(t, err)
|
||||
|
||||
r, err = useCacheInsteadRefresh(testdir, testMaxInterval)
|
||||
assert.Equal(t, r, true, "should use cache for interval is not expired")
|
||||
assert.NoError(t, err)
|
||||
|
||||
time.Sleep(2 * testMaxInterval)
|
||||
r, err = useCacheInsteadRefresh(testdir, testMaxInterval)
|
||||
assert.Equal(t, r, false, "should not use cache for interval is already expired")
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestWriteAndReadLocalCapHash(t *testing.T) {
|
||||
testdir := "testdir-caphash"
|
||||
err := os.MkdirAll(testdir, 0755)
|
||||
assert.NoError(t, err)
|
||||
defer os.RemoveAll(testdir)
|
||||
|
||||
result := readCapDefHashFromLocal(testdir)
|
||||
assert.Equal(t, result, map[string]string{}, "capability hash data should be empty")
|
||||
fakeHashData := map[string]string{
|
||||
"a": "test1",
|
||||
"b": "test2",
|
||||
}
|
||||
err = writeCapDefHashIntoLocal(testdir, fakeHashData)
|
||||
assert.NoError(t, err, "write new hash data successfully")
|
||||
result = readCapDefHashFromLocal(testdir)
|
||||
assert.Equal(t, result, fakeHashData, "read hash data successfully")
|
||||
}
|
||||
@@ -168,7 +168,7 @@ func (i *initCmd) run(ioStreams cmdutil.IOStreams, chartSource string) error {
|
||||
"try running 'vela workloads' or 'vela traits' to check after a while, details %v", err)
|
||||
return nil
|
||||
}
|
||||
if err := RefreshDefinitions(context.Background(), i.c, ioStreams, false); err != nil {
|
||||
if err := RefreshDefinitions(context.Background(), i.c, ioStreams, false, true); err != nil {
|
||||
return err
|
||||
}
|
||||
ioStreams.Info("- Finished successfully.")
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
// NewTraitsCommand creates `traits` command
|
||||
func NewTraitsCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Command {
|
||||
var workloadName string
|
||||
var syncCluster bool
|
||||
var syncCluster, enforceRefresh bool
|
||||
ctx := context.Background()
|
||||
cmd := &cobra.Command{
|
||||
Use: "traits [--apply-to WORKLOAD_NAME]",
|
||||
@@ -27,7 +27,7 @@ func NewTraitsCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Command
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if syncCluster {
|
||||
if err := RefreshDefinitions(ctx, c, ioStreams, true); err != nil {
|
||||
if err := RefreshDefinitions(ctx, c, ioStreams, true, enforceRefresh); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -41,6 +41,7 @@ func NewTraitsCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Command
|
||||
cmd.SetOut(ioStreams.Out)
|
||||
cmd.Flags().StringVar(&workloadName, "apply-to", "", "Workload name")
|
||||
cmd.Flags().BoolVarP(&syncCluster, "sync", "s", true, "Synchronize capabilities from cluster into local")
|
||||
cmd.Flags().BoolVarP(&enforceRefresh, "", "r", false, "Enforce refresh from cluster even if cache is not expired")
|
||||
return cmd
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
// NewWorkloadsCommand creates `workloads` command
|
||||
func NewWorkloadsCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Command {
|
||||
var syncCluster bool
|
||||
var syncCluster, enforceRefresh bool
|
||||
ctx := context.Background()
|
||||
cmd := &cobra.Command{
|
||||
Use: "workloads",
|
||||
@@ -25,7 +25,7 @@ func NewWorkloadsCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Comma
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if syncCluster {
|
||||
if err := RefreshDefinitions(ctx, c, ioStreams, true); err != nil {
|
||||
if err := RefreshDefinitions(ctx, c, ioStreams, true, enforceRefresh); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -41,6 +41,7 @@ func NewWorkloadsCommand(c types.Args, ioStreams cmdutil.IOStreams) *cobra.Comma
|
||||
}
|
||||
cmd.SetOut(ioStreams.Out)
|
||||
cmd.Flags().BoolVarP(&syncCluster, "sync", "s", true, "Synchronize capabilities from cluster into local")
|
||||
cmd.Flags().BoolVarP(&enforceRefresh, "", "r", false, "Enforce refresh from cluster even if cache is not expired")
|
||||
return cmd
|
||||
}
|
||||
|
||||
|
||||
@@ -397,6 +397,7 @@ func (p PeerHealthConditions) MergePeerWorkloadsConditions(basic *WorkloadHealth
|
||||
// copy to keep idempotent
|
||||
peerHCs := make(PeerHealthConditions, len(p))
|
||||
copy(peerHCs, p)
|
||||
//nolint:makezero
|
||||
peerHCs = append(peerHCs, *basic.DeepCopy())
|
||||
|
||||
// sort by revision number in descending order
|
||||
|
||||
Reference in New Issue
Block a user