Add disk metrics support.

This commit is contained in:
Xuewei Zhang
2019-05-20 15:20:42 -07:00
parent 23dc265971
commit 7ad5dec712
9 changed files with 604 additions and 2 deletions

View File

@@ -20,4 +20,5 @@ package main
import (
_ "k8s.io/node-problem-detector/pkg/custompluginmonitor"
_ "k8s.io/node-problem-detector/pkg/systemlogmonitor"
_ "k8s.io/node-problem-detector/pkg/systemstatsmonitor"
)

View File

@@ -0,0 +1,19 @@
{
"disk": {
"metricsConfigs": {
"disk/io_time": {
"displayName": "disk/io_time"
},
"disk/weighted_io": {
"displayName": "disk/weighted_io"
},
"disk/avg_queue_len": {
"displayName": "disk/avg_queue_len"
}
},
"includeRootBlk": true,
"includeAllAttachedBlk": true,
"lsblkTimeout": "5s"
},
"invokeInterval": "60s"
}

View File

@@ -261,11 +261,11 @@ func TestCustomPluginConfigValidate(t *testing.T) {
err := utMeta.Conf.Validate()
if err != nil && !utMeta.IsError {
t.Error(desp)
t.Errorf("Error in validating custom plugin configuration %+v. Want an error got nil", utMeta)
t.Errorf("Error in validating custom plugin configuration %+v. Wanted nil got an error", utMeta)
}
if err == nil && utMeta.IsError {
t.Error(desp)
t.Errorf("Error in validating custom plugin configuration %+v. Want nil got an error", utMeta)
t.Errorf("Error in validating custom plugin configuration %+v. Wanted an error got nil", utMeta)
}
}
}

View File

@@ -0,0 +1,142 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"context"
"os/exec"
"strings"
"time"
"github.com/golang/glog"
"github.com/shirou/gopsutil/disk"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
)
type diskCollector struct {
keyDevice tag.Key
mIOTime *stats.Int64Measure
mWeightedIO *stats.Int64Measure
mAvgQueueLen *stats.Float64Measure
config *ssmtypes.DiskStatsConfig
historyIOTime map[string]uint64
historyWeightedIO map[string]uint64
}
func NewDiskCollectorOrDie(diskConfig *ssmtypes.DiskStatsConfig) *diskCollector {
dc := diskCollector{config: diskConfig}
dc.keyDevice, _ = tag.NewKey("device")
dc.mIOTime = newInt64Metric(
diskConfig.MetricsConfigs["disk/io_time"].DisplayName,
"The IO time spent on the disk",
"second",
view.LastValue(),
[]tag.Key{dc.keyDevice})
dc.mWeightedIO = newInt64Metric(
diskConfig.MetricsConfigs["disk/weighted_io"].DisplayName,
"The weighted IO on the disk",
"second",
view.LastValue(),
[]tag.Key{dc.keyDevice})
dc.mAvgQueueLen = newFloat64Metric(
diskConfig.MetricsConfigs["disk/avg_queue_len"].DisplayName,
"The average queue length on the disk",
"second",
view.LastValue(),
[]tag.Key{dc.keyDevice})
dc.historyIOTime = make(map[string]uint64)
dc.historyWeightedIO = make(map[string]uint64)
return &dc
}
func (dc *diskCollector) collect() {
if dc == nil {
return
}
blks := []string{}
if dc.config.IncludeRootBlk {
blks = append(blks, listRootBlockDevices(dc.config.LsblkTimeout)...)
}
if dc.config.IncludeAllAttachedBlk {
blks = append(blks, listAttachedBlockDevices()...)
}
ioCountersStats, _ := disk.IOCounters(blks...)
for deviceName, ioCountersStat := range ioCountersStats {
// Calculate average IO queue length since last measurement.
lastIOTime := dc.historyIOTime[deviceName]
lastWeightedIO := dc.historyWeightedIO[deviceName]
dc.historyIOTime[deviceName] = ioCountersStat.IoTime
dc.historyWeightedIO[deviceName] = ioCountersStat.WeightedIO
avg_queue_len := float64(0.0)
if lastIOTime != ioCountersStat.IoTime {
avg_queue_len = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime)
}
// Attach label {"device": deviceName} to the metrics.
device_ctx, _ := tag.New(context.Background(), tag.Upsert(dc.keyDevice, deviceName))
if dc.mIOTime != nil {
stats.Record(device_ctx, dc.mIOTime.M(int64(ioCountersStat.IoTime)))
}
if dc.mWeightedIO != nil {
stats.Record(device_ctx, dc.mWeightedIO.M(int64(ioCountersStat.WeightedIO)))
}
if dc.mAvgQueueLen != nil {
stats.Record(device_ctx, dc.mAvgQueueLen.M(avg_queue_len))
}
}
}
// listRootBlockDevices lists all block devices that's not a slave or holder.
func listRootBlockDevices(timeout time.Duration) []string {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// "-d" prevents printing slave or holder devices. i.e. /dev/sda1, /dev/sda2...
// "-n" prevents printing the headings.
// "-p NAME" specifies to only print the device name.
cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME")
stdout, err := cmd.Output()
if err != nil {
glog.Errorf("Error calling lsblk")
}
return strings.Split(strings.TrimSpace(string(stdout)), "\n")
}
// listAttachedBlockDevices lists all currently attached block devices.
func listAttachedBlockDevices() []string {
partitions, _ := disk.Partitions(false)
blks := []string{}
for _, partition := range partitions {
blks = append(blks, partition.Device)
}
return blks
}

View File

@@ -0,0 +1,57 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
// newInt64Metric create a stats.Int64 metrics, returns nil when name is empty.
func newInt64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Int64Measure {
if name == "" {
return nil
}
measure := stats.Int64(name, description, unit)
newView := &view.View{
Name: name,
Measure: measure,
Description: description,
Aggregation: aggregation,
TagKeys: tagKeys,
}
view.Register(newView)
return measure
}
// newFloat64Metric create a stats.Float64 metrics, returns nil when name is empty.
func newFloat64Metric(name string, description string, unit string, aggregation *view.Aggregation, tagKeys []tag.Key) *stats.Float64Measure {
if name == "" {
return nil
}
measure := stats.Float64(name, description, unit)
newView := &view.View{
Name: name,
Measure: measure,
Description: description,
Aggregation: aggregation,
TagKeys: tagKeys,
}
view.Register(newView)
return measure
}

View File

@@ -0,0 +1,112 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"encoding/json"
"io/ioutil"
"time"
"github.com/golang/glog"
"k8s.io/node-problem-detector/pkg/problemdaemon"
ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util/tomb"
)
const SystemStatsMonitorName = "system-stats-monitor"
func init() {
problemdaemon.Register(SystemStatsMonitorName, types.ProblemDaemonHandler{
CreateProblemDaemonOrDie: NewSystemStatsMonitorOrDie,
CmdOptionDescription: "Set to config file paths."})
}
type systemStatsMonitor struct {
config ssmtypes.SystemStatsConfig
diskCollector *diskCollector
tomb *tomb.Tomb
}
// NewSystemStatsMonitorOrDie creates a system stats monitor.
func NewSystemStatsMonitorOrDie(configPath string) types.Monitor {
ssm := systemStatsMonitor{
tomb: tomb.NewTomb(),
}
// Apply configurations.
f, err := ioutil.ReadFile(configPath)
if err != nil {
glog.Fatalf("Failed to read configuration file %q: %v", configPath, err)
}
err = json.Unmarshal(f, &ssm.config)
if err != nil {
glog.Fatalf("Failed to unmarshal configuration file %q: %v", configPath, err)
}
err = ssm.config.ApplyConfiguration()
if err != nil {
glog.Fatalf("Failed to apply configuration for %q: %v", configPath, err)
}
err = ssm.config.Validate()
if err != nil {
glog.Fatalf("Failed to validate configuration %+v: %v", ssm.config, err)
}
// Initialize diskCollector if needed.
if len(ssm.config.DiskConfig.MetricsConfigs) > 0 {
ssm.diskCollector = NewDiskCollectorOrDie(&ssm.config.DiskConfig)
}
return &ssm
}
func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) {
glog.Info("Start system stats monitor")
go ssm.monitorLoop()
return nil, nil
}
func (ssm *systemStatsMonitor) monitorLoop() {
defer ssm.tomb.Done()
runTicker := time.NewTicker(ssm.config.InvokeInterval)
defer runTicker.Stop()
select {
case <-ssm.tomb.Stopping():
glog.Infof("System stats monitor stopped")
return
default:
ssm.diskCollector.collect()
}
for {
select {
case <-runTicker.C:
ssm.diskCollector.collect()
case <-ssm.tomb.Stopping():
glog.Infof("System stats monitor stopped")
return
}
}
}
func (ssm *systemStatsMonitor) Stop() {
glog.Info("Stop system stats monitor")
ssm.tomb.Stop()
}

View File

@@ -0,0 +1,31 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package systemstatsmonitor
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/node-problem-detector/pkg/problemdaemon"
)
func TestRegistration(t *testing.T) {
assert.NotPanics(t,
func() { problemdaemon.GetProblemDaemonHandlerOrDie(SystemStatsMonitorName) },
"System stats monitor failed to register itself as a problem daemon.")
}

View File

@@ -0,0 +1,82 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"fmt"
"time"
)
var (
defaultInvokeIntervalString = (60 * time.Second).String()
defaultlsblkTimeoutString = (5 * time.Second).String()
)
type MetricConfig struct {
DisplayName string `json:"displayName"`
}
type DiskStatsConfig struct {
MetricsConfigs map[string]MetricConfig `json:"metricsConfigs"`
IncludeRootBlk bool `json:"includeRootBlk"`
IncludeAllAttachedBlk bool `json:"includeAllAttachedBlk"`
LsblkTimeoutString string `json:"lsblkTimeout"`
LsblkTimeout time.Duration `json:"-"`
}
type SystemStatsConfig struct {
DiskConfig DiskStatsConfig `json:"disk"`
InvokeIntervalString string `json:"invokeInterval"`
InvokeInterval time.Duration `json:"-"`
}
// ApplyConfiguration applies default configurations.
func (ssc *SystemStatsConfig) ApplyConfiguration() error {
if ssc.InvokeIntervalString == "" {
ssc.InvokeIntervalString = defaultInvokeIntervalString
}
if ssc.DiskConfig.LsblkTimeoutString == "" {
ssc.DiskConfig.LsblkTimeoutString = defaultlsblkTimeoutString
}
var err error
ssc.InvokeInterval, err = time.ParseDuration(ssc.InvokeIntervalString)
if err != nil {
return fmt.Errorf("error in parsing InvokeIntervalString %q: %v", ssc.InvokeIntervalString, err)
}
ssc.DiskConfig.LsblkTimeout, err = time.ParseDuration(ssc.DiskConfig.LsblkTimeoutString)
if err != nil {
return fmt.Errorf("error in parsing LsblkTimeoutString %q: %v", ssc.DiskConfig.LsblkTimeoutString, err)
}
return nil
}
// Validate verifies whether the settings are valid.
func (ssc *SystemStatsConfig) Validate() error {
if ssc.InvokeInterval <= time.Duration(0) {
return fmt.Errorf("InvokeInterval %v must be above 0s", ssc.InvokeInterval)
}
if ssc.DiskConfig.LsblkTimeout <= time.Duration(0) {
return fmt.Errorf("LsblkTimeout %v must be above 0s", ssc.DiskConfig.LsblkTimeout)
}
if ssc.DiskConfig.LsblkTimeout > ssc.InvokeInterval {
return fmt.Errorf("LsblkTimeout %v must be shorter than ssc.InvokeInterval %v", ssc.DiskConfig.LsblkTimeout, ssc.InvokeInterval)
}
return nil
}

View File

@@ -0,0 +1,158 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
import (
"reflect"
"testing"
"time"
)
func TestApplyConfiguration(t *testing.T) {
testCases := []struct {
name string
orignalConfig SystemStatsConfig
wantedConfig SystemStatsConfig
isError bool
}{
{
name: "normal",
orignalConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "60s",
},
isError: false,
wantedConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeout: 5 * time.Second,
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "60s",
InvokeInterval: 60 * time.Second,
},
},
{
name: "empty",
orignalConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{},
},
isError: false,
wantedConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeout: 5 * time.Second,
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "1m0s",
InvokeInterval: 60 * time.Second,
},
},
{
name: "error",
orignalConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "foo",
},
},
isError: true,
wantedConfig: SystemStatsConfig{
DiskConfig: DiskStatsConfig{},
},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.orignalConfig.ApplyConfiguration()
if err == nil && test.isError {
t.Errorf("Wanted an error got nil")
}
if err != nil && !test.isError {
t.Errorf("Wanted nil got an error")
}
if !test.isError && !reflect.DeepEqual(test.orignalConfig, test.wantedConfig) {
t.Errorf("Wanted: %+v. \nGot: %+v", test.wantedConfig, test.orignalConfig)
}
})
}
}
func TestValidate(t *testing.T) {
testCases := []struct {
name string
config SystemStatsConfig
isError bool
}{
{
name: "normal",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "60s",
},
isError: false,
},
{
name: "negative-invoke-interval",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "5s",
},
InvokeIntervalString: "-1s",
},
isError: true,
},
{
name: "negative-lsblk-timeout",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "-1s",
},
InvokeIntervalString: "60s",
},
isError: true,
},
{
name: "lsblk-timeout-bigger-than-invoke-interval",
config: SystemStatsConfig{
DiskConfig: DiskStatsConfig{
LsblkTimeoutString: "90s",
},
InvokeIntervalString: "60s",
},
isError: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
if err := test.config.ApplyConfiguration(); err != nil {
t.Errorf("Wanted no error with config %+v, got %v", test.config, err)
}
err := test.config.Validate()
if test.isError && err == nil {
t.Errorf("Wanted an error with config %+v, got nil", test.config)
}
if !test.isError && err != nil {
t.Errorf("Wanted nil with config %+v got an error", test.config)
}
})
}
}