From 7ad5dec712d741d2346bd24c6aab8d261b396e6f Mon Sep 17 00:00:00 2001 From: Xuewei Zhang Date: Mon, 20 May 2019 15:20:42 -0700 Subject: [PATCH] Add disk metrics support. --- cmd/plugins.go | 1 + config/system-stats-monitor.json | 19 +++ pkg/custompluginmonitor/types/config_test.go | 4 +- pkg/systemstatsmonitor/disk_collector.go | 142 ++++++++++++++++ pkg/systemstatsmonitor/metric_helper.go | 57 +++++++ .../system_stats_monitor.go | 112 +++++++++++++ .../system_stats_monitor_test.go | 31 ++++ pkg/systemstatsmonitor/types/config.go | 82 +++++++++ pkg/systemstatsmonitor/types/config_test.go | 158 ++++++++++++++++++ 9 files changed, 604 insertions(+), 2 deletions(-) create mode 100644 config/system-stats-monitor.json create mode 100644 pkg/systemstatsmonitor/disk_collector.go create mode 100644 pkg/systemstatsmonitor/metric_helper.go create mode 100644 pkg/systemstatsmonitor/system_stats_monitor.go create mode 100644 pkg/systemstatsmonitor/system_stats_monitor_test.go create mode 100644 pkg/systemstatsmonitor/types/config.go create mode 100644 pkg/systemstatsmonitor/types/config_test.go diff --git a/cmd/plugins.go b/cmd/plugins.go index 1e1ab116..9f4f82b5 100644 --- a/cmd/plugins.go +++ b/cmd/plugins.go @@ -20,4 +20,5 @@ package main import ( _ "k8s.io/node-problem-detector/pkg/custompluginmonitor" _ "k8s.io/node-problem-detector/pkg/systemlogmonitor" + _ "k8s.io/node-problem-detector/pkg/systemstatsmonitor" ) diff --git a/config/system-stats-monitor.json b/config/system-stats-monitor.json new file mode 100644 index 00000000..d9bfd603 --- /dev/null +++ b/config/system-stats-monitor.json @@ -0,0 +1,19 @@ +{ + "disk": { + "metricsConfigs": { + "disk/io_time": { + "displayName": "disk/io_time" + }, + "disk/weighted_io": { + "displayName": "disk/weighted_io" + }, + "disk/avg_queue_len": { + "displayName": "disk/avg_queue_len" + } + }, + "includeRootBlk": true, + "includeAllAttachedBlk": true, + "lsblkTimeout": "5s" + }, + "invokeInterval": "60s" +} diff --git a/pkg/custompluginmonitor/types/config_test.go b/pkg/custompluginmonitor/types/config_test.go index ee431d75..c90459e2 100644 --- a/pkg/custompluginmonitor/types/config_test.go +++ b/pkg/custompluginmonitor/types/config_test.go @@ -261,11 +261,11 @@ func TestCustomPluginConfigValidate(t *testing.T) { err := utMeta.Conf.Validate() if err != nil && !utMeta.IsError { t.Error(desp) - t.Errorf("Error in validating custom plugin configuration %+v. Want an error got nil", utMeta) + t.Errorf("Error in validating custom plugin configuration %+v. Wanted nil got an error", utMeta) } if err == nil && utMeta.IsError { t.Error(desp) - t.Errorf("Error in validating custom plugin configuration %+v. Want nil got an error", utMeta) + t.Errorf("Error in validating custom plugin configuration %+v. Wanted an error got nil", utMeta) } } } diff --git a/pkg/systemstatsmonitor/disk_collector.go b/pkg/systemstatsmonitor/disk_collector.go new file mode 100644 index 00000000..5297ad4b --- /dev/null +++ b/pkg/systemstatsmonitor/disk_collector.go @@ -0,0 +1,142 @@ +/* +Copyright 2019 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package systemstatsmonitor + +import ( + "context" + "os/exec" + "strings" + "time" + + "github.com/golang/glog" + "github.com/shirou/gopsutil/disk" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types" +) + +type diskCollector struct { + keyDevice tag.Key + mIOTime *stats.Int64Measure + mWeightedIO *stats.Int64Measure + mAvgQueueLen *stats.Float64Measure + + config *ssmtypes.DiskStatsConfig + + historyIOTime map[string]uint64 + historyWeightedIO map[string]uint64 +} + +func NewDiskCollectorOrDie(diskConfig *ssmtypes.DiskStatsConfig) *diskCollector { + dc := diskCollector{config: diskConfig} + dc.keyDevice, _ = tag.NewKey("device") + + dc.mIOTime = newInt64Metric( + diskConfig.MetricsConfigs["disk/io_time"].DisplayName, + "The IO time spent on the disk", + "second", + view.LastValue(), + []tag.Key{dc.keyDevice}) + + dc.mWeightedIO = newInt64Metric( + diskConfig.MetricsConfigs["disk/weighted_io"].DisplayName, + "The weighted IO on the disk", + "second", + view.LastValue(), + []tag.Key{dc.keyDevice}) + + dc.mAvgQueueLen = newFloat64Metric( + diskConfig.MetricsConfigs["disk/avg_queue_len"].DisplayName, + "The average queue length on the disk", + "second", + view.LastValue(), + []tag.Key{dc.keyDevice}) + + dc.historyIOTime = make(map[string]uint64) + dc.historyWeightedIO = make(map[string]uint64) + + return &dc +} + +func (dc *diskCollector) collect() { + if dc == nil { + return + } + + blks := []string{} + if dc.config.IncludeRootBlk { + blks = append(blks, listRootBlockDevices(dc.config.LsblkTimeout)...) + } + if dc.config.IncludeAllAttachedBlk { + blks = append(blks, listAttachedBlockDevices()...) + } + + ioCountersStats, _ := disk.IOCounters(blks...) + + for deviceName, ioCountersStat := range ioCountersStats { + // Calculate average IO queue length since last measurement. + lastIOTime := dc.historyIOTime[deviceName] + lastWeightedIO := dc.historyWeightedIO[deviceName] + + dc.historyIOTime[deviceName] = ioCountersStat.IoTime + dc.historyWeightedIO[deviceName] = ioCountersStat.WeightedIO + + avg_queue_len := float64(0.0) + if lastIOTime != ioCountersStat.IoTime { + avg_queue_len = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime) + } + + // Attach label {"device": deviceName} to the metrics. + device_ctx, _ := tag.New(context.Background(), tag.Upsert(dc.keyDevice, deviceName)) + if dc.mIOTime != nil { + stats.Record(device_ctx, dc.mIOTime.M(int64(ioCountersStat.IoTime))) + } + if dc.mWeightedIO != nil { + stats.Record(device_ctx, dc.mWeightedIO.M(int64(ioCountersStat.WeightedIO))) + } + if dc.mAvgQueueLen != nil { + stats.Record(device_ctx, dc.mAvgQueueLen.M(avg_queue_len)) + } + } +} + +// listRootBlockDevices lists all block devices that's not a slave or holder. +func listRootBlockDevices(timeout time.Duration) []string { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // "-d" prevents printing slave or holder devices. i.e. /dev/sda1, /dev/sda2... + // "-n" prevents printing the headings. + // "-p NAME" specifies to only print the device name. + cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME") + stdout, err := cmd.Output() + if err != nil { + glog.Errorf("Error calling lsblk") + } + return strings.Split(strings.TrimSpace(string(stdout)), "\n") +} + +// listAttachedBlockDevices lists all currently attached block devices. +func listAttachedBlockDevices() []string { + partitions, _ := disk.Partitions(false) + blks := []string{} + for _, partition := range partitions { + blks = append(blks, partition.Device) + } + return blks +} diff --git a/pkg/systemstatsmonitor/metric_helper.go b/pkg/systemstatsmonitor/metric_helper.go new file mode 100644 index 00000000..fb82b230 --- /dev/null +++ b/pkg/systemstatsmonitor/metric_helper.go @@ -0,0 +1,57 @@ +/* +Copyright 2019 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package systemstatsmonitor + +import ( + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +// newInt64Metric create a stats.Int64 metrics, returns nil when name is empty. +func newInt64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Int64Measure { + if name == "" { + return nil + } + measure := stats.Int64(name, description, unit) + newView := &view.View{ + Name: name, + Measure: measure, + Description: description, + Aggregation: aggregation, + TagKeys: tagKeys, + } + view.Register(newView) + return measure +} + +// newFloat64Metric create a stats.Float64 metrics, returns nil when name is empty. +func newFloat64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Float64Measure { + if name == "" { + return nil + } + measure := stats.Float64(name, description, unit) + newView := &view.View{ + Name: name, + Measure: measure, + Description: description, + Aggregation: aggregation, + TagKeys: tagKeys, + } + view.Register(newView) + return measure +} diff --git a/pkg/systemstatsmonitor/system_stats_monitor.go b/pkg/systemstatsmonitor/system_stats_monitor.go new file mode 100644 index 00000000..03a66e54 --- /dev/null +++ b/pkg/systemstatsmonitor/system_stats_monitor.go @@ -0,0 +1,112 @@ +/* +Copyright 2019 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package systemstatsmonitor + +import ( + "encoding/json" + "io/ioutil" + "time" + + "github.com/golang/glog" + "k8s.io/node-problem-detector/pkg/problemdaemon" + ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types" + "k8s.io/node-problem-detector/pkg/types" + "k8s.io/node-problem-detector/pkg/util/tomb" +) + +const SystemStatsMonitorName = "system-stats-monitor" + +func init() { + problemdaemon.Register(SystemStatsMonitorName, types.ProblemDaemonHandler{ + CreateProblemDaemonOrDie: NewSystemStatsMonitorOrDie, + CmdOptionDescription: "Set to config file paths."}) +} + +type systemStatsMonitor struct { + config ssmtypes.SystemStatsConfig + diskCollector *diskCollector + tomb *tomb.Tomb +} + +// NewSystemStatsMonitorOrDie creates a system stats monitor. +func NewSystemStatsMonitorOrDie(configPath string) types.Monitor { + ssm := systemStatsMonitor{ + tomb: tomb.NewTomb(), + } + + // Apply configurations. + f, err := ioutil.ReadFile(configPath) + if err != nil { + glog.Fatalf("Failed to read configuration file %q: %v", configPath, err) + } + err = json.Unmarshal(f, &ssm.config) + if err != nil { + glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err) + } + + err = ssm.config.ApplyConfiguration() + if err != nil { + glog.Fatalf("Failed to apply configuration for %q: %v", configPath, err) + } + + err = ssm.config.Validate() + if err != nil { + glog.Fatalf("Failed to validate configuration %+v: %v", ssm.config, err) + } + + // Initialize diskCollector if needed. + if len(ssm.config.DiskConfig.MetricsConfigs) > 0 { + ssm.diskCollector = NewDiskCollectorOrDie(&ssm.config.DiskConfig) + } + return &ssm +} + +func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) { + glog.Info("Start system stats monitor") + go ssm.monitorLoop() + return nil, nil +} + +func (ssm *systemStatsMonitor) monitorLoop() { + defer ssm.tomb.Done() + + runTicker := time.NewTicker(ssm.config.InvokeInterval) + defer runTicker.Stop() + + select { + case <-ssm.tomb.Stopping(): + glog.Infof("System stats monitor stopped") + return + default: + ssm.diskCollector.collect() + } + + for { + select { + case <-runTicker.C: + ssm.diskCollector.collect() + case <-ssm.tomb.Stopping(): + glog.Infof("System stats monitor stopped") + return + } + } +} + +func (ssm *systemStatsMonitor) Stop() { + glog.Info("Stop system stats monitor") + ssm.tomb.Stop() +} diff --git a/pkg/systemstatsmonitor/system_stats_monitor_test.go b/pkg/systemstatsmonitor/system_stats_monitor_test.go new file mode 100644 index 00000000..5beb550c --- /dev/null +++ b/pkg/systemstatsmonitor/system_stats_monitor_test.go @@ -0,0 +1,31 @@ +/* +Copyright 2019 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package systemstatsmonitor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "k8s.io/node-problem-detector/pkg/problemdaemon" +) + +func TestRegistration(t *testing.T) { + assert.NotPanics(t, + func() { problemdaemon.GetProblemDaemonHandlerOrDie(SystemStatsMonitorName) }, + "System stats monitor failed to register itself as a problem daemon.") +} diff --git a/pkg/systemstatsmonitor/types/config.go b/pkg/systemstatsmonitor/types/config.go new file mode 100644 index 00000000..4ffa9623 --- /dev/null +++ b/pkg/systemstatsmonitor/types/config.go @@ -0,0 +1,82 @@ +/* +Copyright 2019 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "fmt" + "time" +) + +var ( + defaultInvokeIntervalString = (60 * time.Second).String() + defaultlsblkTimeoutString = (5 * time.Second).String() +) + +type MetricConfig struct { + DisplayName string `json:"displayName"` +} + +type DiskStatsConfig struct { + MetricsConfigs map[string]MetricConfig `json:"metricsConfigs"` + IncludeRootBlk bool `json:"includeRootBlk"` + IncludeAllAttachedBlk bool `json:"includeAllAttachedBlk"` + LsblkTimeoutString string `json:"lsblkTimeout"` + LsblkTimeout time.Duration `json:"-"` +} + +type SystemStatsConfig struct { + DiskConfig DiskStatsConfig `json:"disk"` + InvokeIntervalString string `json:"invokeInterval"` + InvokeInterval time.Duration `json:"-"` +} + +// ApplyConfiguration applies default configurations. +func (ssc *SystemStatsConfig) ApplyConfiguration() error { + if ssc.InvokeIntervalString == "" { + ssc.InvokeIntervalString = defaultInvokeIntervalString + } + if ssc.DiskConfig.LsblkTimeoutString == "" { + ssc.DiskConfig.LsblkTimeoutString = defaultlsblkTimeoutString + } + + var err error + ssc.InvokeInterval, err = time.ParseDuration(ssc.InvokeIntervalString) + if err != nil { + return fmt.Errorf("error in parsing InvokeIntervalString %q: %v", ssc.InvokeIntervalString, err) + } + ssc.DiskConfig.LsblkTimeout, err = time.ParseDuration(ssc.DiskConfig.LsblkTimeoutString) + if err != nil { + return fmt.Errorf("error in parsing LsblkTimeoutString %q: %v", ssc.DiskConfig.LsblkTimeoutString, err) + } + + return nil +} + +// Validate verifies whether the settings are valid. +func (ssc *SystemStatsConfig) Validate() error { + if ssc.InvokeInterval <= time.Duration(0) { + return fmt.Errorf("InvokeInterval %v must be above 0s", ssc.InvokeInterval) + } + if ssc.DiskConfig.LsblkTimeout <= time.Duration(0) { + return fmt.Errorf("LsblkTimeout %v must be above 0s", ssc.DiskConfig.LsblkTimeout) + } + if ssc.DiskConfig.LsblkTimeout > ssc.InvokeInterval { + return fmt.Errorf("LsblkTimeout %v must be shorter than ssc.InvokeInterval %v", ssc.DiskConfig.LsblkTimeout, ssc.InvokeInterval) + } + + return nil +} diff --git a/pkg/systemstatsmonitor/types/config_test.go b/pkg/systemstatsmonitor/types/config_test.go new file mode 100644 index 00000000..dc6382f8 --- /dev/null +++ b/pkg/systemstatsmonitor/types/config_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2019 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package types + +import ( + "reflect" + "testing" + "time" +) + +func TestApplyConfiguration(t *testing.T) { + testCases := []struct { + name string + orignalConfig SystemStatsConfig + wantedConfig SystemStatsConfig + isError bool + }{ + { + name: "normal", + orignalConfig: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeoutString: "5s", + }, + InvokeIntervalString: "60s", + }, + isError: false, + wantedConfig: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeout: 5 * time.Second, + LsblkTimeoutString: "5s", + }, + InvokeIntervalString: "60s", + InvokeInterval: 60 * time.Second, + }, + }, + { + name: "empty", + orignalConfig: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{}, + }, + isError: false, + wantedConfig: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeout: 5 * time.Second, + LsblkTimeoutString: "5s", + }, + InvokeIntervalString: "1m0s", + InvokeInterval: 60 * time.Second, + }, + }, + { + name: "error", + orignalConfig: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeoutString: "foo", + }, + }, + isError: true, + wantedConfig: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{}, + }, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + err := test.orignalConfig.ApplyConfiguration() + if err == nil && test.isError { + t.Errorf("Wanted an error got nil") + } + if err != nil && !test.isError { + t.Errorf("Wanted nil got an error") + } + if !test.isError && !reflect.DeepEqual(test.orignalConfig, test.wantedConfig) { + t.Errorf("Wanted: %+v. \nGot: %+v", test.wantedConfig, test.orignalConfig) + } + }) + } +} + +func TestValidate(t *testing.T) { + testCases := []struct { + name string + config SystemStatsConfig + isError bool + }{ + { + name: "normal", + config: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeoutString: "5s", + }, + InvokeIntervalString: "60s", + }, + isError: false, + }, + { + name: "negative-invoke-interval", + config: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeoutString: "5s", + }, + InvokeIntervalString: "-1s", + }, + isError: true, + }, + { + name: "negative-lsblk-timeout", + config: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeoutString: "-1s", + }, + InvokeIntervalString: "60s", + }, + isError: true, + }, + { + name: "lsblk-timeout-bigger-than-invoke-interval", + config: SystemStatsConfig{ + DiskConfig: DiskStatsConfig{ + LsblkTimeoutString: "90s", + }, + InvokeIntervalString: "60s", + }, + isError: true, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + if err := test.config.ApplyConfiguration(); err != nil { + t.Errorf("Wanted no error with config %+v, got %v", test.config, err) + } + + err := test.config.Validate() + if test.isError && err == nil { + t.Errorf("Wanted an error with config %+v, got nil", test.config) + } + if !test.isError && err != nil { + t.Errorf("Wanted nil with config %+v got an error", test.config) + } + }) + } +}