Enable NPD to run as a Windows Service.

This commit is contained in:
Jeremy Edwards
2021-01-29 06:04:34 +00:00
parent 1e8008bded
commit a7f78c5668
34 changed files with 2877 additions and 17 deletions

View File

@@ -17,10 +17,7 @@ limitations under the License.
package main
import (
"os"
"github.com/golang/glog"
"github.com/spf13/pflag"
_ "k8s.io/node-problem-detector/cmd/nodeproblemdetector/exporterplugins"
_ "k8s.io/node-problem-detector/cmd/nodeproblemdetector/problemdaemonplugins"
@@ -34,15 +31,19 @@ import (
"k8s.io/node-problem-detector/pkg/version"
)
func main() {
npdo := options.NewNodeProblemDetectorOptions()
npdo.AddFlags(pflag.CommandLine)
func npdInteractive(npdo *options.NodeProblemDetectorOptions) {
termCh := make(chan error, 1)
defer close(termCh)
pflag.Parse()
if err := npdMain(npdo, termCh); err != nil {
glog.Fatalf("Problem detector failed with error: %v", err)
}
}
func npdMain(npdo *options.NodeProblemDetectorOptions, termCh <-chan error) error {
if npdo.PrintVersion {
version.PrintVersion()
os.Exit(0)
return nil
}
npdo.SetNodeNameOrDie()
@@ -78,7 +79,5 @@ func main() {
// Initialize NPD core.
p := problemdetector.NewProblemDetector(problemDaemons, npdExporters)
if err := p.Run(); err != nil {
glog.Fatalf("Problem detector failed with error: %v", err)
}
return p.Run(termCh)
}

View File

@@ -0,0 +1,30 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"github.com/spf13/pflag"
"k8s.io/node-problem-detector/cmd/options"
)
func main() {
npdo := options.NewNodeProblemDetectorOptions()
npdo.AddFlags(pflag.CommandLine)
pflag.Parse()
npdInteractive(npdo)
}

View File

@@ -0,0 +1,133 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
_ "k8s.io/node-problem-detector/cmd/nodeproblemdetector/exporterplugins"
_ "k8s.io/node-problem-detector/cmd/nodeproblemdetector/problemdaemonplugins"
"k8s.io/node-problem-detector/cmd/options"
"k8s.io/node-problem-detector/pkg/exporters"
"k8s.io/node-problem-detector/pkg/types"
)
const (
fakeConfigFilePattern = `
{
"plugin": "filelog",
"pluginConfig": {
"timestamp": "^time=\"(\\S*)\"",
"message": "msg=\"([^\n]*)\"",
"timestampFormat": "2006-01-02T15:04:05.999999999-07:00"
},
"logPath": "%s",
"lookback": "5m",
"bufferSize": 10,
"source": "containerd",
"conditions": [],
"rules": [
{
"type": "temporary",
"reason": "MissingPigz",
"pattern": "unpigz not found.*"
},
{
"type": "temporary",
"reason": "IncompatibleContainer",
"pattern": ".*CreateComputeSystem.*"
}
]
}
`
)
func init() {
exporters.Register("nil", types.ExporterHandler{
CreateExporterOrDie: func(types.CommandLineOptions) types.Exporter {
return &nullExporter{}
},
})
}
type nullExporter struct {
}
func (ne *nullExporter) ExportProblems(*types.Status) {
}
func TestNPDMain(t *testing.T) {
npdo, cleanup := setupNPD(t)
defer cleanup()
termCh := make(chan error, 2)
termCh <- errors.New("close")
defer close(termCh)
if err := npdMain(npdo, termCh); err != nil {
t.Errorf("termination signal should not return error got, %v", err)
}
}
func writeTempFile(t *testing.T, ext string, contents string) (string, error) {
f, err := ioutil.TempFile("", "*."+ext)
if err != nil {
return "", fmt.Errorf("cannot create temp file, %v", err)
}
fileName := f.Name()
if err := ioutil.WriteFile(fileName, []byte(contents), 0644); err != nil {
os.Remove(fileName)
return "", fmt.Errorf("cannot write config to temp file %s, %v", fileName, err)
}
return fileName, nil
}
func setupNPD(t *testing.T) (*options.NodeProblemDetectorOptions, func()) {
fakeLogFileName, err := writeTempFile(t, "log", "")
if err != nil {
os.Remove(fakeLogFileName)
t.Fatalf("cannot create temp config file, %v", err)
}
fakeConfigFileContents := fmt.Sprintf(fakeConfigFilePattern, strings.ReplaceAll(fakeLogFileName, "\\", "\\\\"))
fakeConfigFileName, err := writeTempFile(t, "json", fakeConfigFileContents)
if err != nil {
os.Remove(fakeLogFileName)
os.Remove(fakeConfigFileName)
t.Fatalf("cannot create temp config file, %v", err)
}
return &options.NodeProblemDetectorOptions{
MonitorConfigPaths: map[types.ProblemDaemonType]*[]string{
"system-log-monitor": {
fakeConfigFileName,
},
},
}, func() {
os.Remove(fakeLogFileName)
os.Remove(fakeConfigFileName)
}
}

View File

@@ -0,0 +1,181 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"errors"
"fmt"
"sync"
"time"
"github.com/golang/glog"
"github.com/spf13/pflag"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/debug"
"golang.org/x/sys/windows/svc/eventlog"
"k8s.io/node-problem-detector/cmd/options"
)
const (
svcName = "NodeProblemDetector"
svcDescription = "Identifies problems that likely disrupt the operation of Kubernetes workloads."
svcCommandsAccepted = svc.AcceptStop | svc.AcceptShutdown
appEventLogName = svcName
windowsEventLogID = 1
)
var (
elog debug.Log
)
func main() {
npdo := options.NewNodeProblemDetectorOptions()
npdo.AddFlags(pflag.CommandLine)
pflag.Parse()
handler := &npdService{
options: npdo,
}
runFunc := initializeRun()
if err := runFunc(svcName, handler); err != nil {
elog.Error(windowsEventLogID, err.Error())
}
}
func isRunningAsWindowsService() bool {
runningAsService, err := svc.IsWindowsService()
if err != nil {
glog.Errorf("cannot determine if running as Windows Service assuming standalone, %v", err)
return false
}
return runningAsService
}
func setupLogging(runningAsService bool) {
if runningAsService {
var err error
elog, err = eventlog.Open(appEventLogName)
// If the event log is unavailable then at least print out to standard output.
if err != nil {
elog = debug.New(appEventLogName)
elog.Info(windowsEventLogID, fmt.Sprintf("cannot connect to event log using standard out, %v", err))
}
} else {
elog = debug.New(appEventLogName)
}
}
func initializeRun() func(string, svc.Handler) error {
runningAsService := isRunningAsWindowsService()
setupLogging(runningAsService)
if runningAsService {
return svc.Run
}
return debug.Run
}
type npdService struct {
sync.Mutex
options *options.NodeProblemDetectorOptions
}
func (s *npdService) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
appTermCh := make(chan error, 1)
svcLoopTermCh := make(chan error, 1)
defer func() {
close(appTermCh)
close(svcLoopTermCh)
}()
changes <- svc.Status{State: svc.StartPending}
changes <- svc.Status{State: svc.Running, Accepts: svcCommandsAccepted}
var appWG sync.WaitGroup
var svcWG sync.WaitGroup
options := s.options
// NPD application goroutine.
appWG.Add(1)
go func() {
defer appWG.Done()
if err := npdMain(options, appTermCh); err != nil {
elog.Warning(windowsEventLogID, err.Error())
}
changes <- svc.Status{State: svc.StopPending}
}()
// Windows service control goroutine.
svcWG.Add(1)
go func() {
defer svcWG.Done()
serviceLoop(r, changes, appTermCh, svcLoopTermCh)
}()
// Wait for the application go routine to die.
appWG.Wait()
// Ensure that the service control loop is killed.
svcLoopTermCh <- nil
// Wait for the service control loop to terminate.
// Otherwise it's possible that the channel closures cause the application to panic.
svcWG.Wait()
// Send a signal to the Windows service control that the application has stopped.
changes <- svc.Status{State: svc.Stopped, Accepts: svcCommandsAccepted}
return false, uint32(0)
}
func serviceLoop(r <-chan svc.ChangeRequest, changes chan<- svc.Status, appTermCh chan error, svcLoopTermCh chan error) {
for {
select {
case <-svcLoopTermCh:
return
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
// Testing deadlock from https://code.google.com/p/winsvc/issues/detail?id=4
time.Sleep(100 * time.Millisecond)
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
elog.Info(windowsEventLogID, fmt.Sprintf("Stopping %s service, %v", svcName, c.Context))
appTermCh <- errors.New("stopping service")
case svc.Pause:
elog.Info(windowsEventLogID, "ignoring pause command from Windows service control, not supported")
changes <- svc.Status{State: svc.Paused, Accepts: svcCommandsAccepted}
case svc.Continue:
elog.Info(windowsEventLogID, "ignoring continue command from Windows service control, not supported")
changes <- svc.Status{State: svc.Running, Accepts: svcCommandsAccepted}
default:
elog.Error(windowsEventLogID, fmt.Sprintf("unexpected control request #%d", c))
}
}
}
}

View File

@@ -0,0 +1,56 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"testing"
"golang.org/x/sys/windows/svc"
)
func TestWindowsServiceLoop(t *testing.T) {
npdo, cleanup := setupNPD(t)
defer cleanup()
setupLogging(false)
s := &npdService{
options: npdo,
}
r := make(chan svc.ChangeRequest, 2)
changes := make(chan svc.Status, 4)
defer func() {
close(r)
close(changes)
}()
r <- svc.ChangeRequest{
Cmd: svc.Shutdown,
}
r <- svc.ChangeRequest{
Cmd: svc.Shutdown,
}
ssec, errno := s.Execute([]string{}, r, changes)
if ssec != false {
t.Error("ssec should be false")
}
if errno != 0 {
t.Error("errno should be 0")
}
}

View File

@@ -125,7 +125,6 @@ func (npdo *NodeProblemDetectorOptions) AddFlags(fs *pflag.FlagSet) {
20257, "The port to bind the Prometheus scrape endpoint. Prometheus exporter is enabled by default at port 20257. Use 0 to disable.")
fs.StringVar(&npdo.PrometheusServerAddress, "prometheus-address",
"127.0.0.1", "The address to bind the Prometheus scrape endpoint.")
for _, exporterName := range exporters.GetExporterNames() {
exporterHandler := exporters.GetExporterHandlerOrDie(exporterName)
exporterHandler.Options.SetFlags(fs)